aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
authorOndrej Fabry <ofabry@cisco.com>2020-06-25 11:55:58 +0200
committerOndrej Fabry <ofabry@cisco.com>2020-06-25 11:55:58 +0200
commitdf67791c6ffc96331f75aec7d3addfe2efca7739 (patch)
tree8da8dbda17146df62064737a86f5b46aec049a4d /core
parentceed73403bdb61387d04be8b47183e9c4a970749 (diff)
Introduce Stream - experimental API for low-level access to VPP API
Change-Id: I2698e11b76ff55d9730b47d4fee990be93349516 Signed-off-by: Ondrej Fabry <ofabry@cisco.com>
Diffstat (limited to 'core')
-rw-r--r--core/channel.go51
-rw-r--r--core/control_ping.go24
-rw-r--r--core/request_handler.go157
-rw-r--r--core/stream.go124
4 files changed, 277 insertions, 79 deletions
diff --git a/core/channel.go b/core/channel.go
index 8479d6a..1b5e77e 100644
--- a/core/channel.go
+++ b/core/channel.go
@@ -102,19 +102,21 @@ type Channel struct {
lastSeqNum uint16 // sequence number of the last sent request
- delayedReply *vppReply // reply already taken from ReplyChan, buffered for later delivery
- replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout
+ delayedReply *vppReply // reply already taken from ReplyChan, buffered for later delivery
+ replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout
+ receiveReplyTimeout time.Duration // maximum time that we wait for receiver to consume reply
}
func newChannel(id uint16, conn *Connection, codec MessageCodec, identifier MessageIdentifier, reqSize, replySize int) *Channel {
return &Channel{
- id: id,
- conn: conn,
- msgCodec: codec,
- msgIdentifier: identifier,
- reqChan: make(chan *vppRequest, reqSize),
- replyChan: make(chan *vppReply, replySize),
- replyTimeout: DefaultReplyTimeout,
+ id: id,
+ conn: conn,
+ msgCodec: codec,
+ msgIdentifier: identifier,
+ reqChan: make(chan *vppRequest, reqSize),
+ replyChan: make(chan *vppReply, replySize),
+ replyTimeout: DefaultReplyTimeout,
+ receiveReplyTimeout: ReplyChannelTimeout,
}
}
@@ -122,28 +124,29 @@ func (ch *Channel) GetID() uint16 {
return ch.id
}
+func (ch *Channel) SendRequest(msg api.Message) api.RequestCtx {
+ req := ch.newRequest(msg, false)
+ ch.reqChan <- req
+ return &requestCtx{ch: ch, seqNum: req.seqNum}
+}
+
+func (ch *Channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx {
+ req := ch.newRequest(msg, true)
+ ch.reqChan <- req
+ return &multiRequestCtx{ch: ch, seqNum: req.seqNum}
+}
+
func (ch *Channel) nextSeqNum() uint16 {
ch.lastSeqNum++
return ch.lastSeqNum
}
-func (ch *Channel) SendRequest(msg api.Message) api.RequestCtx {
- seqNum := ch.nextSeqNum()
- ch.reqChan <- &vppRequest{
- msg: msg,
- seqNum: seqNum,
- }
- return &requestCtx{ch: ch, seqNum: seqNum}
-}
-
-func (ch *Channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx {
- seqNum := ch.nextSeqNum()
- ch.reqChan <- &vppRequest{
+func (ch *Channel) newRequest(msg api.Message, multi bool) *vppRequest {
+ return &vppRequest{
msg: msg,
- seqNum: seqNum,
- multi: true,
+ seqNum: ch.nextSeqNum(),
+ multi: multi,
}
- return &multiRequestCtx{ch: ch, seqNum: seqNum}
}
func (ch *Channel) CheckCompatiblity(msgs ...api.Message) error {
diff --git a/core/control_ping.go b/core/control_ping.go
index b39fd3f..ed8d274 100644
--- a/core/control_ping.go
+++ b/core/control_ping.go
@@ -21,15 +21,9 @@ func SetControlPingReply(m api.Message) {
type ControlPing struct{}
-func (*ControlPing) GetMessageName() string {
- return "control_ping"
-}
-func (*ControlPing) GetCrcString() string {
- return "51077d14"
-}
-func (*ControlPing) GetMessageType() api.MessageType {
- return api.RequestMessage
-}
+func (*ControlPing) GetMessageName() string { return "control_ping" }
+func (*ControlPing) GetCrcString() string { return "51077d14" }
+func (*ControlPing) GetMessageType() api.MessageType { return api.RequestMessage }
type ControlPingReply struct {
Retval int32
@@ -37,15 +31,9 @@ type ControlPingReply struct {
VpePID uint32
}
-func (*ControlPingReply) GetMessageName() string {
- return "control_ping_reply"
-}
-func (*ControlPingReply) GetCrcString() string {
- return "f6b0b8ca"
-}
-func (*ControlPingReply) GetMessageType() api.MessageType {
- return api.ReplyMessage
-}
+func (*ControlPingReply) GetMessageName() string { return "control_ping_reply" }
+func (*ControlPingReply) GetCrcString() string { return "f6b0b8ca" }
+func (*ControlPingReply) GetMessageType() api.MessageType { return api.ReplyMessage }
func init() {
api.RegisterMessage((*ControlPing)(nil), "ControlPing")
diff --git a/core/request_handler.go b/core/request_handler.go
index e272c6f..fc704cb 100644
--- a/core/request_handler.go
+++ b/core/request_handler.go
@@ -45,18 +45,72 @@ func (c *Connection) watchRequests(ch *Channel) {
return
}
if err := c.processRequest(ch, req); err != nil {
- sendReplyError(ch, req, err)
+ sendReply(ch, &vppReply{
+ seqNum: req.seqNum,
+ err: fmt.Errorf("unable to process request: %w", err),
+ })
}
}
}
}
// processRequest processes a single request received on the request channel.
+func (c *Connection) sendMessage(context uint32, msg api.Message) error {
+ // check whether we are connected to VPP
+ if atomic.LoadUint32(&c.vppConnected) == 0 {
+ return ErrNotConnected
+ }
+
+ /*log := log.WithFields(logger.Fields{
+ "context": context,
+ "msg_name": msg.GetMessageName(),
+ "msg_crc": msg.GetCrcString(),
+ })*/
+
+ // retrieve message ID
+ msgID, err := c.GetMessageID(msg)
+ if err != nil {
+ //log.WithError(err).Debugf("unable to retrieve message ID: %#v", msg)
+ return err
+ }
+
+ //log = log.WithField("msg_id", msgID)
+
+ // encode the message
+ data, err := c.codec.EncodeMsg(msg, msgID)
+ if err != nil {
+ log.WithError(err).Debugf("unable to encode message: %#v", msg)
+ return err
+ }
+
+ //log = log.WithField("msg_length", len(data))
+
+ if log.Level >= logger.DebugLevel {
+ log.Debugf("--> SEND: MSG %T %+v", msg, msg)
+ }
+
+ // send message to VPP
+ err = c.vppClient.SendMsg(context, data)
+ if err != nil {
+ log.WithError(err).Debugf("unable to send message: %#v", msg)
+ return err
+ }
+
+ return nil
+}
+
+// processRequest processes a single request received on the request channel.
func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
// check whether we are connected to VPP
if atomic.LoadUint32(&c.vppConnected) == 0 {
err := ErrNotConnected
- log.Errorf("processing request failed: %v", err)
+ log.WithFields(logger.Fields{
+ "channel": ch.id,
+ "seq_num": req.seqNum,
+ "msg_name": req.msg.GetMessageName(),
+ "msg_crc": req.msg.GetCrcString(),
+ "error": err,
+ }).Warnf("Unable to process request")
return err
}
@@ -64,12 +118,13 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
msgID, err := c.GetMessageID(req.msg)
if err != nil {
log.WithFields(logger.Fields{
+ "channel": ch.id,
"msg_name": req.msg.GetMessageName(),
"msg_crc": req.msg.GetCrcString(),
"seq_num": req.seqNum,
"error": err,
- }).Errorf("failed to retrieve message ID")
- return fmt.Errorf("unable to retrieve message ID: %v", err)
+ }).Warnf("Unable to retrieve message ID")
+ return err
}
// encode the message into binary
@@ -79,35 +134,42 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
"channel": ch.id,
"msg_id": msgID,
"msg_name": req.msg.GetMessageName(),
+ "msg_crc": req.msg.GetCrcString(),
"seq_num": req.seqNum,
"error": err,
- }).Errorf("failed to encode message: %#v", req.msg)
- return fmt.Errorf("unable to encode the message: %v", err)
+ }).Warnf("Unable to encode message: %T %+v", req.msg, req.msg)
+ return err
}
context := packRequestContext(ch.id, req.multi, req.seqNum)
- if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
+ if log.Level >= logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
log.WithFields(logger.Fields{
"channel": ch.id,
- "context": context,
- "is_multi": req.multi,
"msg_id": msgID,
- "msg_size": len(data),
- "seq_num": req.seqNum,
+ "msg_name": req.msg.GetMessageName(),
"msg_crc": req.msg.GetCrcString(),
- }).Debugf("--> govpp SEND: %s %+v", req.msg.GetMessageName(), req.msg)
+ "seq_num": req.seqNum,
+ "is_multi": req.multi,
+ "context": context,
+ "data_len": len(data),
+ }).Debugf("--> SEND MSG: %T %+v", req.msg, req.msg)
}
// send the request to VPP
err = c.vppClient.SendMsg(context, data)
if err != nil {
- err = fmt.Errorf("unable to send the message: %v", err)
log.WithFields(logger.Fields{
- "context": context,
- "msg_id": msgID,
- "seq_num": req.seqNum,
- }).Error(err)
+ "channel": ch.id,
+ "msg_id": msgID,
+ "msg_name": req.msg.GetMessageName(),
+ "msg_crc": req.msg.GetCrcString(),
+ "seq_num": req.seqNum,
+ "is_multi": req.multi,
+ "context": context,
+ "data_len": len(data),
+ "error": err,
+ }).Warnf("Unable to send message")
return err
}
@@ -115,20 +177,24 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
// send a control ping to determine end of the multipart response
pingData, _ := c.codec.EncodeMsg(c.msgControlPing, c.pingReqID)
- log.WithFields(logger.Fields{
- "channel": ch.id,
- "context": context,
- "msg_id": c.pingReqID,
- "msg_size": len(pingData),
- "seq_num": req.seqNum,
- }).Debug(" -> sending control ping")
+ if log.Level >= logger.DebugLevel {
+ log.WithFields(logger.Fields{
+ "channel": ch.id,
+ "msg_id": c.pingReqID,
+ "msg_name": c.msgControlPing.GetMessageName(),
+ "msg_crc": c.msgControlPing.GetCrcString(),
+ "seq_num": req.seqNum,
+ "context": context,
+ "data_len": len(pingData),
+ }).Debugf(" -> SEND MSG: %T", c.msgControlPing)
+ }
if err := c.vppClient.SendMsg(context, pingData); err != nil {
log.WithFields(logger.Fields{
"context": context,
- "msg_id": msgID,
"seq_num": req.seqNum,
- }).Warnf("unable to send control ping: %v", err)
+ "error": err,
+ }).Warnf("unable to send control ping")
}
}
@@ -138,7 +204,9 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
// msgCallback is called whenever any binary API message comes from VPP.
func (c *Connection) msgCallback(msgID uint16, data []byte) {
if c == nil {
- log.Warn("Already disconnected, ignoring the message.")
+ log.WithField(
+ "msg_id", msgID,
+ ).Warn("Connection already disconnected, ignoring the message.")
return
}
@@ -155,7 +223,8 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) {
//
context, err := c.codec.DecodeMsgContext(data, msg)
if err != nil {
- log.Errorf("decoding context failed: %v", err)
+ log.WithField("msg_id", msgID).Warnf("Unable to decode message context: %v", err)
+ return
}
chanID, isMulti, seqNum := unpackRequestContext(context)
@@ -220,23 +289,36 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) {
// sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
// it logs the error and do not send the message.
func sendReply(ch *Channel, reply *vppReply) {
+ // first try to avoid creating timer
+ select {
+ case ch.replyChan <- reply:
+ return // reply sent ok
+ default:
+ // reply channel full
+ }
+ if ch.receiveReplyTimeout == 0 {
+ log.WithFields(logger.Fields{
+ "channel": ch.id,
+ "msg_id": reply.msgID,
+ "seq_num": reply.seqNum,
+ "err": reply.err,
+ }).Warn("Reply channel full, dropping reply.")
+ return
+ }
select {
case ch.replyChan <- reply:
- // reply sent successfully
- case <-time.After(ReplyChannelTimeout):
+ return // reply sent ok
+ case <-time.After(ch.receiveReplyTimeout):
// receiver still not ready
log.WithFields(logger.Fields{
- "channel": ch,
+ "channel": ch.id,
"msg_id": reply.msgID,
"seq_num": reply.seqNum,
- }).Warn("Unable to send the reply, reciever end not ready.")
+ "err": reply.err,
+ }).Warnf("Unable to send reply (reciever end not ready in %v).", ch.receiveReplyTimeout)
}
}
-func sendReplyError(ch *Channel, req *vppRequest, err error) {
- sendReply(ch, &vppReply{seqNum: req.seqNum, err: err})
-}
-
// isNotificationMessage returns true if someone has subscribed to provided message ID.
func (c *Connection) isNotificationMessage(msgID uint16) bool {
c.subscriptionsLock.RLock()
@@ -267,7 +349,8 @@ func (c *Connection) sendNotifications(msgID uint16, data []byte) {
"msg_name": sub.event.GetMessageName(),
"msg_id": msgID,
"msg_size": len(data),
- }).Errorf("Unable to decode the notification message: %v", err)
+ "error": err,
+ }).Warnf("Unable to decode the notification message")
continue
}
diff --git a/core/stream.go b/core/stream.go
new file mode 100644
index 0000000..edc3f2b
--- /dev/null
+++ b/core/stream.go
@@ -0,0 +1,124 @@
+// Copyright (c) 2020 Cisco and/or its affiliates.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package core
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "reflect"
+ "sync/atomic"
+
+ "git.fd.io/govpp.git/api"
+)
+
+type Stream struct {
+ id uint32
+ conn *Connection
+ ctx context.Context
+ channel *Channel
+}
+
+func (c *Connection) NewStream(ctx context.Context) (api.Stream, error) {
+ if c == nil {
+ return nil, errors.New("nil connection passed in")
+ }
+ // TODO: add stream options as variadic parameters for customizing:
+ // - request/reply channel size
+ // - reply timeout
+ // - retries
+ // - ???
+
+ // create new channel
+ chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
+ channel := newChannel(chID, c, c.codec, c, 10, 10)
+
+ // store API channel within the client
+ c.channelsLock.Lock()
+ c.channels[chID] = channel
+ c.channelsLock.Unlock()
+
+ // Channel.watchRequests are not started here intentionally, because
+ // requests are sent directly by SendMsg.
+
+ return &Stream{
+ id: uint32(chID),
+ conn: c,
+ ctx: ctx,
+ channel: channel,
+ }, nil
+}
+
+func (c *Connection) Invoke(ctx context.Context, req api.Message, reply api.Message) error {
+ // TODO: implement invoke
+ panic("not implemented")
+}
+
+func (s *Stream) Context() context.Context {
+ return s.ctx
+}
+
+func (s *Stream) Close() error {
+ if s.conn == nil {
+ return errors.New("stream closed")
+ }
+ s.conn.releaseAPIChannel(s.channel)
+ s.conn = nil
+ return nil
+}
+
+func (s *Stream) SendMsg(msg api.Message) error {
+ if s.conn == nil {
+ return errors.New("stream closed")
+ }
+ req := s.channel.newRequest(msg, false)
+ if err := s.conn.processRequest(s.channel, req); err != nil {
+ return err
+ }
+ return nil
+}
+
+func (s *Stream) RecvMsg() (api.Message, error) {
+ if s.conn == nil {
+ return nil, errors.New("stream closed")
+ }
+ select {
+ case reply, ok := <-s.channel.replyChan:
+ if !ok {
+ return nil, fmt.Errorf("reply channel closed")
+ }
+ if reply.err != nil {
+ // this case should actually never happen for stream
+ // since reply.err is only filled in watchRequests
+ // and stream does not use it
+ return nil, reply.err
+ }
+ // resolve message type
+ msg, err := s.channel.msgIdentifier.LookupByID(reply.msgID)
+ if err != nil {
+ return nil, err
+ }
+ // allocate message instance
+ msg = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
+ // decode message data
+ if err := s.channel.msgCodec.DecodeMsg(reply.data, msg); err != nil {
+ return nil, err
+ }
+ return msg, nil
+
+ case <-s.ctx.Done():
+ return nil, s.ctx.Err()
+ }
+}