diff options
author | Vladimir Lavor <vlavor@cisco.com> | 2020-12-01 13:57:29 +0100 |
---|---|---|
committer | Vladimir Lavor <vlavor@cisco.com> | 2020-12-03 10:14:12 +0100 |
commit | bcf3fbd21aa22d1546bc85ffb887ae5ba557808e (patch) | |
tree | 60668ecb3d0721bf33cfa1b37736a494f675ec4b /core/stream.go | |
parent | 2b743eede78b6fed115421716888f80088edefdb (diff) |
Fixed incorrect message error in the stream API
The message package is passed to the stream object and
used to evaluate correct reply message type
Change-Id: I2c9844d6447d024af1693205efd5721e2f89f22d
Signed-off-by: Vladimir Lavor <vlavor@cisco.com>
Diffstat (limited to 'core/stream.go')
-rw-r--r-- | core/stream.go | 12 |
1 files changed, 11 insertions, 1 deletions
diff --git a/core/stream.go b/core/stream.go index abe9d55..3d417f1 100644 --- a/core/stream.go +++ b/core/stream.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "reflect" + "sync" "sync/atomic" "time" @@ -34,6 +35,9 @@ type Stream struct { requestSize int replySize int replyTimeout time.Duration + // per-request context + pkgPath string + sync.Mutex } func (c *Connection) NewStream(ctx context.Context, options ...api.StreamOption) (api.Stream, error) { @@ -109,6 +113,9 @@ func (s *Stream) SendMsg(msg api.Message) error { if err := s.conn.processRequest(s.channel, req); err != nil { return err } + s.Lock() + s.pkgPath = s.conn.GetMessagePath(msg) + s.Unlock() return nil } @@ -118,7 +125,10 @@ func (s *Stream) RecvMsg() (api.Message, error) { return nil, err } // resolve message type - msg, err := s.channel.msgIdentifier.LookupByID(reply.msgID) + s.Lock() + path := s.pkgPath + s.Unlock() + msg, err := s.channel.msgIdentifier.LookupByID(path, reply.msgID) if err != nil { return nil, err } |