From df67791c6ffc96331f75aec7d3addfe2efca7739 Mon Sep 17 00:00:00 2001 From: Ondrej Fabry Date: Thu, 25 Jun 2020 11:55:58 +0200 Subject: Introduce Stream - experimental API for low-level access to VPP API Change-Id: I2698e11b76ff55d9730b47d4fee990be93349516 Signed-off-by: Ondrej Fabry --- core/stream.go | 124 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 core/stream.go (limited to 'core/stream.go') diff --git a/core/stream.go b/core/stream.go new file mode 100644 index 0000000..edc3f2b --- /dev/null +++ b/core/stream.go @@ -0,0 +1,124 @@ +// Copyright (c) 2020 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + "context" + "errors" + "fmt" + "reflect" + "sync/atomic" + + "git.fd.io/govpp.git/api" +) + +type Stream struct { + id uint32 + conn *Connection + ctx context.Context + channel *Channel +} + +func (c *Connection) NewStream(ctx context.Context) (api.Stream, error) { + if c == nil { + return nil, errors.New("nil connection passed in") + } + // TODO: add stream options as variadic parameters for customizing: + // - request/reply channel size + // - reply timeout + // - retries + // - ??? + + // create new channel + chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff) + channel := newChannel(chID, c, c.codec, c, 10, 10) + + // store API channel within the client + c.channelsLock.Lock() + c.channels[chID] = channel + c.channelsLock.Unlock() + + // Channel.watchRequests are not started here intentionally, because + // requests are sent directly by SendMsg. + + return &Stream{ + id: uint32(chID), + conn: c, + ctx: ctx, + channel: channel, + }, nil +} + +func (c *Connection) Invoke(ctx context.Context, req api.Message, reply api.Message) error { + // TODO: implement invoke + panic("not implemented") +} + +func (s *Stream) Context() context.Context { + return s.ctx +} + +func (s *Stream) Close() error { + if s.conn == nil { + return errors.New("stream closed") + } + s.conn.releaseAPIChannel(s.channel) + s.conn = nil + return nil +} + +func (s *Stream) SendMsg(msg api.Message) error { + if s.conn == nil { + return errors.New("stream closed") + } + req := s.channel.newRequest(msg, false) + if err := s.conn.processRequest(s.channel, req); err != nil { + return err + } + return nil +} + +func (s *Stream) RecvMsg() (api.Message, error) { + if s.conn == nil { + return nil, errors.New("stream closed") + } + select { + case reply, ok := <-s.channel.replyChan: + if !ok { + return nil, fmt.Errorf("reply channel closed") + } + if reply.err != nil { + // this case should actually never happen for stream + // since reply.err is only filled in watchRequests + // and stream does not use it + return nil, reply.err + } + // resolve message type + msg, err := s.channel.msgIdentifier.LookupByID(reply.msgID) + if err != nil { + return nil, err + } + // allocate message instance + msg = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) + // decode message data + if err := s.channel.msgCodec.DecodeMsg(reply.data, msg); err != nil { + return nil, err + } + return msg, nil + + case <-s.ctx.Done(): + return nil, s.ctx.Err() + } +} -- cgit 1.2.3-korg