aboutsummaryrefslogtreecommitdiffstats
path: root/core/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/stream.go')
-rw-r--r--core/stream.go124
1 files changed, 124 insertions, 0 deletions
diff --git a/core/stream.go b/core/stream.go
new file mode 100644
index 0000000..edc3f2b
--- /dev/null
+++ b/core/stream.go
@@ -0,0 +1,124 @@
+// Copyright (c) 2020 Cisco and/or its affiliates.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package core
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "reflect"
+ "sync/atomic"
+
+ "git.fd.io/govpp.git/api"
+)
+
+type Stream struct {
+ id uint32
+ conn *Connection
+ ctx context.Context
+ channel *Channel
+}
+
+func (c *Connection) NewStream(ctx context.Context) (api.Stream, error) {
+ if c == nil {
+ return nil, errors.New("nil connection passed in")
+ }
+ // TODO: add stream options as variadic parameters for customizing:
+ // - request/reply channel size
+ // - reply timeout
+ // - retries
+ // - ???
+
+ // create new channel
+ chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
+ channel := newChannel(chID, c, c.codec, c, 10, 10)
+
+ // store API channel within the client
+ c.channelsLock.Lock()
+ c.channels[chID] = channel
+ c.channelsLock.Unlock()
+
+ // Channel.watchRequests are not started here intentionally, because
+ // requests are sent directly by SendMsg.
+
+ return &Stream{
+ id: uint32(chID),
+ conn: c,
+ ctx: ctx,
+ channel: channel,
+ }, nil
+}
+
+func (c *Connection) Invoke(ctx context.Context, req api.Message, reply api.Message) error {
+ // TODO: implement invoke
+ panic("not implemented")
+}
+
+func (s *Stream) Context() context.Context {
+ return s.ctx
+}
+
+func (s *Stream) Close() error {
+ if s.conn == nil {
+ return errors.New("stream closed")
+ }
+ s.conn.releaseAPIChannel(s.channel)
+ s.conn = nil
+ return nil
+}
+
+func (s *Stream) SendMsg(msg api.Message) error {
+ if s.conn == nil {
+ return errors.New("stream closed")
+ }
+ req := s.channel.newRequest(msg, false)
+ if err := s.conn.processRequest(s.channel, req); err != nil {
+ return err
+ }
+ return nil
+}
+
+func (s *Stream) RecvMsg() (api.Message, error) {
+ if s.conn == nil {
+ return nil, errors.New("stream closed")
+ }
+ select {
+ case reply, ok := <-s.channel.replyChan:
+ if !ok {
+ return nil, fmt.Errorf("reply channel closed")
+ }
+ if reply.err != nil {
+ // this case should actually never happen for stream
+ // since reply.err is only filled in watchRequests
+ // and stream does not use it
+ return nil, reply.err
+ }
+ // resolve message type
+ msg, err := s.channel.msgIdentifier.LookupByID(reply.msgID)
+ if err != nil {
+ return nil, err
+ }
+ // allocate message instance
+ msg = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
+ // decode message data
+ if err := s.channel.msgCodec.DecodeMsg(reply.data, msg); err != nil {
+ return nil, err
+ }
+ return msg, nil
+
+ case <-s.ctx.Done():
+ return nil, s.ctx.Err()
+ }
+}