aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
authorVladimir Lavor <vlavor@cisco.com>2020-10-12 14:21:05 +0200
committerOndrej Fabry <ofabry@cisco.com>2020-10-15 08:30:21 +0000
commit9ea1f778fb1458ce6b2265941885eab0b34b33d7 (patch)
treec1e8c18f9a626ced15c6ea6cf72c053931f6170a /core
parent8d4ee12e94e634b38f1dc55c830f8e222822215f (diff)
Stream API options
* Stream API uses the same default values as the Channel API * request size, reply size and reply timeout settable using functional options * Added stream client example to show the stream API usage Change-Id: Id599134a7f520fc19f7d770ed5e3de74a7936829 Signed-off-by: Vladimir Lavor <vlavor@cisco.com>
Diffstat (limited to 'core')
-rw-r--r--core/stream.go58
1 files changed, 42 insertions, 16 deletions
diff --git a/core/stream.go b/core/stream.go
index 61a9965..abe9d55 100644
--- a/core/stream.go
+++ b/core/stream.go
@@ -20,6 +20,7 @@ import (
"fmt"
"reflect"
"sync/atomic"
+ "time"
"git.fd.io/govpp.git/api"
)
@@ -29,36 +30,43 @@ type Stream struct {
conn *Connection
ctx context.Context
channel *Channel
+ // available options
+ requestSize int
+ replySize int
+ replyTimeout time.Duration
}
-func (c *Connection) NewStream(ctx context.Context) (api.Stream, error) {
+func (c *Connection) NewStream(ctx context.Context, options ...api.StreamOption) (api.Stream, error) {
if c == nil {
return nil, errors.New("nil connection passed in")
}
- // TODO: add stream options as variadic parameters for customizing:
- // - request/reply channel size
- // - reply timeout
- // - retries
- // - ???
+ s := &Stream{
+ conn: c,
+ ctx: ctx,
+ // default options
+ requestSize: RequestChanBufSize,
+ replySize: ReplyChanBufSize,
+ replyTimeout: DefaultReplyTimeout,
+ }
- // create new channel
- chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
- channel := newChannel(chID, c, c.codec, c, 10, 10)
+ // parse custom options
+ for _, option := range options {
+ option(s)
+ }
+ // create and store a new channel
+ s.id = atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff
+ s.channel = newChannel(uint16(s.id), c, c.codec, c, s.requestSize, s.replySize)
+ s.channel.SetReplyTimeout(s.replyTimeout)
// store API channel within the client
c.channelsLock.Lock()
- c.channels[chID] = channel
+ c.channels[uint16(s.id)] = s.channel
c.channelsLock.Unlock()
// Channel.watchRequests are not started here intentionally, because
// requests are sent directly by SendMsg.
- return &Stream{
- id: uint32(chID),
- conn: c,
- ctx: ctx,
- channel: channel,
- }, nil
+ return s, nil
}
func (c *Connection) Invoke(ctx context.Context, req api.Message, reply api.Message) error {
@@ -123,6 +131,24 @@ func (s *Stream) RecvMsg() (api.Message, error) {
return msg, nil
}
+func WithRequestSize(size int) api.StreamOption {
+ return func(stream api.Stream) {
+ stream.(*Stream).requestSize = size
+ }
+}
+
+func WithReplySize(size int) api.StreamOption {
+ return func(stream api.Stream) {
+ stream.(*Stream).replySize = size
+ }
+}
+
+func WithReplyTimeout(timeout time.Duration) api.StreamOption {
+ return func(stream api.Stream) {
+ stream.(*Stream).replyTimeout = timeout
+ }
+}
+
func (s *Stream) recvReply() (*vppReply, error) {
if s.conn == nil {
return nil, errors.New("stream closed")