aboutsummaryrefslogtreecommitdiffstats
path: root/core/stream.go
diff options
context:
space:
mode:
authorVladimir Lavor <vlavor@cisco.com>2020-12-01 13:57:29 +0100
committerVladimir Lavor <vlavor@cisco.com>2020-12-03 10:14:12 +0100
commitbcf3fbd21aa22d1546bc85ffb887ae5ba557808e (patch)
tree60668ecb3d0721bf33cfa1b37736a494f675ec4b /core/stream.go
parent2b743eede78b6fed115421716888f80088edefdb (diff)
Fixed incorrect message error in the stream API
The message package is passed to the stream object and used to evaluate correct reply message type Change-Id: I2c9844d6447d024af1693205efd5721e2f89f22d Signed-off-by: Vladimir Lavor <vlavor@cisco.com>
Diffstat (limited to 'core/stream.go')
-rw-r--r--core/stream.go12
1 files changed, 11 insertions, 1 deletions
diff --git a/core/stream.go b/core/stream.go
index abe9d55..3d417f1 100644
--- a/core/stream.go
+++ b/core/stream.go
@@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"reflect"
+ "sync"
"sync/atomic"
"time"
@@ -34,6 +35,9 @@ type Stream struct {
requestSize int
replySize int
replyTimeout time.Duration
+ // per-request context
+ pkgPath string
+ sync.Mutex
}
func (c *Connection) NewStream(ctx context.Context, options ...api.StreamOption) (api.Stream, error) {
@@ -109,6 +113,9 @@ func (s *Stream) SendMsg(msg api.Message) error {
if err := s.conn.processRequest(s.channel, req); err != nil {
return err
}
+ s.Lock()
+ s.pkgPath = s.conn.GetMessagePath(msg)
+ s.Unlock()
return nil
}
@@ -118,7 +125,10 @@ func (s *Stream) RecvMsg() (api.Message, error) {
return nil, err
}
// resolve message type
- msg, err := s.channel.msgIdentifier.LookupByID(reply.msgID)
+ s.Lock()
+ path := s.pkgPath
+ s.Unlock()
+ msg, err := s.channel.msgIdentifier.LookupByID(path, reply.msgID)
if err != nil {
return nil, err
}