aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--adapter/adapter.go2
-rw-r--r--adapter/mock/mock_adapter.go73
-rw-r--r--adapter/vppapiclient/vppapiclient_adapter.go4
-rw-r--r--api/api.go171
-rw-r--r--api/api_test.go64
-rw-r--r--core/core.go25
-rw-r--r--core/core_test.go252
-rw-r--r--core/request_handler.go87
8 files changed, 508 insertions, 170 deletions
diff --git a/adapter/adapter.go b/adapter/adapter.go
index a5b3352..bc3a573 100644
--- a/adapter/adapter.go
+++ b/adapter/adapter.go
@@ -26,7 +26,7 @@ type VppAdapter interface {
GetMsgID(msgName string, msgCrc string) (uint16, error)
// SendMsg sends a binary-encoded message to VPP.
- SendMsg(clientID uint32, data []byte) error
+ SendMsg(context uint32, data []byte) error
// SetMsgCallback sets a callback function that will be called by the adapter whenever a message comes from VPP.
SetMsgCallback(func(context uint32, msgId uint16, data []byte))
diff --git a/adapter/mock/mock_adapter.go b/adapter/mock/mock_adapter.go
index dab51a6..ae4caf0 100644
--- a/adapter/mock/mock_adapter.go
+++ b/adapter/mock/mock_adapter.go
@@ -48,10 +48,10 @@ type VppAdapter struct {
binAPITypes map[string]reflect.Type
access sync.RWMutex
- replies []api.Message // FIFO queue of messages
- replyHandlers []ReplyHandler // callbacks that are able to calculate mock responses
- repliesLock sync.Mutex // mutex for the queue
- mode replyMode // mode in which the mock operates
+ replies []reply // FIFO queue of messages
+ replyHandlers []ReplyHandler // callbacks that are able to calculate mock responses
+ repliesLock sync.Mutex // mutex for the queue
+ mode replyMode // mode in which the mock operates
}
// defaultReply is a default reply message that mock adapter returns for a request.
@@ -67,6 +67,13 @@ type MessageDTO struct {
Data []byte
}
+type reply struct {
+ msg api.Message
+ multipart bool
+ hasSeqNum bool
+ seqNum uint16
+}
+
// ReplyHandler is a type that allows to extend the behaviour of VPP mock.
// Return value ok is used to signalize that mock reply is calculated and ready to be used.
type ReplyHandler func(request MessageDTO) (reply []byte, msgID uint16, ok bool)
@@ -247,28 +254,35 @@ func (a *VppAdapter) SendMsg(clientID uint32, data []byte) error {
defer a.repliesLock.Unlock()
// pop all replies from queue
+ withSeqNums := false
for i, reply := range a.replies {
- if i > 0 && reply.GetMessageName() == "control_ping_reply" {
+ if i > 0 && reply.msg.GetMessageName() == "control_ping_reply" && !withSeqNums {
// hack - do not send control_ping_reply immediately, leave it for the the next callback
a.replies = a.replies[i:]
return nil
}
- msgID, _ := a.GetMsgID(reply.GetMessageName(), reply.GetCrcString())
+ msgID, _ := a.GetMsgID(reply.msg.GetMessageName(), reply.msg.GetCrcString())
buf := new(bytes.Buffer)
- if reply.GetMessageType() == api.ReplyMessage {
- struc.Pack(buf, &core.VppReplyHeader{VlMsgID: msgID, Context: clientID})
- } else if reply.GetMessageType() == api.EventMessage {
- struc.Pack(buf, &core.VppEventHeader{VlMsgID: msgID, Context: clientID})
- } else if reply.GetMessageType() == api.RequestMessage {
- struc.Pack(buf, &core.VppRequestHeader{VlMsgID: msgID, Context: clientID})
+ context := clientID
+ context = setMultipart(context, reply.multipart)
+ if reply.hasSeqNum {
+ withSeqNums = true
+ context = setSeqNum(context, reply.seqNum)
+ }
+ if reply.msg.GetMessageType() == api.ReplyMessage {
+ struc.Pack(buf, &core.VppReplyHeader{VlMsgID: msgID, Context: context})
+ } else if reply.msg.GetMessageType() == api.EventMessage {
+ struc.Pack(buf, &core.VppEventHeader{VlMsgID: msgID, Context: context})
+ } else if reply.msg.GetMessageType() == api.RequestMessage {
+ struc.Pack(buf, &core.VppRequestHeader{VlMsgID: msgID, Context: context})
} else {
struc.Pack(buf, &core.VppOtherHeader{VlMsgID: msgID})
}
- struc.Pack(buf, reply)
- a.callback(clientID, msgID, buf.Bytes())
+ struc.Pack(buf, reply.msg)
+ a.callback(context, msgID, buf.Bytes())
}
if len(a.replies) > 0 {
- a.replies = []api.Message{}
+ a.replies = []reply{}
if len(a.replyHandlers) > 0 {
// Switch back to handlers once the queue is empty to revert back
// the fallthrough effect.
@@ -302,11 +316,21 @@ func (a *VppAdapter) WaitReady() error {
// MockReply stores a message to be returned when the next request comes. It is a FIFO queue - multiple replies
// can be pushed into it, the first one will be popped when some request comes.
// Using of this method automatically switches the mock into th useRepliesQueue mode.
-func (a *VppAdapter) MockReply(msg api.Message) {
+func (a *VppAdapter) MockReply(replyMsg api.Message, isMultipart bool) {
a.repliesLock.Lock()
defer a.repliesLock.Unlock()
- a.replies = append(a.replies, msg)
+ a.replies = append(a.replies, reply{msg: replyMsg, multipart: isMultipart, hasSeqNum: false})
+ a.mode = useRepliesQueue
+}
+
+// MockReplyWithSeqNum queues next reply like MockReply() does, except that the
+// sequence number can be customized and not necessarily match with the request.
+func (a *VppAdapter) MockReplyWithSeqNum(replyMsg api.Message, isMultipart bool, sequenceNum uint16) {
+ a.repliesLock.Lock()
+ defer a.repliesLock.Unlock()
+
+ a.replies = append(a.replies, reply{msg: replyMsg, multipart: isMultipart, hasSeqNum: true, seqNum: sequenceNum})
a.mode = useRepliesQueue
}
@@ -319,3 +343,18 @@ func (a *VppAdapter) MockReplyHandler(replyHandler ReplyHandler) {
a.replyHandlers = append(a.replyHandlers, replyHandler)
a.mode = useReplyHandlers
}
+
+func setSeqNum(context uint32, seqNum uint16) (newContext uint32) {
+ context &= 0xffff0000
+ context |= uint32(seqNum)
+ return context
+}
+
+func setMultipart(context uint32, isMultipart bool) (newContext uint32) {
+ context &= 0xfffeffff
+ if isMultipart {
+ context |= 1 << 16
+ }
+ return context
+}
+
diff --git a/adapter/vppapiclient/vppapiclient_adapter.go b/adapter/vppapiclient/vppapiclient_adapter.go
index 5a05a58..c77d7f1 100644
--- a/adapter/vppapiclient/vppapiclient_adapter.go
+++ b/adapter/vppapiclient/vppapiclient_adapter.go
@@ -145,8 +145,8 @@ func (a *vppAPIClientAdapter) GetMsgID(msgName string, msgCrc string) (uint16, e
}
// SendMsg sends a binary-encoded message to VPP.
-func (a *vppAPIClientAdapter) SendMsg(clientID uint32, data []byte) error {
- rc := C.govpp_send(C.uint32_t(clientID), unsafe.Pointer(&data[0]), C.size_t(len(data)))
+func (a *vppAPIClientAdapter) SendMsg(context uint32, data []byte) error {
+ rc := C.govpp_send(C.uint32_t(context), unsafe.Pointer(&data[0]), C.size_t(len(data)))
if rc != 0 {
return fmt.Errorf("unable to send the message (error=%d)", rc)
}
diff --git a/api/api.go b/api/api.go
index eafdf22..3c2c7ec 100644
--- a/api/api.go
+++ b/api/api.go
@@ -87,6 +87,8 @@ type MessageIdentifier interface {
// methods provided inside of this package. Do not use the same channel from multiple goroutines concurrently,
// otherwise the responses could mix! Use multiple channels instead.
type Channel struct {
+ ID uint16 // channel ID
+
ReqChan chan *VppRequest // channel for sending the requests to VPP, closing this channel releases all resources in the ChannelProvider
ReplyChan chan *VppReply // channel where VPP replies are delivered to
@@ -96,13 +98,15 @@ type Channel struct {
MsgDecoder MessageDecoder // used to decode binary data to generated API messages
MsgIdentifier MessageIdentifier // used to retrieve message ID of a message
+ lastSeqNum uint16 // sequence number of the last sent request
+
+ delayedReply *VppReply // reply already taken from ReplyChan, buffered for later delivery
replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout
- metadata interface{} // opaque metadata of the API channel
- lastTimedOut bool // wether last request timed out
}
// VppRequest is a request that will be sent to VPP.
type VppRequest struct {
+ SeqNum uint16 // sequence number
Message Message // binary API message to be send to VPP
Multipart bool // true if multipart response is expected, false otherwise
}
@@ -110,6 +114,7 @@ type VppRequest struct {
// VppReply is a reply received from VPP.
type VppReply struct {
MessageID uint16 // ID of the message
+ SeqNum uint16 // sequence number
Data []byte // encoded data with the message - MessageDecoder can be used for decoding
LastReplyReceived bool // in case of multipart replies, true if the last reply has been already received and this one should be ignored
Error error // in case of error, data is nil and this member contains error description
@@ -130,30 +135,27 @@ type NotifSubscription struct {
// RequestCtx is a context of a ongoing request (simple one - only one response is expected).
type RequestCtx struct {
ch *Channel
+ seqNum uint16
}
// MultiRequestCtx is a context of a ongoing multipart request (multiple responses are expected).
type MultiRequestCtx struct {
ch *Channel
+ seqNum uint16
}
const defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout
-// NewChannelInternal returns a new channel structure with metadata field filled in with the provided argument.
+// NewChannelInternal returns a new channel structure.
// Note that this is just a raw channel not yet connected to VPP, it is not intended to be used directly.
// Use ChannelProvider to get an API channel ready for communication with VPP.
-func NewChannelInternal(metadata interface{}) *Channel {
+func NewChannelInternal(id uint16) *Channel {
return &Channel{
+ ID: id,
replyTimeout: defaultReplyTimeout,
- metadata: metadata,
}
}
-// Metadata returns the metadata stored within the channel structure by the NewChannelInternal call.
-func (ch *Channel) Metadata() interface{} {
- return ch.metadata
-}
-
// SetReplyTimeout sets the timeout for replies from VPP. It represents the maximum time the API waits for a reply
// from VPP before returning an error.
func (ch *Channel) SetReplyTimeout(timeout time.Duration) {
@@ -170,10 +172,12 @@ func (ch *Channel) Close() {
// SendRequest asynchronously sends a request to VPP. Returns a request context, that can be used to call ReceiveReply.
// In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply).
func (ch *Channel) SendRequest(msg Message) *RequestCtx {
+ ch.lastSeqNum++
ch.ReqChan <- &VppRequest{
Message: msg,
+ SeqNum: ch.lastSeqNum,
}
- return &RequestCtx{ch: ch}
+ return &RequestCtx{ch: ch, seqNum: ch.lastSeqNum}
}
// ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs).
@@ -183,7 +187,7 @@ func (req *RequestCtx) ReceiveReply(msg Message) error {
return errors.New("invalid request context")
}
- lastReplyReceived, err := req.ch.receiveReplyInternal(msg)
+ lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum)
if lastReplyReceived {
err = errors.New("multipart reply recieved while a simple reply expected")
@@ -195,79 +199,56 @@ func (req *RequestCtx) ReceiveReply(msg Message) error {
// Returns a multipart request context, that can be used to call ReceiveReply.
// In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply).
func (ch *Channel) SendMultiRequest(msg Message) *MultiRequestCtx {
+ ch.lastSeqNum++
ch.ReqChan <- &VppRequest{
Message: msg,
Multipart: true,
+ SeqNum: ch.lastSeqNum,
}
- return &MultiRequestCtx{ch: ch}
+ return &MultiRequestCtx{ch: ch, seqNum: ch.lastSeqNum}
}
// ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs).
-// The reply will be decoded into the msg argument. If the last reply has been already consumed, LastReplyReceived is
-// set to true. Do not use the message itself if LastReplyReceived is true - it won't be filled with actual data.
+// The reply will be decoded into the msg argument. If the last reply has been already consumed, lastReplyReceived is
+// set to true. Do not use the message itself if lastReplyReceived is true - it won't be filled with actual data.
// Error will be returned if the response cannot be received or decoded.
-func (req *MultiRequestCtx) ReceiveReply(msg Message) (LastReplyReceived bool, err error) {
+func (req *MultiRequestCtx) ReceiveReply(msg Message) (lastReplyReceived bool, err error) {
if req == nil || req.ch == nil {
return false, errors.New("invalid request context")
}
- return req.ch.receiveReplyInternal(msg)
+ return req.ch.receiveReplyInternal(msg, req.seqNum)
}
// receiveReplyInternal receives a reply from the reply channel into the provided msg structure.
-func (ch *Channel) receiveReplyInternal(msg Message) (LastReplyReceived bool, err error) {
+func (ch *Channel) receiveReplyInternal(msg Message, expSeqNum uint16) (lastReplyReceived bool, err error) {
+ var ignore bool
if msg == nil {
return false, errors.New("nil message passed in")
}
+ if ch.delayedReply != nil {
+ // try the delayed reply
+ vppReply := ch.delayedReply
+ ch.delayedReply = nil
+ ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
+ if !ignore {
+ return lastReplyReceived, err
+ }
+ }
+
timer := time.NewTimer(ch.replyTimeout)
for {
select {
// blocks until a reply comes to ReplyChan or until timeout expires
case vppReply := <-ch.ReplyChan:
- if vppReply.Error != nil {
- err = vppReply.Error
- return
- }
- if vppReply.LastReplyReceived {
- LastReplyReceived = true
- return
+ ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
+ if ignore {
+ continue
}
-
- // message checks
- expMsgID, err := ch.MsgIdentifier.GetMessageID(msg)
- if err != nil {
- err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to",
- msg.GetMessageName(), msg.GetCrcString())
- return false, err
- }
-
- if vppReply.MessageID != expMsgID {
- var msgNameCrc string
- if nameCrc, err := ch.MsgIdentifier.LookupByID(vppReply.MessageID); err != nil {
- msgNameCrc = err.Error()
- } else {
- msgNameCrc = nameCrc
- }
-
- if ch.lastTimedOut {
- logrus.Warnf("received invalid message ID, expected %d (%s), but got %d (%s) (probably timed out reply from previous request)",
- expMsgID, msg.GetMessageName(), vppReply.MessageID, msgNameCrc)
- continue
- }
-
- err = fmt.Errorf("received invalid message ID, expected %d (%s), but got %d (%s) (check if multiple goroutines are not sharing single GoVPP channel)",
- expMsgID, msg.GetMessageName(), vppReply.MessageID, msgNameCrc)
- return false, err
- }
-
- ch.lastTimedOut = false
- // decode the message
- err = ch.MsgDecoder.DecodeMsg(vppReply.Data, msg)
- return false, err
+ return lastReplyReceived, err
case <-timer.C:
- ch.lastTimedOut = true
err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout)
return false, err
}
@@ -275,6 +256,80 @@ func (ch *Channel) receiveReplyInternal(msg Message) (LastReplyReceived bool, er
return
}
+func (ch *Channel) processReply(reply *VppReply, expSeqNum uint16, msg Message) (ignore bool, lastReplyReceived bool, err error) {
+ // check the sequence number
+ cmpSeqNums := compareSeqNumbers(reply.SeqNum, expSeqNum)
+ if cmpSeqNums == -1 {
+ // reply received too late, ignore the message
+ logrus.WithField("sequence-number", reply.SeqNum).Warn(
+ "Received reply to an already closed binary API request")
+ ignore = true
+ return
+ }
+ if cmpSeqNums == 1 {
+ ch.delayedReply = reply
+ err = fmt.Errorf("missing binary API reply with sequence number: %d", expSeqNum)
+ return
+ }
+
+ if reply.Error != nil {
+ err = reply.Error
+ return
+ }
+ if reply.LastReplyReceived {
+ lastReplyReceived = true
+ return
+ }
+
+ // message checks
+ var expMsgID uint16
+ expMsgID, err = ch.MsgIdentifier.GetMessageID(msg)
+ if err != nil {
+ err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to",
+ msg.GetMessageName(), msg.GetCrcString())
+ return
+ }
+
+ if reply.MessageID != expMsgID {
+ var msgNameCrc string
+ if nameCrc, err := ch.MsgIdentifier.LookupByID(reply.MessageID); err != nil {
+ msgNameCrc = err.Error()
+ } else {
+ msgNameCrc = nameCrc
+ }
+
+ err = fmt.Errorf("received invalid message ID (seq-num=%d), expected %d (%s), but got %d (%s) " +
+ "(check if multiple goroutines are not sharing single GoVPP channel)",
+ reply.SeqNum, expMsgID, msg.GetMessageName(), reply.MessageID, msgNameCrc)
+ return
+ }
+
+ // decode the message
+ err = ch.MsgDecoder.DecodeMsg(reply.Data, msg)
+ return
+}
+
+// compareSeqNumbers returns -1, 0, 1 if sequence number <seqNum1> precedes, equals to,
+// or succeeds seq. number <seqNum2>.
+// Since sequence numbers cycle in the finite set of size 2^16, the function
+// must assume that the distance between compared sequence numbers is less than
+// (2^16)/2 to determine the order.
+func compareSeqNumbers(seqNum1, seqNum2 uint16) int {
+ // calculate distance from seqNum1 to seqNum2
+ var dist uint16
+ if seqNum1 <= seqNum2 {
+ dist = seqNum2 - seqNum1
+ } else {
+ dist = 0xffff - (seqNum1 - seqNum2 - 1)
+ }
+ if dist == 0 {
+ return 0
+ } else if dist <= 0x8000 {
+ return -1
+ }
+ return 1
+}
+
// SubscribeNotification subscribes for receiving of the specified notification messages via provided Go channel.
// Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's
// buffer is full, the notifications will not be delivered into it.
diff --git a/api/api_test.go b/api/api_test.go
index a439986..a83b1cc 100644
--- a/api/api_test.go
+++ b/api/api_test.go
@@ -64,7 +64,7 @@ func TestRequestReplyTapConnect(t *testing.T) {
ctx.mockVpp.MockReply(&tap.TapConnectReply{
Retval: 10,
SwIfIndex: 1,
- })
+ }, false)
request := &tap.TapConnect{
TapName: []byte("test-tap-name"),
UseRandomMac: 1,
@@ -84,7 +84,7 @@ func TestRequestReplyTapModify(t *testing.T) {
ctx.mockVpp.MockReply(&tap.TapModifyReply{
Retval: 15,
SwIfIndex: 2,
- })
+ }, false)
request := &tap.TapModify{
TapName: []byte("test-tap-modify"),
UseRandomMac: 1,
@@ -104,7 +104,7 @@ func TestRequestReplyTapDelete(t *testing.T) {
ctx.mockVpp.MockReply(&tap.TapDeleteReply{
Retval: 20,
- })
+ }, false)
request := &tap.TapDelete{
SwIfIndex: 3,
}
@@ -123,7 +123,7 @@ func TestRequestReplySwInterfaceTapDump(t *testing.T) {
ctx.mockVpp.MockReply(&tap.SwInterfaceTapDetails{
SwIfIndex: 25,
DevName: byteName,
- })
+ }, false)
request := &tap.SwInterfaceTapDump{}
reply := &tap.SwInterfaceTapDetails{}
@@ -140,7 +140,7 @@ func TestRequestReplyMemifCreate(t *testing.T) {
ctx.mockVpp.MockReply(&memif.MemifCreateReply{
Retval: 22,
SwIfIndex: 4,
- })
+ }, false)
request := &memif.MemifCreate{
Role: 10,
ID: 12,
@@ -161,7 +161,7 @@ func TestRequestReplyMemifDelete(t *testing.T) {
ctx.mockVpp.MockReply(&memif.MemifDeleteReply{
Retval: 24,
- })
+ }, false)
request := &memif.MemifDelete{
SwIfIndex: 15,
}
@@ -180,7 +180,7 @@ func TestRequestReplyMemifDetails(t *testing.T) {
SwIfIndex: 25,
IfName: []byte("memif-name"),
Role: 0,
- })
+ }, false)
request := &memif.MemifDump{}
reply := &memif.MemifDetails{}
@@ -200,9 +200,9 @@ func TestMultiRequestReplySwInterfaceTapDump(t *testing.T) {
ctx.mockVpp.MockReply(&tap.SwInterfaceTapDetails{
SwIfIndex: uint32(i),
DevName: []byte("dev-name-test"),
- })
+ }, true)
}
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, true)
reqCtx := ctx.ch.SendMultiRequest(&tap.SwInterfaceTapDump{})
cnt := 0
@@ -226,9 +226,9 @@ func TestMultiRequestReplySwInterfaceMemifDump(t *testing.T) {
for i := 1; i <= 10; i++ {
ctx.mockVpp.MockReply(&memif.MemifDetails{
SwIfIndex: uint32(i),
- })
+ }, true)
}
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, true)
reqCtx := ctx.ch.SendMultiRequest(&memif.MemifDump{})
cnt := 0
@@ -257,7 +257,7 @@ func TestNotifications(t *testing.T) {
ctx.mockVpp.MockReply(&interfaces.SwInterfaceSetFlags{
SwIfIndex: 3,
AdminUpDown: 1,
- })
+ }, false)
ctx.mockVpp.SendMsg(0, []byte(""))
// receive the notification
@@ -292,7 +292,7 @@ func TestNotificationEvent(t *testing.T) {
ctx.mockVpp.MockReply(&interfaces.SwInterfaceEvent{
SwIfIndex: 2,
LinkUpDown: 1,
- })
+ }, false)
ctx.mockVpp.SendMsg(0, []byte(""))
// receive the notification
@@ -328,7 +328,7 @@ func TestSetReplyTimeout(t *testing.T) {
ctx.ch.SetReplyTimeout(time.Millisecond)
// first one request should work
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, false)
err := ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{})
Expect(err).ShouldNot(HaveOccurred())
@@ -348,9 +348,9 @@ func TestSetReplyTimeoutMultiRequest(t *testing.T) {
ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{
SwIfIndex: uint32(i),
InterfaceName: []byte("if-name-test"),
- })
+ }, true)
}
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, true)
cnt := 0
sendMultiRequest := func() error {
@@ -410,19 +410,19 @@ func TestMultiRequestDouble(t *testing.T) {
// mock reply
for i := 1; i <= 3; i++ {
- ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{
+ ctx.mockVpp.MockReplyWithSeqNum(&interfaces.SwInterfaceDetails{
SwIfIndex: uint32(i),
InterfaceName: []byte("if-name-test"),
- })
+ }, true, 1)
}
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{}, true, 1)
for i := 1; i <= 3; i++ {
- ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{
+ ctx.mockVpp.MockReplyWithSeqNum(&interfaces.SwInterfaceDetails{
SwIfIndex: uint32(i),
InterfaceName: []byte("if-name-test"),
- })
+ }, true, 2)
}
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{}, true,2)
cnt := 0
var sendMultiRequest = func() error {
@@ -457,7 +457,7 @@ func TestReceiveReplyAfterTimeout(t *testing.T) {
ctx.ch.SetReplyTimeout(time.Millisecond)
// first one request should work
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{}, false,1)
err := ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{})
Expect(err).ShouldNot(HaveOccurred())
@@ -466,10 +466,10 @@ func TestReceiveReplyAfterTimeout(t *testing.T) {
Expect(err.Error()).To(ContainSubstring("timeout"))
// simulating late reply
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{}, false,2)
// normal reply for next request
- ctx.mockVpp.MockReply(&tap.TapConnectReply{})
+ ctx.mockVpp.MockReplyWithSeqNum(&tap.TapConnectReply{}, false,3)
req := &tap.TapConnect{
TapName: []byte("test-tap-name"),
@@ -497,7 +497,7 @@ func TestReceiveReplyAfterTimeoutMultiRequest(t *testing.T) {
ctx.ch.SetReplyTimeout(time.Millisecond * 100)
// first one request should work
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{}, false, 1)
err := ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{})
Expect(err).ShouldNot(HaveOccurred())
@@ -525,15 +525,15 @@ func TestReceiveReplyAfterTimeoutMultiRequest(t *testing.T) {
// simulating late replies
for i := 1; i <= 3; i++ {
- ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{
+ ctx.mockVpp.MockReplyWithSeqNum(&interfaces.SwInterfaceDetails{
SwIfIndex: uint32(i),
InterfaceName: []byte("if-name-test"),
- })
+ }, true, 2)
}
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{}, true, 2)
// normal reply for next request
- ctx.mockVpp.MockReply(&tap.TapConnectReply{})
+ ctx.mockVpp.MockReplyWithSeqNum(&tap.TapConnectReply{}, false, 3)
req := &tap.TapConnect{
TapName: []byte("test-tap-name"),
@@ -551,12 +551,12 @@ func TestInvalidMessageID(t *testing.T) {
defer ctx.teardownTest()
// first one request should work
- ctx.mockVpp.MockReply(&vpe.ShowVersionReply{})
+ ctx.mockVpp.MockReply(&vpe.ShowVersionReply{}, false)
err := ctx.ch.SendRequest(&vpe.ShowVersion{}).ReceiveReply(&vpe.ShowVersionReply{})
Expect(err).ShouldNot(HaveOccurred())
// second should fail with error invalid message ID
- ctx.mockVpp.MockReply(&vpe.ShowVersionReply{})
+ ctx.mockVpp.MockReply(&vpe.ShowVersionReply{}, false)
err = ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{})
Expect(err).Should(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("invalid message ID"))
diff --git a/core/core.go b/core/core.go
index 4782ba1..052eb0b 100644
--- a/core/core.go
+++ b/core/core.go
@@ -77,12 +77,12 @@ type Connection struct {
msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC
channelsLock sync.RWMutex // lock for the channels map
- channels map[uint32]*api.Channel // map of all API channels indexed by the channel ID
+ channels map[uint16]*api.Channel // map of all API channels indexed by the channel ID
notifSubscriptionsLock sync.RWMutex // lock for the subscriptions map
notifSubscriptions map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
- maxChannelID uint32 // maximum used client ID
+ maxChannelID uint32 // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations)
pingReqID uint16 // ID if the ControlPing message
pingReplyID uint16 // ID of the ControlPingReply message
@@ -90,12 +90,6 @@ type Connection struct {
lastReply time.Time // time of the last received reply from VPP
}
-// channelMetadata contains core-local metadata of an API channel.
-type channelMetadata struct {
- id uint32 // channel ID
- multipart uint32 // 1 if multipart request is being processed, 0 otherwise
-}
-
var (
log *logger.Logger // global logger
conn *Connection // global handle to the Connection (used in the message receive callback)
@@ -204,7 +198,7 @@ func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
conn = &Connection{
vpp: vppAdapter,
codec: &MsgCodec{},
- channels: make(map[uint32]*api.Channel),
+ channels: make(map[uint16]*api.Channel),
msgIDs: make(map[string]uint16),
notifSubscriptions: make(map[uint16][]*api.NotifSubscription),
}
@@ -370,10 +364,9 @@ func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int)
if c == nil {
return nil, errors.New("nil connection passed in")
}
- chID := atomic.AddUint32(&c.maxChannelID, 1)
- chMeta := &channelMetadata{id: chID}
- ch := api.NewChannelInternal(chMeta)
+ chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
+ ch := api.NewChannelInternal(chID)
ch.MsgDecoder = c.codec
ch.MsgIdentifier = c
@@ -389,19 +382,19 @@ func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int)
c.channelsLock.Unlock()
// start watching on the request channel
- go c.watchRequests(ch, chMeta)
+ go c.watchRequests(ch)
return ch, nil
}
// releaseAPIChannel releases API channel that needs to be closed.
-func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) {
+func (c *Connection) releaseAPIChannel(ch *api.Channel) {
log.WithFields(logger.Fields{
- "context": chMeta.id,
+ "ID": ch.ID,
}).Debug("API channel closed.")
// delete the channel from channels map
c.channelsLock.Lock()
- delete(c.channels, chMeta.id)
+ delete(c.channels, ch.ID)
c.channelsLock.Unlock()
}
diff --git a/core/core_test.go b/core/core_test.go
index e80f97c..981ff19 100644
--- a/core/core_test.go
+++ b/core/core_test.go
@@ -33,7 +33,7 @@ type testCtx struct {
ch *api.Channel
}
-func setupTest(t *testing.T) *testCtx {
+func setupTest(t *testing.T, bufferedChan bool) *testCtx {
RegisterTestingT(t)
ctx := &testCtx{}
@@ -43,7 +43,11 @@ func setupTest(t *testing.T) *testCtx {
ctx.conn, err = core.Connect(ctx.mockVpp)
Expect(err).ShouldNot(HaveOccurred())
- ctx.ch, err = ctx.conn.NewAPIChannel()
+ if bufferedChan {
+ ctx.ch, err = ctx.conn.NewAPIChannelBuffered(100, 100)
+ } else {
+ ctx.ch, err = ctx.conn.NewAPIChannel()
+ }
Expect(err).ShouldNot(HaveOccurred())
return ctx
@@ -55,10 +59,10 @@ func (ctx *testCtx) teardownTest() {
}
func TestSimpleRequest(t *testing.T) {
- ctx := setupTest(t)
+ ctx := setupTest(t, false)
defer ctx.teardownTest()
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: -5})
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: -5}, false)
req := &vpe.ControlPing{}
reply := &vpe.ControlPingReply{}
@@ -78,13 +82,13 @@ func TestSimpleRequest(t *testing.T) {
}
func TestMultiRequest(t *testing.T) {
- ctx := setupTest(t)
+ ctx := setupTest(t, false)
defer ctx.teardownTest()
for m := 0; m < 10; m++ {
- ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{})
+ ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{}, true)
}
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, true)
// send multipart request
ctx.ch.ReqChan <- &api.VppRequest{Message: &interfaces.SwInterfaceDump{}, Multipart: true}
@@ -109,7 +113,7 @@ func TestMultiRequest(t *testing.T) {
}
func TestNotifications(t *testing.T) {
- ctx := setupTest(t)
+ ctx := setupTest(t, false)
defer ctx.teardownTest()
// subscribe for notification
@@ -129,7 +133,7 @@ func TestNotifications(t *testing.T) {
ctx.mockVpp.MockReply(&interfaces.SwInterfaceSetFlags{
SwIfIndex: 3,
AdminUpDown: 1,
- })
+ }, false)
ctx.mockVpp.SendMsg(0, []byte{0})
// receive the notification
@@ -162,7 +166,7 @@ func TestNilConnection(t *testing.T) {
}
func TestDoubleConnection(t *testing.T) {
- ctx := setupTest(t)
+ ctx := setupTest(t, false)
defer ctx.teardownTest()
conn, err := core.Connect(ctx.mockVpp)
@@ -172,7 +176,7 @@ func TestDoubleConnection(t *testing.T) {
}
func TestAsyncConnection(t *testing.T) {
- ctx := setupTest(t)
+ ctx := setupTest(t, false)
defer ctx.teardownTest()
ctx.conn.Disconnect()
@@ -187,7 +191,7 @@ func TestAsyncConnection(t *testing.T) {
}
func TestFullBuffer(t *testing.T) {
- ctx := setupTest(t)
+ ctx := setupTest(t, false)
defer ctx.teardownTest()
// close the default API channel
@@ -200,7 +204,7 @@ func TestFullBuffer(t *testing.T) {
// send multiple requests, only one reply should be read
for i := 0; i < 20; i++ {
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, false)
ctx.ch.ReqChan <- &api.VppRequest{Message: &vpe.ControlPing{}}
}
@@ -274,3 +278,225 @@ func TestCodecNegative(t *testing.T) {
Expect(err).Should(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("EOF"))
}
+
+func TestSimpleRequestsWithSequenceNumbers(t *testing.T) {
+ ctx := setupTest(t, false)
+ defer ctx.teardownTest()
+
+ var reqCtx []*api.RequestCtx
+ for i := 0; i < 10; i++ {
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: int32(i)}, false)
+ req := &vpe.ControlPing{}
+ reqCtx = append(reqCtx, ctx.ch.SendRequest(req))
+ }
+
+ for i := 0; i < 10; i++ {
+ reply := &vpe.ControlPingReply{}
+ err := reqCtx[i].ReceiveReply(reply)
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(reply.Retval).To(BeEquivalentTo(i))
+ }
+}
+
+func TestMultiRequestsWithSequenceNumbers(t *testing.T) {
+ ctx := setupTest(t, false)
+ defer ctx.teardownTest()
+
+ for i := 0; i < 10; i++ {
+ ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{SwIfIndex: uint32(i)}, true)
+ }
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, true)
+
+ // send multipart request
+ reqCtx := ctx.ch.SendMultiRequest(&interfaces.SwInterfaceDump{})
+
+ cnt := 0
+ for {
+ Expect(cnt < 11).To(BeTrue())
+
+ // receive a reply
+ reply := &interfaces.SwInterfaceDetails{}
+ lastReplyReceived, err := reqCtx.ReceiveReply(reply)
+
+ if lastReplyReceived {
+ break // break out of the loop
+ }
+
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(reply.SwIfIndex).To(BeEquivalentTo(cnt))
+
+ cnt++
+ }
+
+ Expect(cnt).To(BeEquivalentTo(10))
+}
+
+func TestSimpleRequestWithTimeout(t *testing.T) {
+ ctx := setupTest(t, true)
+ defer ctx.teardownTest()
+
+ // reply for a previous timeouted requests to be ignored
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 1}, false,0)
+
+ // send reply later
+ req1 := &vpe.ControlPing{}
+ reqCtx1 := ctx.ch.SendRequest(req1)
+
+ reply := &vpe.ControlPingReply{}
+ err := reqCtx1.ReceiveReply(reply)
+ Expect(err).ToNot(BeNil())
+ Expect(err.Error()).To(HavePrefix("no reply received within the timeout period"))
+
+ // reply for the previous request
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 1}, false,1)
+
+ // next request
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: 2}, false)
+ req2 := &vpe.ControlPing{}
+ reqCtx2 := ctx.ch.SendRequest(req2)
+
+ // second request should ignore the first reply and return the second one
+ reply = &vpe.ControlPingReply{}
+ err = reqCtx2.ReceiveReply(reply)
+ Expect(err).To(BeNil())
+ Expect(reply.Retval).To(BeEquivalentTo(2))
+}
+
+func TestSimpleRequestsWithMissingReply(t *testing.T) {
+ ctx := setupTest(t, false)
+ defer ctx.teardownTest()
+
+ // request without reply
+ req1 := &vpe.ControlPing{}
+ reqCtx1 := ctx.ch.SendRequest(req1)
+
+ // another request without reply
+ req2 := &vpe.ControlPing{}
+ reqCtx2 := ctx.ch.SendRequest(req2)
+
+ // third request with reply
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 3}, false, 3)
+ req3 := &vpe.ControlPing{}
+ reqCtx3 := ctx.ch.SendRequest(req3)
+
+ // the first two should fail, but not consume reply for the 3rd
+ reply := &vpe.ControlPingReply{}
+ err := reqCtx1.ReceiveReply(reply)
+ Expect(err).ToNot(BeNil())
+ Expect(err.Error()).To(Equal("missing binary API reply with sequence number: 1"))
+
+ reply = &vpe.ControlPingReply{}
+ err = reqCtx2.ReceiveReply(reply)
+ Expect(err).ToNot(BeNil())
+ Expect(err.Error()).To(Equal("missing binary API reply with sequence number: 2"))
+
+ // the second request should succeed
+ reply = &vpe.ControlPingReply{}
+ err = reqCtx3.ReceiveReply(reply)
+ Expect(err).To(BeNil())
+ Expect(reply.Retval).To(BeEquivalentTo(3))
+}
+
+func TestMultiRequestsWithErrors(t *testing.T) {
+ ctx := setupTest(t, false)
+ defer ctx.teardownTest()
+
+ // reply for a previous timeouted requests to be ignored
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 1}, false,0xffff - 1)
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 1}, false,0xffff)
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 1}, false,0)
+
+ for i := 0; i < 10; i++ {
+ ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{SwIfIndex: uint32(i)}, true)
+ }
+ // missing finalizing control ping
+
+ // reply for a next request
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 2}, false,2)
+
+ // send multipart request
+ reqCtx := ctx.ch.SendMultiRequest(&interfaces.SwInterfaceDump{})
+
+ for i := 0; i < 10; i++ {
+ // receive multi-part replies
+ reply := &interfaces.SwInterfaceDetails{}
+ lastReplyReceived, err := reqCtx.ReceiveReply(reply)
+
+ Expect(lastReplyReceived).To(BeFalse())
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(reply.SwIfIndex).To(BeEquivalentTo(i))
+ }
+
+ // missing closing control ping
+ reply := &interfaces.SwInterfaceDetails{}
+ _, err := reqCtx.ReceiveReply(reply)
+ Expect(err).ToNot(BeNil())
+ Expect(err.Error()).To(Equal("missing binary API reply with sequence number: 1"))
+
+ // try again - still fails and nothing consumed
+ _, err = reqCtx.ReceiveReply(reply)
+ Expect(err).ToNot(BeNil())
+ Expect(err.Error()).To(Equal("missing binary API reply with sequence number: 1"))
+
+ // reply for the second request has not been consumed
+ reqCtx2 := ctx.ch.SendRequest(&vpe.ControlPing{})
+ reply2 := &vpe.ControlPingReply{}
+ err = reqCtx2.ReceiveReply(reply2)
+ Expect(err).To(BeNil())
+ Expect(reply2.Retval).To(BeEquivalentTo(2))
+}
+
+func TestRequestsOrdering(t *testing.T) {
+ ctx := setupTest(t, false)
+ defer ctx.teardownTest()
+
+ // the orderings of SendRequest and ReceiveReply calls should match, otherwise
+ // some replies will get thrown away
+
+ // first request
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: 1}, false)
+ req1 := &vpe.ControlPing{}
+ reqCtx1 := ctx.ch.SendRequest(req1)
+
+ // second request
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: 2}, false)
+ req2 := &vpe.ControlPing{}
+ reqCtx2 := ctx.ch.SendRequest(req2)
+
+ // if reply for the second request is read first, the reply for the first
+ // request gets thrown away.
+ reply2 := &vpe.ControlPingReply{}
+ err := reqCtx2.ReceiveReply(reply2)
+ Expect(err).To(BeNil())
+ Expect(reply2.Retval).To(BeEquivalentTo(2))
+
+ // first request has already been considered closed
+ reply1 := &vpe.ControlPingReply{}
+ err = reqCtx1.ReceiveReply(reply1)
+ Expect(err).ToNot(BeNil())
+ Expect(err.Error()).To(HavePrefix("no reply received within the timeout period"))
+}
+
+func TestCycleOverSetOfSequenceNumbers(t *testing.T) {
+ ctx := setupTest(t, true)
+ defer ctx.teardownTest()
+
+ numIters := 0xffff + 100
+ reqCtx := make(map[int]*api.RequestCtx)
+
+ var seqNum uint16 = 0
+ for i := 0; i < numIters + 30 /* receiver is 30 reqs behind */; i++ {
+ seqNum++
+ if i < numIters {
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: int32(i)}, false, seqNum)
+ req := &vpe.ControlPing{}
+ reqCtx[i] = ctx.ch.SendRequest(req)
+ }
+ if i > 30 {
+ reply := &vpe.ControlPingReply{}
+ err := reqCtx[i-30].ReceiveReply(reply)
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(reply.Retval).To(BeEquivalentTo(i-30))
+ }
+ }
+} \ No newline at end of file
diff --git a/core/request_handler.go b/core/request_handler.go
index 8f793f5..3bec38d 100644
--- a/core/request_handler.go
+++ b/core/request_handler.go
@@ -31,17 +31,17 @@ var (
)
// watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
-func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
+func (c *Connection) watchRequests(ch *api.Channel) {
for {
select {
case req, ok := <-ch.ReqChan:
// new request on the request channel
if !ok {
// after closing the request channel, release API channel and return
- c.releaseAPIChannel(ch, chMeta)
+ c.releaseAPIChannel(ch)
return
}
- c.processRequest(ch, chMeta, req)
+ c.processRequest(ch, req)
case req := <-ch.NotifSubsChan:
// new request on the notification subscribe channel
@@ -51,12 +51,12 @@ func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
}
// processRequest processes a single request received on the request channel.
-func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error {
+func (c *Connection) processRequest(ch *api.Channel, req *api.VppRequest) error {
// check whether we are connected to VPP
if atomic.LoadUint32(&c.connected) == 0 {
err := ErrNotConnected
log.Error(err)
- sendReply(ch, &api.VppReply{Error: err})
+ sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
return err
}
@@ -67,8 +67,9 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re
log.WithFields(logger.Fields{
"msg_name": req.Message.GetMessageName(),
"msg_crc": req.Message.GetCrcString(),
+ "seq_num": req.SeqNum,
}).Error(err)
- sendReply(ch, &api.VppReply{Error: err})
+ sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
return err
}
@@ -77,37 +78,35 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re
if err != nil {
err = fmt.Errorf("unable to encode the messge: %v", err)
log.WithFields(logger.Fields{
- "context": chMeta.id,
+ "channel": ch.ID,
"msg_id": msgID,
+ "seq_num": req.SeqNum,
}).Error(err)
- sendReply(ch, &api.VppReply{Error: err})
+ sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
return err
}
if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
log.WithFields(logger.Fields{
- "context": chMeta.id,
+ "channel": ch.ID,
"msg_id": msgID,
"msg_size": len(data),
"msg_name": req.Message.GetMessageName(),
+ "seq_num": req.SeqNum,
}).Debug("Sending a message to VPP.")
}
- // send the message
- if req.Multipart {
- // expect multipart response
- atomic.StoreUint32(&chMeta.multipart, 1)
- }
-
// send the request to VPP
- err = c.vpp.SendMsg(chMeta.id, data)
+ context := packRequestContext(ch.ID, req.Multipart, req.SeqNum)
+ err = c.vpp.SendMsg(context, data)
if err != nil {
- err = fmt.Errorf("unable to send the messge: %v", err)
+ err = fmt.Errorf("unable to send the message: %v", err)
log.WithFields(logger.Fields{
- "context": chMeta.id,
+ "context": context,
"msg_id": msgID,
+ "seq_num": req.SeqNum,
}).Error(err)
- sendReply(ch, &api.VppReply{Error: err})
+ sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
return err
}
@@ -116,12 +115,13 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re
pingData, _ := c.codec.EncodeMsg(msgControlPing, c.pingReqID)
log.WithFields(logger.Fields{
- "context": chMeta.id,
+ "context": context,
"msg_id": c.pingReqID,
"msg_size": len(pingData),
+ "seq_num": req.SeqNum,
}).Debug("Sending a control ping to VPP.")
- c.vpp.SendMsg(chMeta.id, pingData)
+ c.vpp.SendMsg(context, pingData)
}
return nil
@@ -137,11 +137,14 @@ func msgCallback(context uint32, msgID uint16, data []byte) {
return
}
+ chanID, isMultipart, seqNum := unpackRequestContext(context)
if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
log.WithFields(logger.Fields{
- "context": context,
- "msg_id": msgID,
- "msg_size": len(data),
+ "msg_id": msgID,
+ "msg_size": len(data),
+ "channel_id": chanID,
+ "is_multipart": isMultipart,
+ "seq_num": seqNum,
}).Debug("Received a message from VPP.")
}
@@ -153,27 +156,27 @@ func msgCallback(context uint32, msgID uint16, data []byte) {
// match ch according to the context
conn.channelsLock.RLock()
- ch, ok := conn.channels[context]
+ ch, ok := conn.channels[chanID]
conn.channelsLock.RUnlock()
if !ok {
log.WithFields(logger.Fields{
- "context": context,
- "msg_id": msgID,
- }).Error("Context ID not known, ignoring the message.")
+ "channel_id": chanID,
+ "msg_id": msgID,
+ }).Error("Channel ID not known, ignoring the message.")
return
}
- chMeta := ch.Metadata().(*channelMetadata)
lastReplyReceived := false
- // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply
- if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) {
+ // if this is a control ping reply to a multipart request, treat this as a last part of the reply
+ if msgID == conn.pingReplyID && isMultipart {
lastReplyReceived = true
}
// send the data to the channel
sendReply(ch, &api.VppReply{
MessageID: msgID,
+ SeqNum: seqNum,
Data: data,
LastReplyReceived: lastReplyReceived,
})
@@ -195,6 +198,7 @@ func sendReply(ch *api.Channel, reply *api.VppReply) {
log.WithFields(logger.Fields{
"channel": ch,
"msg_id": reply.MessageID,
+ "seq_num": reply.SeqNum,
}).Warn("Unable to send the reply, reciever end not ready.")
}
}
@@ -254,3 +258,24 @@ func (c *Connection) LookupByID(ID uint16) (string, error) {
return "", fmt.Errorf("unknown message ID: %d", ID)
}
+
+// +------------------+-------------------+-----------------------+
+// | 15b = channel ID | 1b = is multipart | 16b = sequence number |
+// +------------------+-------------------+-----------------------+
+func packRequestContext(chanID uint16, isMultipart bool, seqNum uint16) uint32 {
+ context := uint32(chanID) << 17
+ if isMultipart {
+ context |= 1 << 16
+ }
+ context |= uint32(seqNum)
+ return context
+}
+
+func unpackRequestContext(context uint32) (chanID uint16, isMulipart bool, seqNum uint16) {
+ chanID = uint16(context >> 17)
+ if ((context >> 16) & 0x1) != 0 {
+ isMulipart = true
+ }
+ seqNum = uint16(context & 0xffff)
+ return
+}