From c09ee3241377aae2530a73d48c4e20641d76d0ee Mon Sep 17 00:00:00 2001 From: mhalaj1 Date: Thu, 26 Aug 2021 20:15:08 +0200 Subject: Refactoring and fixes * refactor creation of new channel * add missing closing of created streams * correct documentation regarding thread safety of stream Signed-off-by: mhalaj1 Change-Id: Ic601efff298fcbdecaafab83fa236253af69de21 --- core/channel.go | 26 ++++++++++++++++++-------- core/connection.go | 9 +-------- core/stream.go | 12 ++---------- 3 files changed, 21 insertions(+), 26 deletions(-) (limited to 'core') diff --git a/core/channel.go b/core/channel.go index 4cb5761..1086c36 100644 --- a/core/channel.go +++ b/core/channel.go @@ -19,6 +19,7 @@ import ( "fmt" "reflect" "strings" + "sync/atomic" "time" "github.com/sirupsen/logrus" @@ -109,17 +110,26 @@ type Channel struct { receiveReplyTimeout time.Duration // maximum time that we wait for receiver to consume reply } -func newChannel(id uint16, conn *Connection, codec MessageCodec, identifier MessageIdentifier, reqSize, replySize int) *Channel { - return &Channel{ - id: id, - conn: conn, - msgCodec: codec, - msgIdentifier: identifier, - reqChan: make(chan *vppRequest, reqSize), - replyChan: make(chan *vppReply, replySize), +func (c *Connection) newChannel(reqChanBufSize, replyChanBufSize int) *Channel { + // create new channel + chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff) + channel := &Channel{ + id: chID, + conn: c, + msgCodec: c.codec, + msgIdentifier: c, + reqChan: make(chan *vppRequest, reqChanBufSize), + replyChan: make(chan *vppReply, replyChanBufSize), replyTimeout: DefaultReplyTimeout, receiveReplyTimeout: ReplyChannelTimeout, } + + // store API channel within the client + c.channelsLock.Lock() + c.channels[chID] = channel + c.channelsLock.Unlock() + + return channel } func (ch *Channel) GetID() uint16 { diff --git a/core/connection.go b/core/connection.go index ee5a06b..935693e 100644 --- a/core/connection.go +++ b/core/connection.go @@ -245,14 +245,7 @@ func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*Chann return nil, errors.New("nil connection passed in") } - // create new channel - chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff) - channel := newChannel(chID, c, c.codec, c, reqChanBufSize, replyChanBufSize) - - // store API channel within the client - c.channelsLock.Lock() - c.channels[chID] = channel - c.channelsLock.Unlock() + channel := c.newChannel(reqChanBufSize, replyChanBufSize) // start watching on the request channel go c.watchRequests(channel) diff --git a/core/stream.go b/core/stream.go index 3d417f1..363cc9f 100644 --- a/core/stream.go +++ b/core/stream.go @@ -20,14 +20,12 @@ import ( "fmt" "reflect" "sync" - "sync/atomic" "time" "git.fd.io/govpp.git/api" ) type Stream struct { - id uint32 conn *Connection ctx context.Context channel *Channel @@ -57,15 +55,9 @@ func (c *Connection) NewStream(ctx context.Context, options ...api.StreamOption) for _, option := range options { option(s) } - // create and store a new channel - s.id = atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff - s.channel = newChannel(uint16(s.id), c, c.codec, c, s.requestSize, s.replySize) - s.channel.SetReplyTimeout(s.replyTimeout) - // store API channel within the client - c.channelsLock.Lock() - c.channels[uint16(s.id)] = s.channel - c.channelsLock.Unlock() + s.channel = c.newChannel(s.requestSize, s.replySize) + s.channel.SetReplyTimeout(s.replyTimeout) // Channel.watchRequests are not started here intentionally, because // requests are sent directly by SendMsg. -- cgit 1.2.3-korg