From a3bb834db727a3ac9a1ffcfeae9265e5dead851f Mon Sep 17 00:00:00 2001 From: Ondrej Fabry Date: Wed, 15 Aug 2018 12:59:25 +0200 Subject: Refactor GoVPP Squashed commit of the following: commit 348930db31575e9f59b3834d9fec07411f011e05 Author: Ondrej Fabry Date: Wed Aug 15 11:30:13 2018 +0200 Use debug level for log about different context commit 9fc963c559cea67a41b85c6cdadc322fb3b1fc7c Author: Ondrej Fabry Date: Wed Aug 15 11:22:03 2018 +0200 Remove annoying logs and add env vars for debugging commit fdc9fd624d13feadb602e0d03d58f8a44b7a565f Author: Ondrej Fabry Date: Wed Aug 15 11:18:47 2018 +0200 Fix printing unknown VPPApiError commit 8f968be36a91de4d4a8ea17593ba314f82aa9583 Author: Ondrej Fabry Date: Tue Aug 14 17:25:10 2018 +0200 Refactor entire GoVPP - fix some cases with inconsistent VPP messages, causing messages to be incorrectly identified as event or request - simplify API, remove direct access to internal Go channels - add module name with message to registration of messages - start watching filesystem only when vpe-api file does not exist - simplify code in message codec and remove unneeded parts - retrieve IDs of all registered messages after connect to VPP - define fallback for control ping in core to avoid duplicate registration - add SetLogLevel function to set logger level more easily - remove lot of unused code commit 34dd1b7e10ef0324aa8c4e4cc42375bd6021c6cb Author: Ondrej Fabry Date: Mon Aug 13 11:29:54 2018 +0200 Rename VnetError to VPPApiError commit c6549d6f77847a1367a12ff47fb716e2955e973a Author: Ondrej Fabry Date: Mon Aug 13 10:23:43 2018 +0200 Fix examples and regenerate binapi commit 4612e36b416779771f5efab4fc654c2766d2cb1c Author: Ondrej Fabry Date: Mon Aug 13 09:51:22 2018 +0200 Add parsing and generation for services commit ac9c5280c5aa27e325f327609e2364cc66f3723f Author: Ondrej Fabry Date: Fri Aug 10 14:30:15 2018 +0200 Fix exit status on error and add continue-onerror flag commit 9b3f4ef9fc7c8c62037fa107085eae18a8725314 Author: Ondrej Fabry Date: Thu Aug 9 15:20:56 2018 +0200 Return VnetError when Retval != 0 commit 8fd21a907b5e628ec4d2026215b83d15da96c297 Author: Ondrej Fabry Date: Thu Aug 9 14:59:05 2018 +0200 Add all missing errors from api_errno.h commit 08450f288d3046ebaecf40203174ae342a07f1eb Author: Ondrej Fabry Date: Thu Aug 9 14:29:27 2018 +0200 Update README commit d8dced0728dd62243539be741868fb7d9b8de4cc Author: Ondrej Fabry Date: Thu Aug 9 13:59:59 2018 +0200 Regenerate vpe in core commit 254da7592cdbf634cf7aa46ae36ce7bb6d4ee555 Author: Ondrej Fabry Date: Thu Aug 9 13:37:00 2018 +0200 Add VnetError type for Retvals commit 4475c1087fb53ab4c788e530bc7fef7cfc89d2cd Author: Ondrej Fabry Date: Thu Aug 9 13:36:07 2018 +0200 Add registration API commit 892a3ea5a2c703e2f7c29331663f6a6fa706bff5 Author: Ondrej Fabry Date: Thu Aug 9 13:30:43 2018 +0200 Generate registration for messages and check all IDs on connect commit 389ed03b6e7082260281866c3449d72d72347c99 Author: Ondrej Fabry Date: Thu Aug 9 11:32:41 2018 +0200 Show error for empty adapter (on Darwin/Windows) commit ef1ea040d656ade64242432dc5f06810ed8dcde6 Author: Ondrej Fabry Date: Thu Aug 9 11:31:37 2018 +0200 Improve logged info commit d4adae3b14ed54c8d693060dd857fa9ba5ec8e06 Author: Ondrej Fabry Date: Thu Aug 9 11:27:48 2018 +0200 Update examples commit 63921e1346014701a22639a2611129563bb1eb78 Author: Ondrej Fabry Date: Thu Aug 9 11:02:56 2018 +0200 Generate unions and fix some issues - add comments between sections - define structs on single line if it has no fields - generate unions with setters/getters for each field - fix detection of message type commit 6ab3e3fa590b245898306a6ffaf32c7721eab60c Author: Ondrej Fabry Date: Wed Aug 8 15:37:10 2018 +0200 Refactor binapi-generator - split JSON parsing from code generation - parse and generate enums - parse unions (no generation yet) - change output file suffix to '.ba.go' - cleanup and simplify code - split code into files - add flag for debug mode Change-Id: I58f685e0d4c7a38e9a7b6ea0a1f47792d95d7399 Signed-off-by: Ondrej Fabry --- core/request_handler.go | 226 ++++++++++++++++++++++++------------------------ 1 file changed, 112 insertions(+), 114 deletions(-) (limited to 'core/request_handler.go') diff --git a/core/request_handler.go b/core/request_handler.go index 8681963..fd6d100 100644 --- a/core/request_handler.go +++ b/core/request_handler.go @@ -21,8 +21,6 @@ import ( "time" logger "github.com/sirupsen/logrus" - - "git.fd.io/govpp.git/api" ) var ( @@ -45,151 +43,182 @@ func (c *Connection) watchRequests(ch *channel) { case req := <-ch.notifSubsChan: // new request on the notification subscribe channel - c.processNotifSubscribeRequest(ch, req) + c.processSubscriptionRequest(ch, req) } } } // processRequest processes a single request received on the request channel. -func (c *Connection) processRequest(ch *channel, req *api.VppRequest) error { +func (c *Connection) processRequest(ch *channel, req *vppRequest) error { // check whether we are connected to VPP if atomic.LoadUint32(&c.connected) == 0 { err := ErrNotConnected - log.Error(err) - sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err}) + log.Errorf("processing request failed: %v", err) + sendReplyError(ch, req, err) return err } // retrieve message ID - msgID, err := c.GetMessageID(req.Message) + msgID, err := c.GetMessageID(req.msg) if err != nil { err = fmt.Errorf("unable to retrieve message ID: %v", err) log.WithFields(logger.Fields{ - "msg_name": req.Message.GetMessageName(), - "msg_crc": req.Message.GetCrcString(), - "seq_num": req.SeqNum, + "msg_name": req.msg.GetMessageName(), + "msg_crc": req.msg.GetCrcString(), + "seq_num": req.seqNum, }).Error(err) - sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err}) + sendReplyError(ch, req, err) return err } // encode the message into binary - data, err := c.codec.EncodeMsg(req.Message, msgID) + data, err := c.codec.EncodeMsg(req.msg, msgID) if err != nil { err = fmt.Errorf("unable to encode the messge: %v", err) log.WithFields(logger.Fields{ - "channel": ch.id, - "msg_id": msgID, - "seq_num": req.SeqNum, + "channel": ch.id, + "msg_id": msgID, + "msg_name": req.msg.GetMessageName(), + "seq_num": req.seqNum, }).Error(err) - sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err}) + sendReplyError(ch, req, err) return err } + // get context + context := packRequestContext(ch.id, req.multi, req.seqNum) if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled log.WithFields(logger.Fields{ "channel": ch.id, + "context": context, + "is_multi": req.multi, "msg_id": msgID, + "msg_name": req.msg.GetMessageName(), "msg_size": len(data), - "msg_name": req.Message.GetMessageName(), - "seq_num": req.SeqNum, - }).Debug("Sending a message to VPP.") + "seq_num": req.seqNum, + }).Debug(" -> Sending a message to VPP.") } // send the request to VPP - context := packRequestContext(ch.id, req.Multipart, req.SeqNum) err = c.vpp.SendMsg(context, data) if err != nil { err = fmt.Errorf("unable to send the message: %v", err) log.WithFields(logger.Fields{ "context": context, "msg_id": msgID, - "seq_num": req.SeqNum, + "seq_num": req.seqNum, }).Error(err) - sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err}) + sendReplyError(ch, req, err) return err } - if req.Multipart { + if req.multi { // send a control ping to determine end of the multipart response pingData, _ := c.codec.EncodeMsg(msgControlPing, c.pingReqID) log.WithFields(logger.Fields{ + "channel": ch.id, "context": context, "msg_id": c.pingReqID, "msg_size": len(pingData), - "seq_num": req.SeqNum, - }).Debug("Sending a control ping to VPP.") - - c.vpp.SendMsg(context, pingData) + "seq_num": req.seqNum, + }).Debug(" -> Sending a control ping to VPP.") + + if err := c.vpp.SendMsg(context, pingData); err != nil { + log.WithFields(logger.Fields{ + "context": context, + "msg_id": msgID, + "seq_num": req.seqNum, + }).Warnf("unable to send control ping: %v", err) + } } return nil } // msgCallback is called whenever any binary API message comes from VPP. -func msgCallback(context uint32, msgID uint16, data []byte) { +func (c *Connection) msgCallback(msgID uint16, context uint32, data []byte) { connLock.RLock() defer connLock.RUnlock() - if conn == nil { + if c == nil { log.Warn("Already disconnected, ignoring the message.") return } - chanID, isMultipart, seqNum := unpackRequestContext(context) + msg, ok := c.msgMap[msgID] + if !ok { + log.Warnf("Unknown message received, ID: %d", msgID) + return + } + + // decode message context to fix for special cases of messages, + // for example: + // - replies that don't have context as first field (comes as zero) + // - events that don't have context at all (comes as non zero) + // + msgContext, err := c.codec.DecodeMsgContext(data, msg) + if err == nil { + if context != msgContext { + log.Debugf("different context was decoded from message (%d -> %d)", context, msgContext) + context = msgContext + } + } else { + log.Errorf("decoding context failed: %v", err) + } + + chanID, isMulti, seqNum := unpackRequestContext(context) if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled log.WithFields(logger.Fields{ - "msg_id": msgID, - "msg_size": len(data), - "channel_id": chanID, - "is_multipart": isMultipart, - "seq_num": seqNum, - }).Debug("Received a message from VPP.") + "context": context, + "msg_id": msgID, + "msg_name": msg.GetMessageName(), + "msg_size": len(data), + "channel": chanID, + "is_multi": isMulti, + "seq_num": seqNum, + }).Debug(" <- Received a message from VPP.") } - if context == 0 || conn.isNotificationMessage(msgID) { + if context == 0 || c.isNotificationMessage(msgID) { // process the message as a notification - conn.sendNotifications(msgID, data) + c.sendNotifications(msgID, data) return } // match ch according to the context - conn.channelsLock.RLock() - ch, ok := conn.channels[chanID] - conn.channelsLock.RUnlock() - + c.channelsLock.RLock() + ch, ok := c.channels[chanID] + c.channelsLock.RUnlock() if !ok { log.WithFields(logger.Fields{ - "channel_id": chanID, - "msg_id": msgID, + "channel": chanID, + "msg_id": msgID, }).Error("Channel ID not known, ignoring the message.") return } - lastReplyReceived := false - // if this is a control ping reply to a multipart request, treat this as a last part of the reply - if msgID == conn.pingReplyID && isMultipart { - lastReplyReceived = true - } + // if this is a control ping reply to a multipart request, + // treat this as a last part of the reply + lastReplyReceived := isMulti && msgID == c.pingReplyID // send the data to the channel - sendReply(ch, &api.VppReply{ - MessageID: msgID, - SeqNum: seqNum, - Data: data, - LastReplyReceived: lastReplyReceived, + sendReply(ch, &vppReply{ + msgID: msgID, + seqNum: seqNum, + data: data, + lastReceived: lastReplyReceived, }) // store actual time of this reply - conn.lastReplyLock.Lock() - conn.lastReply = time.Now() - conn.lastReplyLock.Unlock() + c.lastReplyLock.Lock() + c.lastReply = time.Now() + c.lastReplyLock.Unlock() } // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise // it logs the error and do not send the message. -func sendReply(ch *channel, reply *api.VppReply) { +func sendReply(ch *channel, reply *vppReply) { select { case ch.replyChan <- reply: // reply sent successfully @@ -197,66 +226,14 @@ func sendReply(ch *channel, reply *api.VppReply) { // receiver still not ready log.WithFields(logger.Fields{ "channel": ch, - "msg_id": reply.MessageID, - "seq_num": reply.SeqNum, + "msg_id": reply.msgID, + "seq_num": reply.seqNum, }).Warn("Unable to send the reply, reciever end not ready.") } } -// GetMessageID returns message identifier of given API message. -func (c *Connection) GetMessageID(msg api.Message) (uint16, error) { - if c == nil { - return 0, errors.New("nil connection passed in") - } - return c.messageNameToID(msg.GetMessageName(), msg.GetCrcString()) -} - -// messageNameToID returns message ID of a message identified by its name and CRC. -func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) { - msgKey := msgName + "_" + msgCrc - - // try to get the ID from the map - c.msgIDsLock.RLock() - id, ok := c.msgIDs[msgKey] - c.msgIDsLock.RUnlock() - if ok { - return id, nil - } - - // get the ID using VPP API - id, err := c.vpp.GetMsgID(msgName, msgCrc) - if err != nil { - err = fmt.Errorf("unable to retrieve message ID: %v", err) - log.WithFields(logger.Fields{ - "msg_name": msgName, - "msg_crc": msgCrc, - }).Error(err) - return id, err - } - - c.msgIDsLock.Lock() - c.msgIDs[msgKey] = id - c.msgIDsLock.Unlock() - - return id, nil -} - -// LookupByID looks up message name and crc by ID. -func (c *Connection) LookupByID(ID uint16) (string, error) { - if c == nil { - return "", errors.New("nil connection passed in") - } - - c.msgIDsLock.Lock() - defer c.msgIDsLock.Unlock() - - for key, id := range c.msgIDs { - if id == ID { - return key, nil - } - } - - return "", fmt.Errorf("unknown message ID: %d", ID) +func sendReplyError(ch *channel, req *vppRequest, err error) { + sendReply(ch, &vppReply{seqNum: req.seqNum, err: err}) } // +------------------+-------------------+-----------------------+ @@ -279,3 +256,24 @@ func unpackRequestContext(context uint32) (chanID uint16, isMulipart bool, seqNu seqNum = uint16(context & 0xffff) return } + +// compareSeqNumbers returns -1, 0, 1 if sequence number precedes, equals to, +// or succeeds seq. number . +// Since sequence numbers cycle in the finite set of size 2^16, the function +// must assume that the distance between compared sequence numbers is less than +// (2^16)/2 to determine the order. +func compareSeqNumbers(seqNum1, seqNum2 uint16) int { + // calculate distance from seqNum1 to seqNum2 + var dist uint16 + if seqNum1 <= seqNum2 { + dist = seqNum2 - seqNum1 + } else { + dist = 0xffff - (seqNum1 - seqNum2 - 1) + } + if dist == 0 { + return 0 + } else if dist <= 0x8000 { + return -1 + } + return 1 +} -- cgit 1.2.3-korg