diff options
Diffstat (limited to 'core/request_handler.go')
-rw-r--r-- | core/request_handler.go | 87 |
1 files changed, 56 insertions, 31 deletions
diff --git a/core/request_handler.go b/core/request_handler.go index 8f793f5..3bec38d 100644 --- a/core/request_handler.go +++ b/core/request_handler.go @@ -31,17 +31,17 @@ var ( ) // watchRequests watches for requests on the request API channel and forwards them as messages to VPP. -func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) { +func (c *Connection) watchRequests(ch *api.Channel) { for { select { case req, ok := <-ch.ReqChan: // new request on the request channel if !ok { // after closing the request channel, release API channel and return - c.releaseAPIChannel(ch, chMeta) + c.releaseAPIChannel(ch) return } - c.processRequest(ch, chMeta, req) + c.processRequest(ch, req) case req := <-ch.NotifSubsChan: // new request on the notification subscribe channel @@ -51,12 +51,12 @@ func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) { } // processRequest processes a single request received on the request channel. -func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error { +func (c *Connection) processRequest(ch *api.Channel, req *api.VppRequest) error { // check whether we are connected to VPP if atomic.LoadUint32(&c.connected) == 0 { err := ErrNotConnected log.Error(err) - sendReply(ch, &api.VppReply{Error: err}) + sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err}) return err } @@ -67,8 +67,9 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re log.WithFields(logger.Fields{ "msg_name": req.Message.GetMessageName(), "msg_crc": req.Message.GetCrcString(), + "seq_num": req.SeqNum, }).Error(err) - sendReply(ch, &api.VppReply{Error: err}) + sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err}) return err } @@ -77,37 +78,35 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re if err != nil { err = fmt.Errorf("unable to encode the messge: %v", err) log.WithFields(logger.Fields{ - "context": chMeta.id, + "channel": ch.ID, "msg_id": msgID, + "seq_num": req.SeqNum, }).Error(err) - sendReply(ch, &api.VppReply{Error: err}) + sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err}) return err } if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled log.WithFields(logger.Fields{ - "context": chMeta.id, + "channel": ch.ID, "msg_id": msgID, "msg_size": len(data), "msg_name": req.Message.GetMessageName(), + "seq_num": req.SeqNum, }).Debug("Sending a message to VPP.") } - // send the message - if req.Multipart { - // expect multipart response - atomic.StoreUint32(&chMeta.multipart, 1) - } - // send the request to VPP - err = c.vpp.SendMsg(chMeta.id, data) + context := packRequestContext(ch.ID, req.Multipart, req.SeqNum) + err = c.vpp.SendMsg(context, data) if err != nil { - err = fmt.Errorf("unable to send the messge: %v", err) + err = fmt.Errorf("unable to send the message: %v", err) log.WithFields(logger.Fields{ - "context": chMeta.id, + "context": context, "msg_id": msgID, + "seq_num": req.SeqNum, }).Error(err) - sendReply(ch, &api.VppReply{Error: err}) + sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err}) return err } @@ -116,12 +115,13 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re pingData, _ := c.codec.EncodeMsg(msgControlPing, c.pingReqID) log.WithFields(logger.Fields{ - "context": chMeta.id, + "context": context, "msg_id": c.pingReqID, "msg_size": len(pingData), + "seq_num": req.SeqNum, }).Debug("Sending a control ping to VPP.") - c.vpp.SendMsg(chMeta.id, pingData) + c.vpp.SendMsg(context, pingData) } return nil @@ -137,11 +137,14 @@ func msgCallback(context uint32, msgID uint16, data []byte) { return } + chanID, isMultipart, seqNum := unpackRequestContext(context) if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled log.WithFields(logger.Fields{ - "context": context, - "msg_id": msgID, - "msg_size": len(data), + "msg_id": msgID, + "msg_size": len(data), + "channel_id": chanID, + "is_multipart": isMultipart, + "seq_num": seqNum, }).Debug("Received a message from VPP.") } @@ -153,27 +156,27 @@ func msgCallback(context uint32, msgID uint16, data []byte) { // match ch according to the context conn.channelsLock.RLock() - ch, ok := conn.channels[context] + ch, ok := conn.channels[chanID] conn.channelsLock.RUnlock() if !ok { log.WithFields(logger.Fields{ - "context": context, - "msg_id": msgID, - }).Error("Context ID not known, ignoring the message.") + "channel_id": chanID, + "msg_id": msgID, + }).Error("Channel ID not known, ignoring the message.") return } - chMeta := ch.Metadata().(*channelMetadata) lastReplyReceived := false - // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply - if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) { + // if this is a control ping reply to a multipart request, treat this as a last part of the reply + if msgID == conn.pingReplyID && isMultipart { lastReplyReceived = true } // send the data to the channel sendReply(ch, &api.VppReply{ MessageID: msgID, + SeqNum: seqNum, Data: data, LastReplyReceived: lastReplyReceived, }) @@ -195,6 +198,7 @@ func sendReply(ch *api.Channel, reply *api.VppReply) { log.WithFields(logger.Fields{ "channel": ch, "msg_id": reply.MessageID, + "seq_num": reply.SeqNum, }).Warn("Unable to send the reply, reciever end not ready.") } } @@ -254,3 +258,24 @@ func (c *Connection) LookupByID(ID uint16) (string, error) { return "", fmt.Errorf("unknown message ID: %d", ID) } + +// +------------------+-------------------+-----------------------+ +// | 15b = channel ID | 1b = is multipart | 16b = sequence number | +// +------------------+-------------------+-----------------------+ +func packRequestContext(chanID uint16, isMultipart bool, seqNum uint16) uint32 { + context := uint32(chanID) << 17 + if isMultipart { + context |= 1 << 16 + } + context |= uint32(seqNum) + return context +} + +func unpackRequestContext(context uint32) (chanID uint16, isMulipart bool, seqNum uint16) { + chanID = uint16(context >> 17) + if ((context >> 16) & 0x1) != 0 { + isMulipart = true + } + seqNum = uint16(context & 0xffff) + return +} |