summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMilan Lenco <milan.lenco@pantheon.tech>2018-06-25 20:31:11 +0200
committerMilan Lenco <milan.lenco@pantheon.tech>2018-06-26 15:07:55 +0200
commit8adb6cdcb496f05169263d32a857791faf8baee1 (patch)
tree12bb4fbce0af84326c1a6d80b76a71ad097fdf7d
parente44d8c3905e22f940a100e6331a45412cba9d47e (diff)
Pair requests with replies using sequence numbers
Requests are given sequence numbers (cycling over a finite set of 2^16 integers) that are stored into the lower 16bits of the context. 1bit is also allocated for isMultipart boolean flag and the remaining 15bits are used to store the channel ID. The sequence numbers allow to reliably pair replies with requests, even in scenarious with timeouted requests or ignored (unread) replies. Sequencing is not used with asynchronous messaging as it is implemented by methods of the Channel structure, i.e. above ReqChan and ReplyChan channels. Change-Id: I7ca0e8489c7ffcc388c3cfef6d05c02f9500931c Signed-off-by: Milan Lenco <milan.lenco@pantheon.tech>
-rw-r--r--adapter/adapter.go2
-rw-r--r--adapter/mock/mock_adapter.go73
-rw-r--r--adapter/vppapiclient/vppapiclient_adapter.go4
-rw-r--r--api/api.go171
-rw-r--r--api/api_test.go64
-rw-r--r--core/core.go25
-rw-r--r--core/core_test.go252
-rw-r--r--core/request_handler.go87
8 files changed, 508 insertions, 170 deletions
diff --git a/adapter/adapter.go b/adapter/adapter.go
index a5b3352..bc3a573 100644
--- a/adapter/adapter.go
+++ b/adapter/adapter.go
@@ -26,7 +26,7 @@ type VppAdapter interface {
GetMsgID(msgName string, msgCrc string) (uint16, error)
// SendMsg sends a binary-encoded message to VPP.
- SendMsg(clientID uint32, data []byte) error
+ SendMsg(context uint32, data []byte) error
// SetMsgCallback sets a callback function that will be called by the adapter whenever a message comes from VPP.
SetMsgCallback(func(context uint32, msgId uint16, data []byte))
diff --git a/adapter/mock/mock_adapter.go b/adapter/mock/mock_adapter.go
index dab51a6..ae4caf0 100644
--- a/adapter/mock/mock_adapter.go
+++ b/adapter/mock/mock_adapter.go
@@ -48,10 +48,10 @@ type VppAdapter struct {
binAPITypes map[string]reflect.Type
access sync.RWMutex
- replies []api.Message // FIFO queue of messages
- replyHandlers []ReplyHandler // callbacks that are able to calculate mock responses
- repliesLock sync.Mutex // mutex for the queue
- mode replyMode // mode in which the mock operates
+ replies []reply // FIFO queue of messages
+ replyHandlers []ReplyHandler // callbacks that are able to calculate mock responses
+ repliesLock sync.Mutex // mutex for the queue
+ mode replyMode // mode in which the mock operates
}
// defaultReply is a default reply message that mock adapter returns for a request.
@@ -67,6 +67,13 @@ type MessageDTO struct {
Data []byte
}
+type reply struct {
+ msg api.Message
+ multipart bool
+ hasSeqNum bool
+ seqNum uint16
+}
+
// ReplyHandler is a type that allows to extend the behaviour of VPP mock.
// Return value ok is used to signalize that mock reply is calculated and ready to be used.
type ReplyHandler func(request MessageDTO) (reply []byte, msgID uint16, ok bool)
@@ -247,28 +254,35 @@ func (a *VppAdapter) SendMsg(clientID uint32, data []byte) error {
defer a.repliesLock.Unlock()
// pop all replies from queue
+ withSeqNums := false
for i, reply := range a.replies {
- if i > 0 && reply.GetMessageName() == "control_ping_reply" {
+ if i > 0 && reply.msg.GetMessageName() == "control_ping_reply" && !withSeqNums {
// hack - do not send control_ping_reply immediately, leave it for the the next callback
a.replies = a.replies[i:]
return nil
}
- msgID, _ := a.GetMsgID(reply.GetMessageName(), reply.GetCrcString())
+ msgID, _ := a.GetMsgID(reply.msg.GetMessageName(), reply.msg.GetCrcString())
buf := new(bytes.Buffer)
- if reply.GetMessageType() == api.ReplyMessage {
- struc.Pack(buf, &core.VppReplyHeader{VlMsgID: msgID, Context: clientID})
- } else if reply.GetMessageType() == api.EventMessage {
- struc.Pack(buf, &core.VppEventHeader{VlMsgID: msgID, Context: clientID})
- } else if reply.GetMessageType() == api.RequestMessage {
- struc.Pack(buf, &core.VppRequestHeader{VlMsgID: msgID, Context: clientID})
+ context := clientID
+ context = setMultipart(context, reply.multipart)
+ if reply.hasSeqNum {
+ withSeqNums = true
+ context = setSeqNum(context, reply.seqNum)
+ }
+ if reply.msg.GetMessageType() == api.ReplyMessage {
+ struc.Pack(buf, &core.VppReplyHeader{VlMsgID: msgID, Context: context})
+ } else if reply.msg.GetMessageType() == api.EventMessage {
+ struc.Pack(buf, &core.VppEventHeader{VlMsgID: msgID, Context: context})
+ } else if reply.msg.GetMessageType() == api.RequestMessage {
+ struc.Pack(buf, &core.VppRequestHeader{VlMsgID: msgID, Context: context})
} else {
struc.Pack(buf, &core.VppOtherHeader{VlMsgID: msgID})
}
- struc.Pack(buf, reply)
- a.callback(clientID, msgID, buf.Bytes())
+ struc.Pack(buf, reply.msg)
+ a.callback(context, msgID, buf.Bytes())
}
if len(a.replies) > 0 {
- a.replies = []api.Message{}
+ a.replies = []reply{}
if len(a.replyHandlers) > 0 {
// Switch back to handlers once the queue is empty to revert back
// the fallthrough effect.
@@ -302,11 +316,21 @@ func (a *VppAdapter) WaitReady() error {
// MockReply stores a message to be returned when the next request comes. It is a FIFO queue - multiple replies
// can be pushed into it, the first one will be popped when some request comes.
// Using of this method automatically switches the mock into th useRepliesQueue mode.
-func (a *VppAdapter) MockReply(msg api.Message) {
+func (a *VppAdapter) MockReply(replyMsg api.Message, isMultipart bool) {
a.repliesLock.Lock()
defer a.repliesLock.Unlock()
- a.replies = append(a.replies, msg)
+ a.replies = append(a.replies, reply{msg: replyMsg, multipart: isMultipart, hasSeqNum: false})
+ a.mode = useRepliesQueue
+}
+
+// MockReplyWithSeqNum queues next reply like MockReply() does, except that the
+// sequence number can be customized and not necessarily match with the request.
+func (a *VppAdapter) MockReplyWithSeqNum(replyMsg api.Message, isMultipart bool, sequenceNum uint16) {
+ a.repliesLock.Lock()
+ defer a.repliesLock.Unlock()
+
+ a.replies = append(a.replies, reply{msg: replyMsg, multipart: isMultipart, hasSeqNum: true, seqNum: sequenceNum})
a.mode = useRepliesQueue
}
@@ -319,3 +343,18 @@ func (a *VppAdapter) MockReplyHandler(replyHandler ReplyHandler) {
a.replyHandlers = append(a.replyHandlers, replyHandler)
a.mode = useReplyHandlers
}
+
+func setSeqNum(context uint32, seqNum uint16) (newContext uint32) {
+ context &= 0xffff0000
+ context |= uint32(seqNum)
+ return context
+}
+
+func setMultipart(context uint32, isMultipart bool) (newContext uint32) {
+ context &= 0xfffeffff
+ if isMultipart {
+ context |= 1 << 16
+ }
+ return context
+}
+
diff --git a/adapter/vppapiclient/vppapiclient_adapter.go b/adapter/vppapiclient/vppapiclient_adapter.go
index 5a05a58..c77d7f1 100644
--- a/adapter/vppapiclient/vppapiclient_adapter.go
+++ b/adapter/vppapiclient/vppapiclient_adapter.go
@@ -145,8 +145,8 @@ func (a *vppAPIClientAdapter) GetMsgID(msgName string, msgCrc string) (uint16, e
}
// SendMsg sends a binary-encoded message to VPP.
-func (a *vppAPIClientAdapter) SendMsg(clientID uint32, data []byte) error {
- rc := C.govpp_send(C.uint32_t(clientID), unsafe.Pointer(&data[0]), C.size_t(len(data)))
+func (a *vppAPIClientAdapter) SendMsg(context uint32, data []byte) error {
+ rc := C.govpp_send(C.uint32_t(context), unsafe.Pointer(&data[0]), C.size_t(len(data)))
if rc != 0 {
return fmt.Errorf("unable to send the message (error=%d)", rc)
}
diff --git a/api/api.go b/api/api.go
index eafdf22..3c2c7ec 100644
--- a/api/api.go
+++ b/api/api.go
@@ -87,6 +87,8 @@ type MessageIdentifier interface {
// methods provided inside of this package. Do not use the same channel from multiple goroutines concurrently,
// otherwise the responses could mix! Use multiple channels instead.
type Channel struct {
+ ID uint16 // channel ID
+
ReqChan chan *VppRequest // channel for sending the requests to VPP, closing this channel releases all resources in the ChannelProvider
ReplyChan chan *VppReply // channel where VPP replies are delivered to
@@ -96,13 +98,15 @@ type Channel struct {
MsgDecoder MessageDecoder // used to decode binary data to generated API messages
MsgIdentifier MessageIdentifier // used to retrieve message ID of a message
+ lastSeqNum uint16 // sequence number of the last sent request
+
+ delayedReply *VppReply // reply already taken from ReplyChan, buffered for later delivery
replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout
- metadata interface{} // opaque metadata of the API channel
- lastTimedOut bool // wether last request timed out
}
// VppRequest is a request that will be sent to VPP.
type VppRequest struct {
+ SeqNum uint16 // sequence number
Message Message // binary API message to be send to VPP
Multipart bool // true if multipart response is expected, false otherwise
}
@@ -110,6 +114,7 @@ type VppRequest struct {
// VppReply is a reply received from VPP.
type VppReply struct {
MessageID uint16 // ID of the message
+ SeqNum uint16 // sequence number
Data []byte // encoded data with the message - MessageDecoder can be used for decoding
LastReplyReceived bool // in case of multipart replies, true if the last reply has been already received and this one should be ignored
Error error // in case of error, data is nil and this member contains error description
@@ -130,30 +135,27 @@ type NotifSubscription struct {
// RequestCtx is a context of a ongoing request (simple one - only one response is expected).
type RequestCtx struct {
ch *Channel
+ seqNum uint16
}
// MultiRequestCtx is a context of a ongoing multipart request (multiple responses are expected).
type MultiRequestCtx struct {
ch *Channel
+ seqNum uint16
}
const defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout
-// NewChannelInternal returns a new channel structure with metadata field filled in with the provided argument.
+// NewChannelInternal returns a new channel structure.
// Note that this is just a raw channel not yet connected to VPP, it is not intended to be used directly.
// Use ChannelProvider to get an API channel ready for communication with VPP.
-func NewChannelInternal(metadata interface{}) *Channel {
+func NewChannelInternal(id uint16) *Channel {
return &Channel{
+ ID: id,
replyTimeout: defaultReplyTimeout,
- metadata: metadata,
}
}
-// Metadata returns the metadata stored within the channel structure by the NewChannelInternal call.
-func (ch *Channel) Metadata() interface{} {
- return ch.metadata
-}
-
// SetReplyTimeout sets the timeout for replies from VPP. It represents the maximum time the API waits for a reply
// from VPP before returning an error.
func (ch *Channel) SetReplyTimeout(timeout time.Duration) {
@@ -170,10 +172,12 @@ func (ch *Channel) Close() {
// SendRequest asynchronously sends a request to VPP. Returns a request context, that can be used to call ReceiveReply.
// In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply).
func (ch *Channel) SendRequest(msg Message) *RequestCtx {
+ ch.lastSeqNum++
ch.ReqChan <- &VppRequest{
Message: msg,
+ SeqNum: ch.lastSeqNum,
}
- return &RequestCtx{ch: ch}
+ return &RequestCtx{ch: ch, seqNum: ch.lastSeqNum}
}
// ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs).
@@ -183,7 +187,7 @@ func (req *RequestCtx) ReceiveReply(msg Message) error {
return errors.New("invalid request context")
}
- lastReplyReceived, err := req.ch.receiveReplyInternal(msg)
+ lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum)
if lastReplyReceived {
err = errors.New("multipart reply recieved while a simple reply expected")
@@ -195,79 +199,56 @@ func (req *RequestCtx) ReceiveReply(msg Message) error {
// Returns a multipart request context, that can be used to call ReceiveReply.
// In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply).
func (ch *Channel) SendMultiRequest(msg Message) *MultiRequestCtx {
+ ch.lastSeqNum++
ch.ReqChan <- &VppRequest{
Message: msg,
Multipart: true,
+ SeqNum: ch.lastSeqNum,
}
- return &MultiRequestCtx{ch: ch}
+ return &MultiRequestCtx{ch: ch, seqNum: ch.lastSeqNum}
}
// ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs).
-// The reply will be decoded into the msg argument. If the last reply has been already consumed, LastReplyReceived is
-// set to true. Do not use the message itself if LastReplyReceived is true - it won't be filled with actual data.
+// The reply will be decoded into the msg argument. If the last reply has been already consumed, lastReplyReceived is
+// set to true. Do not use the message itself if lastReplyReceived is true - it won't be filled with actual data.
// Error will be returned if the response cannot be received or decoded.
-func (req *MultiRequestCtx) ReceiveReply(msg Message) (LastReplyReceived bool, err error) {
+func (req *MultiRequestCtx) ReceiveReply(msg Message) (lastReplyReceived bool, err error) {
if req == nil || req.ch == nil {
return false, errors.New("invalid request context")
}
- return req.ch.receiveReplyInternal(msg)
+ return req.ch.receiveReplyInternal(msg, req.seqNum)
}
// receiveReplyInternal receives a reply from the reply channel into the provided msg structure.
-func (ch *Channel) receiveReplyInternal(msg Message) (LastReplyReceived bool, err error) {
+func (ch *Channel) receiveReplyInternal(msg Message, expSeqNum uint16) (lastReplyReceived bool, err error) {
+ var ignore bool
if msg == nil {
return false, errors.New("nil message passed in")
}
+ if ch.delayedReply != nil {
+ // try the delayed reply
+ vppReply := ch.delayedReply
+ ch.delayedReply = nil
+ ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
+ if !ignore {
+ return lastReplyReceived, err
+ }
+ }
+
timer := time.NewTimer(ch.replyTimeout)
for {
select {
// blocks until a reply comes to ReplyChan or until timeout expires
case vppReply := <-ch.ReplyChan:
- if vppReply.Error != nil {
- err = vppReply.Error
- return
- }
- if vppReply.LastReplyReceived {
- LastReplyReceived = true
- return
+ ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
+ if ignore {
+ continue
}
-
- // message checks
- expMsgID, err := ch.MsgIdentifier.GetMessageID(msg)
- if err != nil {
- err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to",
- msg.GetMessageName(), msg.GetCrcString())
- return false, err
- }
-
- if vppReply.MessageID != expMsgID {
- var msgNameCrc string
- if nameCrc, err := ch.MsgIdentifier.LookupByID(vppReply.MessageID); err != nil {
- msgNameCrc = err.Error()
- } else {
- msgNameCrc = nameCrc
- }
-
- if ch.lastTimedOut {
- logrus.Warnf("received invalid message ID, expected %d (%s), but got %d (%s) (probably timed out reply from previous request)",
- expMsgID, msg.GetMessageName(), vppReply.MessageID, msgNameCrc)
- continue
- }
-
- err = fmt.Errorf("received invalid message ID, expected %d (%s), but got %d (%s) (check if multiple goroutines are not sharing single GoVPP channel)",
- expMsgID, msg.GetMessageName(), vppReply.MessageID, msgNameCrc)
- return false, err
- }
-
- ch.lastTimedOut = false
- // decode the message
- err = ch.MsgDecoder.DecodeMsg(vppReply.Data, msg)
- return false, err
+ return lastReplyReceived, err
case <-timer.C:
- ch.lastTimedOut = true
err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout)
return false, err
}
@@ -275,6 +256,80 @@ func (ch *Channel) receiveReplyInternal(msg Message) (LastReplyReceived bool, er
return
}
+func (ch *Channel) processReply(reply *VppReply, expSeqNum uint16, msg Message) (ignore bool, lastReplyReceived bool, err error) {
+ // check the sequence number
+ cmpSeqNums := compareSeqNumbers(reply.SeqNum, expSeqNum)
+ if cmpSeqNums == -1 {
+ // reply received too late, ignore the message
+ logrus.WithField("sequence-number", reply.SeqNum).Warn(
+ "Received reply to an already closed binary API request")
+ ignore = true
+ return
+ }
+ if cmpSeqNums == 1 {
+ ch.delayedReply = reply
+ err = fmt.Errorf("missing binary API reply with sequence number: %d", expSeqNum)
+ return
+ }
+
+ if reply.Error != nil {
+ err = reply.Error
+ return
+ }
+ if reply.LastReplyReceived {
+ lastReplyReceived = true
+ return
+ }
+
+ // message checks
+ var expMsgID uint16
+ expMsgID, err = ch.MsgIdentifier.GetMessageID(msg)
+ if err != nil {
+ err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to",
+ msg.GetMessageName(), msg.GetCrcString())
+ return
+ }
+
+ if reply.MessageID != expMsgID {
+ var msgNameCrc string
+ if nameCrc, err := ch.MsgIdentifier.LookupByID(reply.MessageID); err != nil {
+ msgNameCrc = err.Error()
+ } else {
+ msgNameCrc = nameCrc
+ }
+
+ err = fmt.Errorf("received invalid message ID (seq-num=%d), expected %d (%s), but got %d (%s) " +
+ "(check if multiple goroutines are not sharing single GoVPP channel)",
+ reply.SeqNum, expMsgID, msg.GetMessageName(), reply.MessageID, msgNameCrc)
+ return
+ }
+
+ // decode the message
+ err = ch.MsgDecoder.DecodeMsg(reply.Data, msg)
+ return
+}
+
+// compareSeqNumbers returns -1, 0, 1 if sequence number <seqNum1> precedes, equals to,
+// or succeeds seq. number <seqNum2>.
+// Since sequence numbers cycle in the finite set of size 2^16, the function
+// must assume that the distance between compared sequence numbers is less than
+// (2^16)/2 to determine the order.
+func compareSeqNumbers(seqNum1, seqNum2 uint16) int {
+ // calculate distance from seqNum1 to seqNum2
+ var dist uint16
+ if seqNum1 <= seqNum2 {
+ dist = seqNum2 - seqNum1
+ } else {
+ dist = 0xffff - (seqNum1 - seqNum2 - 1)
+ }
+ if dist == 0 {
+ return 0
+ } else if dist <= 0x8000 {
+ return -1
+ }
+ return 1
+}
+
// SubscribeNotification subscribes for receiving of the specified notification messages via provided Go channel.
// Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's
// buffer is full, the notifications will not be delivered into it.
diff --git a/api/api_test.go b/api/api_test.go
index a439986..a83b1cc 100644
--- a/api/api_test.go
+++ b/api/api_test.go
@@ -64,7 +64,7 @@ func TestRequestReplyTapConnect(t *testing.T) {
ctx.mockVpp.MockReply(&tap.TapConnectReply{
Retval: 10,
SwIfIndex: 1,
- })
+ }, false)
request := &tap.TapConnect{
TapName: []byte("test-tap-name"),
UseRandomMac: 1,
@@ -84,7 +84,7 @@ func TestRequestReplyTapModify(t *testing.T) {
ctx.mockVpp.MockReply(&tap.TapModifyReply{
Retval: 15,
SwIfIndex: 2,
- })
+ }, false)
request := &tap.TapModify{
TapName: []byte("test-tap-modify"),
UseRandomMac: 1,
@@ -104,7 +104,7 @@ func TestRequestReplyTapDelete(t *testing.T) {
ctx.mockVpp.MockReply(&tap.TapDeleteReply{
Retval: 20,
- })
+ }, false)
request := &tap.TapDelete{
SwIfIndex: 3,
}
@@ -123,7 +123,7 @@ func TestRequestReplySwInterfaceTapDump(t *testing.T) {
ctx.mockVpp.MockReply(&tap.SwInterfaceTapDetails{
SwIfIndex: 25,
DevName: byteName,
- })
+ }, false)
request := &tap.SwInterfaceTapDump{}
reply := &tap.SwInterfaceTapDetails{}
@@ -140,7 +140,7 @@ func TestRequestReplyMemifCreate(t *testing.T) {
ctx.mockVpp.MockReply(&memif.MemifCreateReply{
Retval: 22,
SwIfIndex: 4,
- })
+ }, false)
request := &memif.MemifCreate{
Role: 10,
ID: 12,
@@ -161,7 +161,7 @@ func TestRequestReplyMemifDelete(t *testing.T) {
ctx.mockVpp.MockReply(&memif.MemifDeleteReply{
Retval: 24,
- })
+ }, false)
request := &memif.MemifDelete{
SwIfIndex: 15,
}
@@ -180,7 +180,7 @@ func TestRequestReplyMemifDetails(t *testing.T) {
SwIfIndex: 25,
IfName: []byte("memif-name"),
Role: 0,
- })
+ }, false)
request := &memif.MemifDump{}
reply := &memif.MemifDetails{}
@@ -200,9 +200,9 @@ func TestMultiRequestReplySwInterfaceTapDump(t *testing.T) {
ctx.mockVpp.MockReply(&tap.SwInterfaceTapDetails{
SwIfIndex: uint32(i),
DevName: []byte("dev-name-test"),
- })
+ }, true)
}
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, true)
reqCtx := ctx.ch.SendMultiRequest(&tap.SwInterfaceTapDump{})
cnt := 0
@@ -226,9 +226,9 @@ func TestMultiRequestReplySwInterfaceMemifDump(t *testing.T) {
for i := 1; i <= 10; i++ {
ctx.mockVpp.MockReply(&memif.MemifDetails{
SwIfIndex: uint32(i),
- })
+ }, true)
}
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, true)
reqCtx := ctx.ch.SendMultiRequest(&memif.MemifDump{})
cnt := 0
@@ -257,7 +257,7 @@ func TestNotifications(t *testing.T) {
ctx.mockVpp.MockReply(&interfaces.SwInterfaceSetFlags{
SwIfIndex: 3,
AdminUpDown: 1,
- })
+ }, false)
ctx.mockVpp.SendMsg(0, []byte(""))
// receive the notification
@@ -292,7 +292,7 @@ func TestNotificationEvent(t *testing.T) {
ctx.mockVpp.MockReply(&interfaces.SwInterfaceEvent{
SwIfIndex: 2,
LinkUpDown: 1,
- })
+ }, false)
ctx.mockVpp.SendMsg(0, []byte(""))
// receive the notification
@@ -328,7 +328,7 @@ func TestSetReplyTimeout(t *testing.T) {
ctx.ch.SetReplyTimeout(time.Millisecond)
// first one request should work
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, false)
err := ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{})
Expect(err).ShouldNot(HaveOccurred())
@@ -348,9 +348,9 @@ func TestSetReplyTimeoutMultiRequest(t *testing.T) {
ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{
SwIfIndex: uint32(i),
InterfaceName: []byte("if-name-test"),
- })
+ }, true)
}
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, true)
cnt := 0
sendMultiRequest := func() error {
@@ -410,19 +410,19 @@ func TestMultiRequestDouble(t *testing.T) {
// mock reply
for i := 1; i <= 3; i++ {
- ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{
+ ctx.mockVpp.MockReplyWithSeqNum(&interfaces.SwInterfaceDetails{
SwIfIndex: uint32(i),
InterfaceName: []byte("if-name-test"),
- })
+ }, true, 1)
}
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{}, true, 1)
for i := 1; i <= 3; i++ {
- ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{
+ ctx.mockVpp.MockReplyWithSeqNum(&interfaces.SwInterfaceDetails{
SwIfIndex: uint32(i),
InterfaceName: []byte("if-name-test"),
- })
+ }, true, 2)
}
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{}, true,2)
cnt := 0
var sendMultiRequest = func() error {
@@ -457,7 +457,7 @@ func TestReceiveReplyAfterTimeout(t *testing.T) {
ctx.ch.SetReplyTimeout(time.Millisecond)
// first one request should work
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{}, false,1)
err := ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{})
Expect(err).ShouldNot(HaveOccurred())
@@ -466,10 +466,10 @@ func TestReceiveReplyAfterTimeout(t *testing.T) {
Expect(err.Error()).To(ContainSubstring("timeout"))
// simulating late reply
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{}, false,2)
// normal reply for next request
- ctx.mockVpp.MockReply(&tap.TapConnectReply{})
+ ctx.mockVpp.MockReplyWithSeqNum(&tap.TapConnectReply{}, false,3)
req := &tap.TapConnect{
TapName: []byte("test-tap-name"),
@@ -497,7 +497,7 @@ func TestReceiveReplyAfterTimeoutMultiRequest(t *testing.T) {
ctx.ch.SetReplyTimeout(time.Millisecond * 100)
// first one request should work
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{}, false, 1)
err := ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{})
Expect(err).ShouldNot(HaveOccurred())
@@ -525,15 +525,15 @@ func TestReceiveReplyAfterTimeoutMultiRequest(t *testing.T) {
// simulating late replies
for i := 1; i <= 3; i++ {
- ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{
+ ctx.mockVpp.MockReplyWithSeqNum(&interfaces.SwInterfaceDetails{
SwIfIndex: uint32(i),
InterfaceName: []byte("if-name-test"),
- })
+ }, true, 2)
}
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{}, true, 2)
// normal reply for next request
- ctx.mockVpp.MockReply(&tap.TapConnectReply{})
+ ctx.mockVpp.MockReplyWithSeqNum(&tap.TapConnectReply{}, false, 3)
req := &tap.TapConnect{
TapName: []byte("test-tap-name"),
@@ -551,12 +551,12 @@ func TestInvalidMessageID(t *testing.T) {
defer ctx.teardownTest()
// first one request should work
- ctx.mockVpp.MockReply(&vpe.ShowVersionReply{})
+ ctx.mockVpp.MockReply(&vpe.ShowVersionReply{}, false)
err := ctx.ch.SendRequest(&vpe.ShowVersion{}).ReceiveReply(&vpe.ShowVersionReply{})
Expect(err).ShouldNot(HaveOccurred())
// second should fail with error invalid message ID
- ctx.mockVpp.MockReply(&vpe.ShowVersionReply{})
+ ctx.mockVpp.MockReply(&vpe.ShowVersionReply{}, false)
err = ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{})
Expect(err).Should(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("invalid message ID"))
diff --git a/core/core.go b/core/core.go
index 4782ba1..052eb0b 100644
--- a/core/core.go
+++ b/core/core.go
@@ -77,12 +77,12 @@ type Connection struct {
msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC
channelsLock sync.RWMutex // lock for the channels map
- channels map[uint32]*api.Channel // map of all API channels indexed by the channel ID
+ channels map[uint16]*api.Channel // map of all API channels indexed by the channel ID
notifSubscriptionsLock sync.RWMutex // lock for the subscriptions map
notifSubscriptions map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
- maxChannelID uint32 // maximum used client ID
+ maxChannelID uint32 // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations)
pingReqID uint16 // ID if the ControlPing message
pingReplyID uint16 // ID of the ControlPingReply message
@@ -90,12 +90,6 @@ type Connection struct {
lastReply time.Time // time of the last received reply from VPP
}
-// channelMetadata contains core-local metadata of an API channel.
-type channelMetadata struct {
- id uint32 // channel ID
- multipart uint32 // 1 if multipart request is being processed, 0 otherwise
-}
-
var (
log *logger.Logger // global logger
conn *Connection // global handle to the Connection (used in the message receive callback)
@@ -204,7 +198,7 @@ func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
conn = &Connection{
vpp: vppAdapter,
codec: &MsgCodec{},
- channels: make(map[uint32]*api.Channel),
+ channels: make(map[uint16]*api.Channel),
msgIDs: make(map[string]uint16),
notifSubscriptions: make(map[uint16][]*api.NotifSubscription),
}
@@ -370,10 +364,9 @@ func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int)
if c == nil {
return nil, errors.New("nil connection passed in")
}
- chID := atomic.AddUint32(&c.maxChannelID, 1)
- chMeta := &channelMetadata{id: chID}
- ch := api.NewChannelInternal(chMeta)
+ chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
+ ch := api.NewChannelInternal(chID)
ch.MsgDecoder = c.codec
ch.MsgIdentifier = c
@@ -389,19 +382,19 @@ func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int)
c.channelsLock.Unlock()
// start watching on the request channel
- go c.watchRequests(ch, chMeta)
+ go c.watchRequests(ch)
return ch, nil
}
// releaseAPIChannel releases API channel that needs to be closed.
-func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) {
+func (c *Connection) releaseAPIChannel(ch *api.Channel) {
log.WithFields(logger.Fields{
- "context": chMeta.id,
+ "ID": ch.ID,
}).Debug("API channel closed.")
// delete the channel from channels map
c.channelsLock.Lock()
- delete(c.channels, chMeta.id)
+ delete(c.channels, ch.ID)
c.channelsLock.Unlock()
}
diff --git a/core/core_test.go b/core/core_test.go
index e80f97c..981ff19 100644
--- a/core/core_test.go
+++ b/core/core_test.go
@@ -33,7 +33,7 @@ type testCtx struct {
ch *api.Channel
}
-func setupTest(t *testing.T) *testCtx {
+func setupTest(t *testing.T, bufferedChan bool) *testCtx {
RegisterTestingT(t)
ctx := &testCtx{}
@@ -43,7 +43,11 @@ func setupTest(t *testing.T) *testCtx {
ctx.conn, err = core.Connect(ctx.mockVpp)
Expect(err).ShouldNot(HaveOccurred())
- ctx.ch, err = ctx.conn.NewAPIChannel()
+ if bufferedChan {
+ ctx.ch, err = ctx.conn.NewAPIChannelBuffered(100, 100)
+ } else {
+ ctx.ch, err = ctx.conn.NewAPIChannel()
+ }
Expect(err).ShouldNot(HaveOccurred())
return ctx
@@ -55,10 +59,10 @@ func (ctx *testCtx) teardownTest() {
}
func TestSimpleRequest(t *testing.T) {
- ctx := setupTest(t)
+ ctx := setupTest(t, false)
defer ctx.teardownTest()
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: -5})
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: -5}, false)
req := &vpe.ControlPing{}
reply := &vpe.ControlPingReply{}
@@ -78,13 +82,13 @@ func TestSimpleRequest(t *testing.T) {
}
func TestMultiRequest(t *testing.T) {
- ctx := setupTest(t)
+ ctx := setupTest(t, false)
defer ctx.teardownTest()
for m := 0; m < 10; m++ {
- ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{})
+ ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{}, true)
}
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, true)
// send multipart request
ctx.ch.ReqChan <- &api.VppRequest{Message: &interfaces.SwInterfaceDump{}, Multipart: true}
@@ -109,7 +113,7 @@ func TestMultiRequest(t *testing.T) {
}
func TestNotifications(t *testing.T) {
- ctx := setupTest(t)
+ ctx := setupTest(t, false)
defer ctx.teardownTest()
// subscribe for notification
@@ -129,7 +133,7 @@ func TestNotifications(t *testing.T) {
ctx.mockVpp.MockReply(&interfaces.SwInterfaceSetFlags{
SwIfIndex: 3,
AdminUpDown: 1,
- })
+ }, false)
ctx.mockVpp.SendMsg(0, []byte{0})
// receive the notification
@@ -162,7 +166,7 @@ func TestNilConnection(t *testing.T) {
}
func TestDoubleConnection(t *testing.T) {
- ctx := setupTest(t)
+ ctx := setupTest(t, false)
defer ctx.teardownTest()
conn, err := core.Connect(ctx.mockVpp)
@@ -172,7 +176,7 @@ func TestDoubleConnection(t *testing.T) {
}
func TestAsyncConnection(t *testing.T) {
- ctx := setupTest(t)
+ ctx := setupTest(t, false)
defer ctx.teardownTest()
ctx.conn.Disconnect()
@@ -187,7 +191,7 @@ func TestAsyncConnection(t *testing.T) {
}
func TestFullBuffer(t *testing.T) {
- ctx := setupTest(t)
+ ctx := setupTest(t, false)
defer ctx.teardownTest()
// close the default API channel
@@ -200,7 +204,7 @@ func TestFullBuffer(t *testing.T) {
// send multiple requests, only one reply should be read
for i := 0; i < 20; i++ {
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, false)
ctx.ch.ReqChan <- &api.VppRequest{Message: &vpe.ControlPing{}}
}
@@ -274,3 +278,225 @@ func TestCodecNegative(t *testing.T) {
Expect(err).Should(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("EOF"))
}
+
+func TestSimpleRequestsWithSequenceNumbers(t *testing.T) {
+ ctx := setupTest(t, false)
+ defer ctx.teardownTest()
+
+ var reqCtx []*api.RequestCtx
+ for i := 0; i < 10; i++ {
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: int32(i)}, false)
+ req := &vpe.ControlPing{}
+ reqCtx = append(reqCtx, ctx.ch.SendRequest(req))
+ }
+
+ for i := 0; i < 10; i++ {
+ reply := &vpe.ControlPingReply{}
+ err := reqCtx[i].ReceiveReply(reply)
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(reply.Retval).To(BeEquivalentTo(i))
+ }
+}
+
+func TestMultiRequestsWithSequenceNumbers(t *testing.T) {
+ ctx := setupTest(t, false)
+ defer ctx.teardownTest()
+
+ for i := 0; i < 10; i++ {
+ ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{SwIfIndex: uint32(i)}, true)
+ }
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, true)
+
+ // send multipart request
+ reqCtx := ctx.ch.SendMultiRequest(&interfaces.SwInterfaceDump{})
+
+ cnt := 0
+ for {
+ Expect(cnt < 11).To(BeTrue())
+
+ // receive a reply
+ reply := &interfaces.SwInterfaceDetails{}
+ lastReplyReceived, err := reqCtx.ReceiveReply(reply)
+
+ if lastReplyReceived {
+ break // break out of the loop
+ }
+
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(reply.SwIfIndex).To(BeEquivalentTo(cnt))
+
+ cnt++
+ }
+
+ Expect(cnt).To(BeEquivalentTo(10))
+}
+
+func TestSimpleRequestWithTimeout(t *testing.T) {
+ ctx := setupTest(t, true)
+ defer ctx.teardownTest()
+
+ // reply for a previous timeouted requests to be ignored
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 1}, false,0)
+
+ // send reply later
+ req1 := &vpe.ControlPing{}
+ reqCtx1 := ctx.ch.SendRequest(req1)
+
+ reply := &vpe.ControlPingReply{}
+ err := reqCtx1.ReceiveReply(reply)
+ Expect(err).ToNot(BeNil())
+ Expect(err.Error()).To(HavePrefix("no reply received within the timeout period"))
+
+ // reply for the previous request
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 1}, false,1)
+
+ // next request
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: 2}, false)
+ req2 := &vpe.ControlPing{}
+ reqCtx2 := ctx.ch.SendRequest(req2)
+
+ // second request should ignore the first reply and return the second one
+ reply = &vpe.ControlPingReply{}
+ err = reqCtx2.ReceiveReply(reply)
+ Expect(err).To(BeNil())
+ Expect(reply.Retval).To(BeEquivalentTo(2))
+}
+
+func TestSimpleRequestsWithMissingReply(t *testing.T) {
+ ctx := setupTest(t, false)
+ defer ctx.teardownTest()
+
+ // request without reply
+ req1 := &vpe.ControlPing{}
+ reqCtx1 := ctx.ch.SendRequest(req1)
+
+ // another request without reply
+ req2 := &vpe.ControlPing{}
+ reqCtx2 := ctx.ch.SendRequest(req2)
+
+ // third request with reply
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 3}, false, 3)
+ req3 := &vpe.ControlPing{}
+ reqCtx3 := ctx.ch.SendRequest(req3)
+
+ // the first two should fail, but not consume reply for the 3rd
+ reply := &vpe.ControlPingReply{}
+ err := reqCtx1.ReceiveReply(reply)
+ Expect(err).ToNot(BeNil())
+ Expect(err.Error()).To(Equal("missing binary API reply with sequence number: 1"))
+
+ reply = &vpe.ControlPingReply{}
+ err = reqCtx2.ReceiveReply(reply)
+ Expect(err).ToNot(BeNil())
+ Expect(err.Error()).To(Equal("missing binary API reply with sequence number: 2"))
+
+ // the second request should succeed
+ reply = &vpe.ControlPingReply{}
+ err = reqCtx3.ReceiveReply(reply)
+ Expect(err).To(BeNil())
+ Expect(reply.Retval).To(BeEquivalentTo(3))
+}
+
+func TestMultiRequestsWithErrors(t *testing.T) {
+ ctx := setupTest(t, false)
+ defer ctx.teardownTest()
+
+ // reply for a previous timeouted requests to be ignored
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 1}, false,0xffff - 1)
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 1}, false,0xffff)
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 1}, false,0)
+
+ for i := 0; i < 10; i++ {
+ ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{SwIfIndex: uint32(i)}, true)
+ }
+ // missing finalizing control ping
+
+ // reply for a next request
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 2}, false,2)
+
+ // send multipart request
+ reqCtx := ctx.ch.SendMultiRequest(&interfaces.SwInterfaceDump{})
+
+ for i := 0; i < 10; i++ {
+ // receive multi-part replies
+ reply := &interfaces.SwInterfaceDetails{}
+ lastReplyReceived, err := reqCtx.ReceiveReply(reply)
+
+ Expect(lastReplyReceived).To(BeFalse())
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(reply.SwIfIndex).To(BeEquivalentTo(i))
+ }
+
+ // missing closing control ping
+ reply := &interfaces.SwInterfaceDetails{}
+ _, err := reqCtx.ReceiveReply(reply)
+ Expect(err).ToNot(BeNil())
+ Expect(err.Error()).To(Equal("missing binary API reply with sequence number: 1"))
+
+ // try again - still fails and nothing consumed
+ _, err = reqCtx.ReceiveReply(reply)
+ Expect(err).ToNot(BeNil())
+ Expect(err.Error()).To(Equal("missing binary API reply with sequence number: 1"))
+
+ // reply for the second request has not been consumed
+ reqCtx2 := ctx.ch.SendRequest(&vpe.ControlPing{})
+ reply2 := &vpe.ControlPingReply{}
+ err = reqCtx2.ReceiveReply(reply2)
+ Expect(err).To(BeNil())
+ Expect(reply2.Retval).To(BeEquivalentTo(2))
+}
+
+func TestRequestsOrdering(t *testing.T) {
+ ctx := setupTest(t, false)
+ defer ctx.teardownTest()
+
+ // the orderings of SendRequest and ReceiveReply calls should match, otherwise
+ // some replies will get thrown away
+
+ // first request
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: 1}, false)
+ req1 := &vpe.ControlPing{}
+ reqCtx1 := ctx.ch.SendRequest(req1)
+
+ // second request
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: 2}, false)
+ req2 := &vpe.ControlPing{}
+ reqCtx2 := ctx.ch.SendRequest(req2)
+
+ // if reply for the second request is read first, the reply for the first
+ // request gets thrown away.
+ reply2 := &vpe.ControlPingReply{}
+ err := reqCtx2.ReceiveReply(reply2)
+ Expect(err).To(BeNil())
+ Expect(reply2.Retval).To(BeEquivalentTo(2))
+
+ // first request has already been considered closed
+ reply1 := &vpe.ControlPingReply{}
+ err = reqCtx1.ReceiveReply(reply1)
+ Expect(err).ToNot(BeNil())
+ Expect(err.Error()).To(HavePrefix("no reply received within the timeout period"))
+}
+
+func TestCycleOverSetOfSequenceNumbers(t *testing.T) {
+ ctx := setupTest(t, true)
+ defer ctx.teardownTest()
+
+ numIters := 0xffff + 100
+ reqCtx := make(map[int]*api.RequestCtx)
+
+ var seqNum uint16 = 0
+ for i := 0; i < numIters + 30 /* receiver is 30 reqs behind */; i++ {
+ seqNum++
+ if i < numIters {
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: int32(i)}, false, seqNum)
+ req := &vpe.ControlPing{}
+ reqCtx[i] = ctx.ch.SendRequest(req)
+ }
+ if i > 30 {
+ reply := &vpe.ControlPingReply{}
+ err := reqCtx[i-30].ReceiveReply(reply)
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(reply.Retval).To(BeEquivalentTo(i-30))
+ }
+ }
+} \ No newline at end of file
diff --git a/core/request_handler.go b/core/request_handler.go
index 8f793f5..3bec38d 100644
--- a/core/request_handler.go
+++ b/core/request_handler.go
@@ -31,17 +31,17 @@ var (
)
// watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
-func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
+func (c *Connection) watchRequests(ch *api.Channel) {
for {
select {
case req, ok := <-ch.ReqChan:
// new request on the request channel
if !ok {
// after closing the request channel, release API channel and return
- c.releaseAPIChannel(ch, chMeta)
+ c.releaseAPIChannel(ch)
return
}
- c.processRequest(ch, chMeta, req)
+ c.processRequest(ch, req)
case req := <-ch.NotifSubsChan:
// new request on the notification subscribe channel
@@ -51,12 +51,12 @@ func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
}
// processRequest processes a single request received on the request channel.
-func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error {
+func (c *Connection) processRequest(ch *api.Channel, req *api.VppRequest) error {
// check whether we are connected to VPP
if atomic.LoadUint32(&c.connected) == 0 {
err := ErrNotConnected
log.Error(err)
- sendReply(ch, &api.VppReply{Error: err})
+ sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
return err
}
@@ -67,8 +67,9 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re
log.WithFields(logger.Fields{
"msg_name": req.Message.GetMessageName(),
"msg_crc": req.Message.GetCrcString(),
+ "seq_num": req.SeqNum,
}).Error(err)
- sendReply(ch, &api.VppReply{Error: err})
+ sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
return err
}
@@ -77,37 +78,35 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re
if err != nil {
err = fmt.Errorf("unable to encode the messge: %v", err)
log.WithFields(logger.Fields{
- "context": chMeta.id,
+ "channel": ch.ID,
"msg_id": msgID,
+ "seq_num": req.SeqNum,
}).Error(err)
- sendReply(ch, &api.VppReply{Error: err})
+ sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
return err
}
if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
log.WithFields(logger.Fields{
- "context": chMeta.id,
+ "channel": ch.ID,
"msg_id": msgID,
"msg_size": len(data),
"msg_name": req.Message.GetMessageName(),
+ "seq_num": req.SeqNum,
}).Debug("Sending a message to VPP.")
}
- // send the message
- if req.Multipart {
- // expect multipart response
- atomic.StoreUint32(&chMeta.multipart, 1)
- }
-
// send the request to VPP
- err = c.vpp.SendMsg(chMeta.id, data)
+ context := packRequestContext(ch.ID, req.Multipart, req.SeqNum)
+ err = c.vpp.SendMsg(context, data)
if err != nil {
- err = fmt.Errorf("unable to send the messge: %v", err)
+ err = fmt.Errorf("unable to send the message: %v", err)
log.WithFields(logger.Fields{
- "context": chMeta.id,
+ "context": context,
"msg_id": msgID,
+ "seq_num": req.SeqNum,
}).Error(err)
- sendReply(ch, &api.VppReply{Error: err})
+ sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
return err
}
@@ -116,12 +115,13 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re
pingData, _ := c.codec.EncodeMsg(msgControlPing, c.pingReqID)
log.WithFields(logger.Fields{
- "context": chMeta.id,
+ "context": context,
"msg_id": c.pingReqID,
"msg_size": len(pingData),
+ "seq_num": req.SeqNum,
}).Debug("Sending a control ping to VPP.")
- c.vpp.SendMsg(chMeta.id, pingData)
+ c.vpp.SendMsg(context, pingData)
}
return nil
@@ -137,11 +137,14 @@ func msgCallback(context uint32, msgID uint16, data []byte) {
return
}
+ chanID, isMultipart, seqNum := unpackRequestContext(context)
if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
log.WithFields(logger.Fields{
- "context": context,
- "msg_id": msgID,
- "msg_size": len(data),
+ "msg_id": msgID,
+ "msg_size": len(data),
+ "channel_id": chanID,
+ "is_multipart": isMultipart,
+ "seq_num": seqNum,
}).Debug("Received a message from VPP.")
}
@@ -153,27 +156,27 @@ func msgCallback(context uint32, msgID uint16, data []byte) {
// match ch according to the context
conn.channelsLock.RLock()
- ch, ok := conn.channels[context]
+ ch, ok := conn.channels[chanID]
conn.channelsLock.RUnlock()
if !ok {
log.WithFields(logger.Fields{
- "context": context,
- "msg_id": msgID,
- }).Error("Context ID not known, ignoring the message.")
+ "channel_id": chanID,
+ "msg_id": msgID,
+ }).Error("Channel ID not known, ignoring the message.")
return
}
- chMeta := ch.Metadata().(*channelMetadata)
lastReplyReceived := false
- // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply
- if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) {
+ // if this is a control ping reply to a multipart request, treat this as a last part of the reply
+ if msgID == conn.pingReplyID && isMultipart {
lastReplyReceived = true
}
// send the data to the channel
sendReply(ch, &api.VppReply{
MessageID: msgID,
+ SeqNum: seqNum,
Data: data,
LastReplyReceived: lastReplyReceived,
})
@@ -195,6 +198,7 @@ func sendReply(ch *api.Channel, reply *api.VppReply) {
log.WithFields(logger.Fields{
"channel": ch,
"msg_id": reply.MessageID,
+ "seq_num": reply.SeqNum,
}).Warn("Unable to send the reply, reciever end not ready.")
}
}
@@ -254,3 +258,24 @@ func (c *Connection) LookupByID(ID uint16) (string, error) {
return "", fmt.Errorf("unknown message ID: %d", ID)
}
+
+// +------------------+-------------------+-----------------------+
+// | 15b = channel ID | 1b = is multipart | 16b = sequence number |
+// +------------------+-------------------+-----------------------+
+func packRequestContext(chanID uint16, isMultipart bool, seqNum uint16) uint32 {
+ context := uint32(chanID) << 17
+ if isMultipart {
+ context |= 1 << 16
+ }
+ context |= uint32(seqNum)
+ return context
+}
+
+func unpackRequestContext(context uint32) (chanID uint16, isMulipart bool, seqNum uint16) {
+ chanID = uint16(context >> 17)
+ if ((context >> 16) & 0x1) != 0 {
+ isMulipart = true
+ }
+ seqNum = uint16(context & 0xffff)
+ return
+}