From df67791c6ffc96331f75aec7d3addfe2efca7739 Mon Sep 17 00:00:00 2001 From: Ondrej Fabry Date: Thu, 25 Jun 2020 11:55:58 +0200 Subject: Introduce Stream - experimental API for low-level access to VPP API Change-Id: I2698e11b76ff55d9730b47d4fee990be93349516 Signed-off-by: Ondrej Fabry --- core/channel.go | 51 ++++++++-------- core/control_ping.go | 24 ++------ core/request_handler.go | 157 ++++++++++++++++++++++++++++++++++++------------ core/stream.go | 124 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 277 insertions(+), 79 deletions(-) create mode 100644 core/stream.go (limited to 'core') diff --git a/core/channel.go b/core/channel.go index 8479d6a..1b5e77e 100644 --- a/core/channel.go +++ b/core/channel.go @@ -102,19 +102,21 @@ type Channel struct { lastSeqNum uint16 // sequence number of the last sent request - delayedReply *vppReply // reply already taken from ReplyChan, buffered for later delivery - replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout + delayedReply *vppReply // reply already taken from ReplyChan, buffered for later delivery + replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout + receiveReplyTimeout time.Duration // maximum time that we wait for receiver to consume reply } func newChannel(id uint16, conn *Connection, codec MessageCodec, identifier MessageIdentifier, reqSize, replySize int) *Channel { return &Channel{ - id: id, - conn: conn, - msgCodec: codec, - msgIdentifier: identifier, - reqChan: make(chan *vppRequest, reqSize), - replyChan: make(chan *vppReply, replySize), - replyTimeout: DefaultReplyTimeout, + id: id, + conn: conn, + msgCodec: codec, + msgIdentifier: identifier, + reqChan: make(chan *vppRequest, reqSize), + replyChan: make(chan *vppReply, replySize), + replyTimeout: DefaultReplyTimeout, + receiveReplyTimeout: ReplyChannelTimeout, } } @@ -122,28 +124,29 @@ func (ch *Channel) GetID() uint16 { return ch.id } +func (ch *Channel) SendRequest(msg api.Message) api.RequestCtx { + req := ch.newRequest(msg, false) + ch.reqChan <- req + return &requestCtx{ch: ch, seqNum: req.seqNum} +} + +func (ch *Channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx { + req := ch.newRequest(msg, true) + ch.reqChan <- req + return &multiRequestCtx{ch: ch, seqNum: req.seqNum} +} + func (ch *Channel) nextSeqNum() uint16 { ch.lastSeqNum++ return ch.lastSeqNum } -func (ch *Channel) SendRequest(msg api.Message) api.RequestCtx { - seqNum := ch.nextSeqNum() - ch.reqChan <- &vppRequest{ - msg: msg, - seqNum: seqNum, - } - return &requestCtx{ch: ch, seqNum: seqNum} -} - -func (ch *Channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx { - seqNum := ch.nextSeqNum() - ch.reqChan <- &vppRequest{ +func (ch *Channel) newRequest(msg api.Message, multi bool) *vppRequest { + return &vppRequest{ msg: msg, - seqNum: seqNum, - multi: true, + seqNum: ch.nextSeqNum(), + multi: multi, } - return &multiRequestCtx{ch: ch, seqNum: seqNum} } func (ch *Channel) CheckCompatiblity(msgs ...api.Message) error { diff --git a/core/control_ping.go b/core/control_ping.go index b39fd3f..ed8d274 100644 --- a/core/control_ping.go +++ b/core/control_ping.go @@ -21,15 +21,9 @@ func SetControlPingReply(m api.Message) { type ControlPing struct{} -func (*ControlPing) GetMessageName() string { - return "control_ping" -} -func (*ControlPing) GetCrcString() string { - return "51077d14" -} -func (*ControlPing) GetMessageType() api.MessageType { - return api.RequestMessage -} +func (*ControlPing) GetMessageName() string { return "control_ping" } +func (*ControlPing) GetCrcString() string { return "51077d14" } +func (*ControlPing) GetMessageType() api.MessageType { return api.RequestMessage } type ControlPingReply struct { Retval int32 @@ -37,15 +31,9 @@ type ControlPingReply struct { VpePID uint32 } -func (*ControlPingReply) GetMessageName() string { - return "control_ping_reply" -} -func (*ControlPingReply) GetCrcString() string { - return "f6b0b8ca" -} -func (*ControlPingReply) GetMessageType() api.MessageType { - return api.ReplyMessage -} +func (*ControlPingReply) GetMessageName() string { return "control_ping_reply" } +func (*ControlPingReply) GetCrcString() string { return "f6b0b8ca" } +func (*ControlPingReply) GetMessageType() api.MessageType { return api.ReplyMessage } func init() { api.RegisterMessage((*ControlPing)(nil), "ControlPing") diff --git a/core/request_handler.go b/core/request_handler.go index e272c6f..fc704cb 100644 --- a/core/request_handler.go +++ b/core/request_handler.go @@ -45,18 +45,72 @@ func (c *Connection) watchRequests(ch *Channel) { return } if err := c.processRequest(ch, req); err != nil { - sendReplyError(ch, req, err) + sendReply(ch, &vppReply{ + seqNum: req.seqNum, + err: fmt.Errorf("unable to process request: %w", err), + }) } } } } +// processRequest processes a single request received on the request channel. +func (c *Connection) sendMessage(context uint32, msg api.Message) error { + // check whether we are connected to VPP + if atomic.LoadUint32(&c.vppConnected) == 0 { + return ErrNotConnected + } + + /*log := log.WithFields(logger.Fields{ + "context": context, + "msg_name": msg.GetMessageName(), + "msg_crc": msg.GetCrcString(), + })*/ + + // retrieve message ID + msgID, err := c.GetMessageID(msg) + if err != nil { + //log.WithError(err).Debugf("unable to retrieve message ID: %#v", msg) + return err + } + + //log = log.WithField("msg_id", msgID) + + // encode the message + data, err := c.codec.EncodeMsg(msg, msgID) + if err != nil { + log.WithError(err).Debugf("unable to encode message: %#v", msg) + return err + } + + //log = log.WithField("msg_length", len(data)) + + if log.Level >= logger.DebugLevel { + log.Debugf("--> SEND: MSG %T %+v", msg, msg) + } + + // send message to VPP + err = c.vppClient.SendMsg(context, data) + if err != nil { + log.WithError(err).Debugf("unable to send message: %#v", msg) + return err + } + + return nil +} + // processRequest processes a single request received on the request channel. func (c *Connection) processRequest(ch *Channel, req *vppRequest) error { // check whether we are connected to VPP if atomic.LoadUint32(&c.vppConnected) == 0 { err := ErrNotConnected - log.Errorf("processing request failed: %v", err) + log.WithFields(logger.Fields{ + "channel": ch.id, + "seq_num": req.seqNum, + "msg_name": req.msg.GetMessageName(), + "msg_crc": req.msg.GetCrcString(), + "error": err, + }).Warnf("Unable to process request") return err } @@ -64,12 +118,13 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error { msgID, err := c.GetMessageID(req.msg) if err != nil { log.WithFields(logger.Fields{ + "channel": ch.id, "msg_name": req.msg.GetMessageName(), "msg_crc": req.msg.GetCrcString(), "seq_num": req.seqNum, "error": err, - }).Errorf("failed to retrieve message ID") - return fmt.Errorf("unable to retrieve message ID: %v", err) + }).Warnf("Unable to retrieve message ID") + return err } // encode the message into binary @@ -79,35 +134,42 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error { "channel": ch.id, "msg_id": msgID, "msg_name": req.msg.GetMessageName(), + "msg_crc": req.msg.GetCrcString(), "seq_num": req.seqNum, "error": err, - }).Errorf("failed to encode message: %#v", req.msg) - return fmt.Errorf("unable to encode the message: %v", err) + }).Warnf("Unable to encode message: %T %+v", req.msg, req.msg) + return err } context := packRequestContext(ch.id, req.multi, req.seqNum) - if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled + if log.Level >= logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled log.WithFields(logger.Fields{ "channel": ch.id, - "context": context, - "is_multi": req.multi, "msg_id": msgID, - "msg_size": len(data), - "seq_num": req.seqNum, + "msg_name": req.msg.GetMessageName(), "msg_crc": req.msg.GetCrcString(), - }).Debugf("--> govpp SEND: %s %+v", req.msg.GetMessageName(), req.msg) + "seq_num": req.seqNum, + "is_multi": req.multi, + "context": context, + "data_len": len(data), + }).Debugf("--> SEND MSG: %T %+v", req.msg, req.msg) } // send the request to VPP err = c.vppClient.SendMsg(context, data) if err != nil { - err = fmt.Errorf("unable to send the message: %v", err) log.WithFields(logger.Fields{ - "context": context, - "msg_id": msgID, - "seq_num": req.seqNum, - }).Error(err) + "channel": ch.id, + "msg_id": msgID, + "msg_name": req.msg.GetMessageName(), + "msg_crc": req.msg.GetCrcString(), + "seq_num": req.seqNum, + "is_multi": req.multi, + "context": context, + "data_len": len(data), + "error": err, + }).Warnf("Unable to send message") return err } @@ -115,20 +177,24 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error { // send a control ping to determine end of the multipart response pingData, _ := c.codec.EncodeMsg(c.msgControlPing, c.pingReqID) - log.WithFields(logger.Fields{ - "channel": ch.id, - "context": context, - "msg_id": c.pingReqID, - "msg_size": len(pingData), - "seq_num": req.seqNum, - }).Debug(" -> sending control ping") + if log.Level >= logger.DebugLevel { + log.WithFields(logger.Fields{ + "channel": ch.id, + "msg_id": c.pingReqID, + "msg_name": c.msgControlPing.GetMessageName(), + "msg_crc": c.msgControlPing.GetCrcString(), + "seq_num": req.seqNum, + "context": context, + "data_len": len(pingData), + }).Debugf(" -> SEND MSG: %T", c.msgControlPing) + } if err := c.vppClient.SendMsg(context, pingData); err != nil { log.WithFields(logger.Fields{ "context": context, - "msg_id": msgID, "seq_num": req.seqNum, - }).Warnf("unable to send control ping: %v", err) + "error": err, + }).Warnf("unable to send control ping") } } @@ -138,7 +204,9 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error { // msgCallback is called whenever any binary API message comes from VPP. func (c *Connection) msgCallback(msgID uint16, data []byte) { if c == nil { - log.Warn("Already disconnected, ignoring the message.") + log.WithField( + "msg_id", msgID, + ).Warn("Connection already disconnected, ignoring the message.") return } @@ -155,7 +223,8 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) { // context, err := c.codec.DecodeMsgContext(data, msg) if err != nil { - log.Errorf("decoding context failed: %v", err) + log.WithField("msg_id", msgID).Warnf("Unable to decode message context: %v", err) + return } chanID, isMulti, seqNum := unpackRequestContext(context) @@ -220,23 +289,36 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) { // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise // it logs the error and do not send the message. func sendReply(ch *Channel, reply *vppReply) { + // first try to avoid creating timer + select { + case ch.replyChan <- reply: + return // reply sent ok + default: + // reply channel full + } + if ch.receiveReplyTimeout == 0 { + log.WithFields(logger.Fields{ + "channel": ch.id, + "msg_id": reply.msgID, + "seq_num": reply.seqNum, + "err": reply.err, + }).Warn("Reply channel full, dropping reply.") + return + } select { case ch.replyChan <- reply: - // reply sent successfully - case <-time.After(ReplyChannelTimeout): + return // reply sent ok + case <-time.After(ch.receiveReplyTimeout): // receiver still not ready log.WithFields(logger.Fields{ - "channel": ch, + "channel": ch.id, "msg_id": reply.msgID, "seq_num": reply.seqNum, - }).Warn("Unable to send the reply, reciever end not ready.") + "err": reply.err, + }).Warnf("Unable to send reply (reciever end not ready in %v).", ch.receiveReplyTimeout) } } -func sendReplyError(ch *Channel, req *vppRequest, err error) { - sendReply(ch, &vppReply{seqNum: req.seqNum, err: err}) -} - // isNotificationMessage returns true if someone has subscribed to provided message ID. func (c *Connection) isNotificationMessage(msgID uint16) bool { c.subscriptionsLock.RLock() @@ -267,7 +349,8 @@ func (c *Connection) sendNotifications(msgID uint16, data []byte) { "msg_name": sub.event.GetMessageName(), "msg_id": msgID, "msg_size": len(data), - }).Errorf("Unable to decode the notification message: %v", err) + "error": err, + }).Warnf("Unable to decode the notification message") continue } diff --git a/core/stream.go b/core/stream.go new file mode 100644 index 0000000..edc3f2b --- /dev/null +++ b/core/stream.go @@ -0,0 +1,124 @@ +// Copyright (c) 2020 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + "context" + "errors" + "fmt" + "reflect" + "sync/atomic" + + "git.fd.io/govpp.git/api" +) + +type Stream struct { + id uint32 + conn *Connection + ctx context.Context + channel *Channel +} + +func (c *Connection) NewStream(ctx context.Context) (api.Stream, error) { + if c == nil { + return nil, errors.New("nil connection passed in") + } + // TODO: add stream options as variadic parameters for customizing: + // - request/reply channel size + // - reply timeout + // - retries + // - ??? + + // create new channel + chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff) + channel := newChannel(chID, c, c.codec, c, 10, 10) + + // store API channel within the client + c.channelsLock.Lock() + c.channels[chID] = channel + c.channelsLock.Unlock() + + // Channel.watchRequests are not started here intentionally, because + // requests are sent directly by SendMsg. + + return &Stream{ + id: uint32(chID), + conn: c, + ctx: ctx, + channel: channel, + }, nil +} + +func (c *Connection) Invoke(ctx context.Context, req api.Message, reply api.Message) error { + // TODO: implement invoke + panic("not implemented") +} + +func (s *Stream) Context() context.Context { + return s.ctx +} + +func (s *Stream) Close() error { + if s.conn == nil { + return errors.New("stream closed") + } + s.conn.releaseAPIChannel(s.channel) + s.conn = nil + return nil +} + +func (s *Stream) SendMsg(msg api.Message) error { + if s.conn == nil { + return errors.New("stream closed") + } + req := s.channel.newRequest(msg, false) + if err := s.conn.processRequest(s.channel, req); err != nil { + return err + } + return nil +} + +func (s *Stream) RecvMsg() (api.Message, error) { + if s.conn == nil { + return nil, errors.New("stream closed") + } + select { + case reply, ok := <-s.channel.replyChan: + if !ok { + return nil, fmt.Errorf("reply channel closed") + } + if reply.err != nil { + // this case should actually never happen for stream + // since reply.err is only filled in watchRequests + // and stream does not use it + return nil, reply.err + } + // resolve message type + msg, err := s.channel.msgIdentifier.LookupByID(reply.msgID) + if err != nil { + return nil, err + } + // allocate message instance + msg = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) + // decode message data + if err := s.channel.msgCodec.DecodeMsg(reply.data, msg); err != nil { + return nil, err + } + return msg, nil + + case <-s.ctx.Done(): + return nil, s.ctx.Err() + } +} -- cgit 1.2.3-korg