aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
authormhalaj1 <matus.halaj@pantheon.tech>2021-08-26 20:15:08 +0200
committermhalaj1 <matus.halaj@pantheon.tech>2021-09-07 15:24:53 +0200
commitc09ee3241377aae2530a73d48c4e20641d76d0ee (patch)
tree219fd1f7d9ba595f0f4c8dac9796bc76ce3556fc /core
parentdebc52dea8a81417bb08ca5bb934c7876b6d65e0 (diff)
Refactoring and fixes
* refactor creation of new channel * add missing closing of created streams * correct documentation regarding thread safety of stream Signed-off-by: mhalaj1 <matus.halaj@pantheon.tech> Change-Id: Ic601efff298fcbdecaafab83fa236253af69de21
Diffstat (limited to 'core')
-rw-r--r--core/channel.go26
-rw-r--r--core/connection.go9
-rw-r--r--core/stream.go12
3 files changed, 21 insertions, 26 deletions
diff --git a/core/channel.go b/core/channel.go
index 4cb5761..1086c36 100644
--- a/core/channel.go
+++ b/core/channel.go
@@ -19,6 +19,7 @@ import (
"fmt"
"reflect"
"strings"
+ "sync/atomic"
"time"
"github.com/sirupsen/logrus"
@@ -109,17 +110,26 @@ type Channel struct {
receiveReplyTimeout time.Duration // maximum time that we wait for receiver to consume reply
}
-func newChannel(id uint16, conn *Connection, codec MessageCodec, identifier MessageIdentifier, reqSize, replySize int) *Channel {
- return &Channel{
- id: id,
- conn: conn,
- msgCodec: codec,
- msgIdentifier: identifier,
- reqChan: make(chan *vppRequest, reqSize),
- replyChan: make(chan *vppReply, replySize),
+func (c *Connection) newChannel(reqChanBufSize, replyChanBufSize int) *Channel {
+ // create new channel
+ chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
+ channel := &Channel{
+ id: chID,
+ conn: c,
+ msgCodec: c.codec,
+ msgIdentifier: c,
+ reqChan: make(chan *vppRequest, reqChanBufSize),
+ replyChan: make(chan *vppReply, replyChanBufSize),
replyTimeout: DefaultReplyTimeout,
receiveReplyTimeout: ReplyChannelTimeout,
}
+
+ // store API channel within the client
+ c.channelsLock.Lock()
+ c.channels[chID] = channel
+ c.channelsLock.Unlock()
+
+ return channel
}
func (ch *Channel) GetID() uint16 {
diff --git a/core/connection.go b/core/connection.go
index ee5a06b..935693e 100644
--- a/core/connection.go
+++ b/core/connection.go
@@ -245,14 +245,7 @@ func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*Chann
return nil, errors.New("nil connection passed in")
}
- // create new channel
- chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
- channel := newChannel(chID, c, c.codec, c, reqChanBufSize, replyChanBufSize)
-
- // store API channel within the client
- c.channelsLock.Lock()
- c.channels[chID] = channel
- c.channelsLock.Unlock()
+ channel := c.newChannel(reqChanBufSize, replyChanBufSize)
// start watching on the request channel
go c.watchRequests(channel)
diff --git a/core/stream.go b/core/stream.go
index 3d417f1..363cc9f 100644
--- a/core/stream.go
+++ b/core/stream.go
@@ -20,14 +20,12 @@ import (
"fmt"
"reflect"
"sync"
- "sync/atomic"
"time"
"git.fd.io/govpp.git/api"
)
type Stream struct {
- id uint32
conn *Connection
ctx context.Context
channel *Channel
@@ -57,15 +55,9 @@ func (c *Connection) NewStream(ctx context.Context, options ...api.StreamOption)
for _, option := range options {
option(s)
}
- // create and store a new channel
- s.id = atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff
- s.channel = newChannel(uint16(s.id), c, c.codec, c, s.requestSize, s.replySize)
- s.channel.SetReplyTimeout(s.replyTimeout)
- // store API channel within the client
- c.channelsLock.Lock()
- c.channels[uint16(s.id)] = s.channel
- c.channelsLock.Unlock()
+ s.channel = c.newChannel(s.requestSize, s.replySize)
+ s.channel.SetReplyTimeout(s.replyTimeout)
// Channel.watchRequests are not started here intentionally, because
// requests are sent directly by SendMsg.