aboutsummaryrefslogtreecommitdiffstats
path: root/core/request_handler.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/request_handler.go')
-rw-r--r--core/request_handler.go87
1 files changed, 56 insertions, 31 deletions
diff --git a/core/request_handler.go b/core/request_handler.go
index 8f793f5..3bec38d 100644
--- a/core/request_handler.go
+++ b/core/request_handler.go
@@ -31,17 +31,17 @@ var (
)
// watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
-func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
+func (c *Connection) watchRequests(ch *api.Channel) {
for {
select {
case req, ok := <-ch.ReqChan:
// new request on the request channel
if !ok {
// after closing the request channel, release API channel and return
- c.releaseAPIChannel(ch, chMeta)
+ c.releaseAPIChannel(ch)
return
}
- c.processRequest(ch, chMeta, req)
+ c.processRequest(ch, req)
case req := <-ch.NotifSubsChan:
// new request on the notification subscribe channel
@@ -51,12 +51,12 @@ func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
}
// processRequest processes a single request received on the request channel.
-func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error {
+func (c *Connection) processRequest(ch *api.Channel, req *api.VppRequest) error {
// check whether we are connected to VPP
if atomic.LoadUint32(&c.connected) == 0 {
err := ErrNotConnected
log.Error(err)
- sendReply(ch, &api.VppReply{Error: err})
+ sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
return err
}
@@ -67,8 +67,9 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re
log.WithFields(logger.Fields{
"msg_name": req.Message.GetMessageName(),
"msg_crc": req.Message.GetCrcString(),
+ "seq_num": req.SeqNum,
}).Error(err)
- sendReply(ch, &api.VppReply{Error: err})
+ sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
return err
}
@@ -77,37 +78,35 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re
if err != nil {
err = fmt.Errorf("unable to encode the messge: %v", err)
log.WithFields(logger.Fields{
- "context": chMeta.id,
+ "channel": ch.ID,
"msg_id": msgID,
+ "seq_num": req.SeqNum,
}).Error(err)
- sendReply(ch, &api.VppReply{Error: err})
+ sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
return err
}
if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
log.WithFields(logger.Fields{
- "context": chMeta.id,
+ "channel": ch.ID,
"msg_id": msgID,
"msg_size": len(data),
"msg_name": req.Message.GetMessageName(),
+ "seq_num": req.SeqNum,
}).Debug("Sending a message to VPP.")
}
- // send the message
- if req.Multipart {
- // expect multipart response
- atomic.StoreUint32(&chMeta.multipart, 1)
- }
-
// send the request to VPP
- err = c.vpp.SendMsg(chMeta.id, data)
+ context := packRequestContext(ch.ID, req.Multipart, req.SeqNum)
+ err = c.vpp.SendMsg(context, data)
if err != nil {
- err = fmt.Errorf("unable to send the messge: %v", err)
+ err = fmt.Errorf("unable to send the message: %v", err)
log.WithFields(logger.Fields{
- "context": chMeta.id,
+ "context": context,
"msg_id": msgID,
+ "seq_num": req.SeqNum,
}).Error(err)
- sendReply(ch, &api.VppReply{Error: err})
+ sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
return err
}
@@ -116,12 +115,13 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re
pingData, _ := c.codec.EncodeMsg(msgControlPing, c.pingReqID)
log.WithFields(logger.Fields{
- "context": chMeta.id,
+ "context": context,
"msg_id": c.pingReqID,
"msg_size": len(pingData),
+ "seq_num": req.SeqNum,
}).Debug("Sending a control ping to VPP.")
- c.vpp.SendMsg(chMeta.id, pingData)
+ c.vpp.SendMsg(context, pingData)
}
return nil
@@ -137,11 +137,14 @@ func msgCallback(context uint32, msgID uint16, data []byte) {
return
}
+ chanID, isMultipart, seqNum := unpackRequestContext(context)
if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
log.WithFields(logger.Fields{
- "context": context,
- "msg_id": msgID,
- "msg_size": len(data),
+ "msg_id": msgID,
+ "msg_size": len(data),
+ "channel_id": chanID,
+ "is_multipart": isMultipart,
+ "seq_num": seqNum,
}).Debug("Received a message from VPP.")
}
@@ -153,27 +156,27 @@ func msgCallback(context uint32, msgID uint16, data []byte) {
// match ch according to the context
conn.channelsLock.RLock()
- ch, ok := conn.channels[context]
+ ch, ok := conn.channels[chanID]
conn.channelsLock.RUnlock()
if !ok {
log.WithFields(logger.Fields{
- "context": context,
- "msg_id": msgID,
- }).Error("Context ID not known, ignoring the message.")
+ "channel_id": chanID,
+ "msg_id": msgID,
+ }).Error("Channel ID not known, ignoring the message.")
return
}
- chMeta := ch.Metadata().(*channelMetadata)
lastReplyReceived := false
- // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply
- if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) {
+ // if this is a control ping reply to a multipart request, treat this as a last part of the reply
+ if msgID == conn.pingReplyID && isMultipart {
lastReplyReceived = true
}
// send the data to the channel
sendReply(ch, &api.VppReply{
MessageID: msgID,
+ SeqNum: seqNum,
Data: data,
LastReplyReceived: lastReplyReceived,
})
@@ -195,6 +198,7 @@ func sendReply(ch *api.Channel, reply *api.VppReply) {
log.WithFields(logger.Fields{
"channel": ch,
"msg_id": reply.MessageID,
+ "seq_num": reply.SeqNum,
}).Warn("Unable to send the reply, reciever end not ready.")
}
}
@@ -254,3 +258,24 @@ func (c *Connection) LookupByID(ID uint16) (string, error) {
return "", fmt.Errorf("unknown message ID: %d", ID)
}
+
+// +------------------+-------------------+-----------------------+
+// | 15b = channel ID | 1b = is multipart | 16b = sequence number |
+// +------------------+-------------------+-----------------------+
+func packRequestContext(chanID uint16, isMultipart bool, seqNum uint16) uint32 {
+ context := uint32(chanID) << 17
+ if isMultipart {
+ context |= 1 << 16
+ }
+ context |= uint32(seqNum)
+ return context
+}
+
+func unpackRequestContext(context uint32) (chanID uint16, isMulipart bool, seqNum uint16) {
+ chanID = uint16(context >> 17)
+ if ((context >> 16) & 0x1) != 0 {
+ isMulipart = true
+ }
+ seqNum = uint16(context & 0xffff)
+ return
+}