aboutsummaryrefslogtreecommitdiffstats
path: root/core/request_handler.go
diff options
context:
space:
mode:
authorMilan Lenco <milan.lenco@pantheon.tech>2018-06-25 20:31:11 +0200
committerMilan Lenco <milan.lenco@pantheon.tech>2018-06-26 15:07:55 +0200
commit8adb6cdcb496f05169263d32a857791faf8baee1 (patch)
tree12bb4fbce0af84326c1a6d80b76a71ad097fdf7d /core/request_handler.go
parente44d8c3905e22f940a100e6331a45412cba9d47e (diff)
Pair requests with replies using sequence numbers
Requests are given sequence numbers (cycling over a finite set of 2^16 integers) that are stored into the lower 16bits of the context. 1bit is also allocated for isMultipart boolean flag and the remaining 15bits are used to store the channel ID. The sequence numbers allow to reliably pair replies with requests, even in scenarious with timeouted requests or ignored (unread) replies. Sequencing is not used with asynchronous messaging as it is implemented by methods of the Channel structure, i.e. above ReqChan and ReplyChan channels. Change-Id: I7ca0e8489c7ffcc388c3cfef6d05c02f9500931c Signed-off-by: Milan Lenco <milan.lenco@pantheon.tech>
Diffstat (limited to 'core/request_handler.go')
-rw-r--r--core/request_handler.go87
1 files changed, 56 insertions, 31 deletions
diff --git a/core/request_handler.go b/core/request_handler.go
index 8f793f5..3bec38d 100644
--- a/core/request_handler.go
+++ b/core/request_handler.go
@@ -31,17 +31,17 @@ var (
)
// watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
-func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
+func (c *Connection) watchRequests(ch *api.Channel) {
for {
select {
case req, ok := <-ch.ReqChan:
// new request on the request channel
if !ok {
// after closing the request channel, release API channel and return
- c.releaseAPIChannel(ch, chMeta)
+ c.releaseAPIChannel(ch)
return
}
- c.processRequest(ch, chMeta, req)
+ c.processRequest(ch, req)
case req := <-ch.NotifSubsChan:
// new request on the notification subscribe channel
@@ -51,12 +51,12 @@ func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
}
// processRequest processes a single request received on the request channel.
-func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error {
+func (c *Connection) processRequest(ch *api.Channel, req *api.VppRequest) error {
// check whether we are connected to VPP
if atomic.LoadUint32(&c.connected) == 0 {
err := ErrNotConnected
log.Error(err)
- sendReply(ch, &api.VppReply{Error: err})
+ sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
return err
}
@@ -67,8 +67,9 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re
log.WithFields(logger.Fields{
"msg_name": req.Message.GetMessageName(),
"msg_crc": req.Message.GetCrcString(),
+ "seq_num": req.SeqNum,
}).Error(err)
- sendReply(ch, &api.VppReply{Error: err})
+ sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
return err
}
@@ -77,37 +78,35 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re
if err != nil {
err = fmt.Errorf("unable to encode the messge: %v", err)
log.WithFields(logger.Fields{
- "context": chMeta.id,
+ "channel": ch.ID,
"msg_id": msgID,
+ "seq_num": req.SeqNum,
}).Error(err)
- sendReply(ch, &api.VppReply{Error: err})
+ sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
return err
}
if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
log.WithFields(logger.Fields{
- "context": chMeta.id,
+ "channel": ch.ID,
"msg_id": msgID,
"msg_size": len(data),
"msg_name": req.Message.GetMessageName(),
+ "seq_num": req.SeqNum,
}).Debug("Sending a message to VPP.")
}
- // send the message
- if req.Multipart {
- // expect multipart response
- atomic.StoreUint32(&chMeta.multipart, 1)
- }
-
// send the request to VPP
- err = c.vpp.SendMsg(chMeta.id, data)
+ context := packRequestContext(ch.ID, req.Multipart, req.SeqNum)
+ err = c.vpp.SendMsg(context, data)
if err != nil {
- err = fmt.Errorf("unable to send the messge: %v", err)
+ err = fmt.Errorf("unable to send the message: %v", err)
log.WithFields(logger.Fields{
- "context": chMeta.id,
+ "context": context,
"msg_id": msgID,
+ "seq_num": req.SeqNum,
}).Error(err)
- sendReply(ch, &api.VppReply{Error: err})
+ sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
return err
}
@@ -116,12 +115,13 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re
pingData, _ := c.codec.EncodeMsg(msgControlPing, c.pingReqID)
log.WithFields(logger.Fields{
- "context": chMeta.id,
+ "context": context,
"msg_id": c.pingReqID,
"msg_size": len(pingData),
+ "seq_num": req.SeqNum,
}).Debug("Sending a control ping to VPP.")
- c.vpp.SendMsg(chMeta.id, pingData)
+ c.vpp.SendMsg(context, pingData)
}
return nil
@@ -137,11 +137,14 @@ func msgCallback(context uint32, msgID uint16, data []byte) {
return
}
+ chanID, isMultipart, seqNum := unpackRequestContext(context)
if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
log.WithFields(logger.Fields{
- "context": context,
- "msg_id": msgID,
- "msg_size": len(data),
+ "msg_id": msgID,
+ "msg_size": len(data),
+ "channel_id": chanID,
+ "is_multipart": isMultipart,
+ "seq_num": seqNum,
}).Debug("Received a message from VPP.")
}
@@ -153,27 +156,27 @@ func msgCallback(context uint32, msgID uint16, data []byte) {
// match ch according to the context
conn.channelsLock.RLock()
- ch, ok := conn.channels[context]
+ ch, ok := conn.channels[chanID]
conn.channelsLock.RUnlock()
if !ok {
log.WithFields(logger.Fields{
- "context": context,
- "msg_id": msgID,
- }).Error("Context ID not known, ignoring the message.")
+ "channel_id": chanID,
+ "msg_id": msgID,
+ }).Error("Channel ID not known, ignoring the message.")
return
}
- chMeta := ch.Metadata().(*channelMetadata)
lastReplyReceived := false
- // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply
- if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) {
+ // if this is a control ping reply to a multipart request, treat this as a last part of the reply
+ if msgID == conn.pingReplyID && isMultipart {
lastReplyReceived = true
}
// send the data to the channel
sendReply(ch, &api.VppReply{
MessageID: msgID,
+ SeqNum: seqNum,
Data: data,
LastReplyReceived: lastReplyReceived,
})
@@ -195,6 +198,7 @@ func sendReply(ch *api.Channel, reply *api.VppReply) {
log.WithFields(logger.Fields{
"channel": ch,
"msg_id": reply.MessageID,
+ "seq_num": reply.SeqNum,
}).Warn("Unable to send the reply, reciever end not ready.")
}
}
@@ -254,3 +258,24 @@ func (c *Connection) LookupByID(ID uint16) (string, error) {
return "", fmt.Errorf("unknown message ID: %d", ID)
}
+
+// +------------------+-------------------+-----------------------+
+// | 15b = channel ID | 1b = is multipart | 16b = sequence number |
+// +------------------+-------------------+-----------------------+
+func packRequestContext(chanID uint16, isMultipart bool, seqNum uint16) uint32 {
+ context := uint32(chanID) << 17
+ if isMultipart {
+ context |= 1 << 16
+ }
+ context |= uint32(seqNum)
+ return context
+}
+
+func unpackRequestContext(context uint32) (chanID uint16, isMulipart bool, seqNum uint16) {
+ chanID = uint16(context >> 17)
+ if ((context >> 16) & 0x1) != 0 {
+ isMulipart = true
+ }
+ seqNum = uint16(context & 0xffff)
+ return
+}