From 9ea1f778fb1458ce6b2265941885eab0b34b33d7 Mon Sep 17 00:00:00 2001 From: Vladimir Lavor Date: Mon, 12 Oct 2020 14:21:05 +0200 Subject: Stream API options * Stream API uses the same default values as the Channel API * request size, reply size and reply timeout settable using functional options * Added stream client example to show the stream API usage Change-Id: Id599134a7f520fc19f7d770ed5e3de74a7936829 Signed-off-by: Vladimir Lavor --- core/stream.go | 58 ++++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 42 insertions(+), 16 deletions(-) (limited to 'core') diff --git a/core/stream.go b/core/stream.go index 61a9965..abe9d55 100644 --- a/core/stream.go +++ b/core/stream.go @@ -20,6 +20,7 @@ import ( "fmt" "reflect" "sync/atomic" + "time" "git.fd.io/govpp.git/api" ) @@ -29,36 +30,43 @@ type Stream struct { conn *Connection ctx context.Context channel *Channel + // available options + requestSize int + replySize int + replyTimeout time.Duration } -func (c *Connection) NewStream(ctx context.Context) (api.Stream, error) { +func (c *Connection) NewStream(ctx context.Context, options ...api.StreamOption) (api.Stream, error) { if c == nil { return nil, errors.New("nil connection passed in") } - // TODO: add stream options as variadic parameters for customizing: - // - request/reply channel size - // - reply timeout - // - retries - // - ??? + s := &Stream{ + conn: c, + ctx: ctx, + // default options + requestSize: RequestChanBufSize, + replySize: ReplyChanBufSize, + replyTimeout: DefaultReplyTimeout, + } - // create new channel - chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff) - channel := newChannel(chID, c, c.codec, c, 10, 10) + // parse custom options + for _, option := range options { + option(s) + } + // create and store a new channel + s.id = atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff + s.channel = newChannel(uint16(s.id), c, c.codec, c, s.requestSize, s.replySize) + s.channel.SetReplyTimeout(s.replyTimeout) // store API channel within the client c.channelsLock.Lock() - c.channels[chID] = channel + c.channels[uint16(s.id)] = s.channel c.channelsLock.Unlock() // Channel.watchRequests are not started here intentionally, because // requests are sent directly by SendMsg. - return &Stream{ - id: uint32(chID), - conn: c, - ctx: ctx, - channel: channel, - }, nil + return s, nil } func (c *Connection) Invoke(ctx context.Context, req api.Message, reply api.Message) error { @@ -123,6 +131,24 @@ func (s *Stream) RecvMsg() (api.Message, error) { return msg, nil } +func WithRequestSize(size int) api.StreamOption { + return func(stream api.Stream) { + stream.(*Stream).requestSize = size + } +} + +func WithReplySize(size int) api.StreamOption { + return func(stream api.Stream) { + stream.(*Stream).replySize = size + } +} + +func WithReplyTimeout(timeout time.Duration) api.StreamOption { + return func(stream api.Stream) { + stream.(*Stream).replyTimeout = timeout + } +} + func (s *Stream) recvReply() (*vppReply, error) { if s.conn == nil { return nil, errors.New("stream closed") -- cgit 1.2.3-korg