summaryrefslogtreecommitdiffstats
path: root/api
diff options
context:
space:
mode:
Diffstat (limited to 'api')
-rw-r--r--api/api.go171
-rw-r--r--api/api_test.go64
2 files changed, 145 insertions, 90 deletions
diff --git a/api/api.go b/api/api.go
index eafdf22..3c2c7ec 100644
--- a/api/api.go
+++ b/api/api.go
@@ -87,6 +87,8 @@ type MessageIdentifier interface {
// methods provided inside of this package. Do not use the same channel from multiple goroutines concurrently,
// otherwise the responses could mix! Use multiple channels instead.
type Channel struct {
+ ID uint16 // channel ID
+
ReqChan chan *VppRequest // channel for sending the requests to VPP, closing this channel releases all resources in the ChannelProvider
ReplyChan chan *VppReply // channel where VPP replies are delivered to
@@ -96,13 +98,15 @@ type Channel struct {
MsgDecoder MessageDecoder // used to decode binary data to generated API messages
MsgIdentifier MessageIdentifier // used to retrieve message ID of a message
+ lastSeqNum uint16 // sequence number of the last sent request
+
+ delayedReply *VppReply // reply already taken from ReplyChan, buffered for later delivery
replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout
- metadata interface{} // opaque metadata of the API channel
- lastTimedOut bool // wether last request timed out
}
// VppRequest is a request that will be sent to VPP.
type VppRequest struct {
+ SeqNum uint16 // sequence number
Message Message // binary API message to be send to VPP
Multipart bool // true if multipart response is expected, false otherwise
}
@@ -110,6 +114,7 @@ type VppRequest struct {
// VppReply is a reply received from VPP.
type VppReply struct {
MessageID uint16 // ID of the message
+ SeqNum uint16 // sequence number
Data []byte // encoded data with the message - MessageDecoder can be used for decoding
LastReplyReceived bool // in case of multipart replies, true if the last reply has been already received and this one should be ignored
Error error // in case of error, data is nil and this member contains error description
@@ -130,30 +135,27 @@ type NotifSubscription struct {
// RequestCtx is a context of a ongoing request (simple one - only one response is expected).
type RequestCtx struct {
ch *Channel
+ seqNum uint16
}
// MultiRequestCtx is a context of a ongoing multipart request (multiple responses are expected).
type MultiRequestCtx struct {
ch *Channel
+ seqNum uint16
}
const defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout
-// NewChannelInternal returns a new channel structure with metadata field filled in with the provided argument.
+// NewChannelInternal returns a new channel structure.
// Note that this is just a raw channel not yet connected to VPP, it is not intended to be used directly.
// Use ChannelProvider to get an API channel ready for communication with VPP.
-func NewChannelInternal(metadata interface{}) *Channel {
+func NewChannelInternal(id uint16) *Channel {
return &Channel{
+ ID: id,
replyTimeout: defaultReplyTimeout,
- metadata: metadata,
}
}
-// Metadata returns the metadata stored within the channel structure by the NewChannelInternal call.
-func (ch *Channel) Metadata() interface{} {
- return ch.metadata
-}
-
// SetReplyTimeout sets the timeout for replies from VPP. It represents the maximum time the API waits for a reply
// from VPP before returning an error.
func (ch *Channel) SetReplyTimeout(timeout time.Duration) {
@@ -170,10 +172,12 @@ func (ch *Channel) Close() {
// SendRequest asynchronously sends a request to VPP. Returns a request context, that can be used to call ReceiveReply.
// In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply).
func (ch *Channel) SendRequest(msg Message) *RequestCtx {
+ ch.lastSeqNum++
ch.ReqChan <- &VppRequest{
Message: msg,
+ SeqNum: ch.lastSeqNum,
}
- return &RequestCtx{ch: ch}
+ return &RequestCtx{ch: ch, seqNum: ch.lastSeqNum}
}
// ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs).
@@ -183,7 +187,7 @@ func (req *RequestCtx) ReceiveReply(msg Message) error {
return errors.New("invalid request context")
}
- lastReplyReceived, err := req.ch.receiveReplyInternal(msg)
+ lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum)
if lastReplyReceived {
err = errors.New("multipart reply recieved while a simple reply expected")
@@ -195,79 +199,56 @@ func (req *RequestCtx) ReceiveReply(msg Message) error {
// Returns a multipart request context, that can be used to call ReceiveReply.
// In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply).
func (ch *Channel) SendMultiRequest(msg Message) *MultiRequestCtx {
+ ch.lastSeqNum++
ch.ReqChan <- &VppRequest{
Message: msg,
Multipart: true,
+ SeqNum: ch.lastSeqNum,
}
- return &MultiRequestCtx{ch: ch}
+ return &MultiRequestCtx{ch: ch, seqNum: ch.lastSeqNum}
}
// ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs).
-// The reply will be decoded into the msg argument. If the last reply has been already consumed, LastReplyReceived is
-// set to true. Do not use the message itself if LastReplyReceived is true - it won't be filled with actual data.
+// The reply will be decoded into the msg argument. If the last reply has been already consumed, lastReplyReceived is
+// set to true. Do not use the message itself if lastReplyReceived is true - it won't be filled with actual data.
// Error will be returned if the response cannot be received or decoded.
-func (req *MultiRequestCtx) ReceiveReply(msg Message) (LastReplyReceived bool, err error) {
+func (req *MultiRequestCtx) ReceiveReply(msg Message) (lastReplyReceived bool, err error) {
if req == nil || req.ch == nil {
return false, errors.New("invalid request context")
}
- return req.ch.receiveReplyInternal(msg)
+ return req.ch.receiveReplyInternal(msg, req.seqNum)
}
// receiveReplyInternal receives a reply from the reply channel into the provided msg structure.
-func (ch *Channel) receiveReplyInternal(msg Message) (LastReplyReceived bool, err error) {
+func (ch *Channel) receiveReplyInternal(msg Message, expSeqNum uint16) (lastReplyReceived bool, err error) {
+ var ignore bool
if msg == nil {
return false, errors.New("nil message passed in")
}
+ if ch.delayedReply != nil {
+ // try the delayed reply
+ vppReply := ch.delayedReply
+ ch.delayedReply = nil
+ ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
+ if !ignore {
+ return lastReplyReceived, err
+ }
+ }
+
timer := time.NewTimer(ch.replyTimeout)
for {
select {
// blocks until a reply comes to ReplyChan or until timeout expires
case vppReply := <-ch.ReplyChan:
- if vppReply.Error != nil {
- err = vppReply.Error
- return
- }
- if vppReply.LastReplyReceived {
- LastReplyReceived = true
- return
+ ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
+ if ignore {
+ continue
}
-
- // message checks
- expMsgID, err := ch.MsgIdentifier.GetMessageID(msg)
- if err != nil {
- err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to",
- msg.GetMessageName(), msg.GetCrcString())
- return false, err
- }
-
- if vppReply.MessageID != expMsgID {
- var msgNameCrc string
- if nameCrc, err := ch.MsgIdentifier.LookupByID(vppReply.MessageID); err != nil {
- msgNameCrc = err.Error()
- } else {
- msgNameCrc = nameCrc
- }
-
- if ch.lastTimedOut {
- logrus.Warnf("received invalid message ID, expected %d (%s), but got %d (%s) (probably timed out reply from previous request)",
- expMsgID, msg.GetMessageName(), vppReply.MessageID, msgNameCrc)
- continue
- }
-
- err = fmt.Errorf("received invalid message ID, expected %d (%s), but got %d (%s) (check if multiple goroutines are not sharing single GoVPP channel)",
- expMsgID, msg.GetMessageName(), vppReply.MessageID, msgNameCrc)
- return false, err
- }
-
- ch.lastTimedOut = false
- // decode the message
- err = ch.MsgDecoder.DecodeMsg(vppReply.Data, msg)
- return false, err
+ return lastReplyReceived, err
case <-timer.C:
- ch.lastTimedOut = true
err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout)
return false, err
}
@@ -275,6 +256,80 @@ func (ch *Channel) receiveReplyInternal(msg Message) (LastReplyReceived bool, er
return
}
+func (ch *Channel) processReply(reply *VppReply, expSeqNum uint16, msg Message) (ignore bool, lastReplyReceived bool, err error) {
+ // check the sequence number
+ cmpSeqNums := compareSeqNumbers(reply.SeqNum, expSeqNum)
+ if cmpSeqNums == -1 {
+ // reply received too late, ignore the message
+ logrus.WithField("sequence-number", reply.SeqNum).Warn(
+ "Received reply to an already closed binary API request")
+ ignore = true
+ return
+ }
+ if cmpSeqNums == 1 {
+ ch.delayedReply = reply
+ err = fmt.Errorf("missing binary API reply with sequence number: %d", expSeqNum)
+ return
+ }
+
+ if reply.Error != nil {
+ err = reply.Error
+ return
+ }
+ if reply.LastReplyReceived {
+ lastReplyReceived = true
+ return
+ }
+
+ // message checks
+ var expMsgID uint16
+ expMsgID, err = ch.MsgIdentifier.GetMessageID(msg)
+ if err != nil {
+ err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to",
+ msg.GetMessageName(), msg.GetCrcString())
+ return
+ }
+
+ if reply.MessageID != expMsgID {
+ var msgNameCrc string
+ if nameCrc, err := ch.MsgIdentifier.LookupByID(reply.MessageID); err != nil {
+ msgNameCrc = err.Error()
+ } else {
+ msgNameCrc = nameCrc
+ }
+
+ err = fmt.Errorf("received invalid message ID (seq-num=%d), expected %d (%s), but got %d (%s) " +
+ "(check if multiple goroutines are not sharing single GoVPP channel)",
+ reply.SeqNum, expMsgID, msg.GetMessageName(), reply.MessageID, msgNameCrc)
+ return
+ }
+
+ // decode the message
+ err = ch.MsgDecoder.DecodeMsg(reply.Data, msg)
+ return
+}
+
+// compareSeqNumbers returns -1, 0, 1 if sequence number <seqNum1> precedes, equals to,
+// or succeeds seq. number <seqNum2>.
+// Since sequence numbers cycle in the finite set of size 2^16, the function
+// must assume that the distance between compared sequence numbers is less than
+// (2^16)/2 to determine the order.
+func compareSeqNumbers(seqNum1, seqNum2 uint16) int {
+ // calculate distance from seqNum1 to seqNum2
+ var dist uint16
+ if seqNum1 <= seqNum2 {
+ dist = seqNum2 - seqNum1
+ } else {
+ dist = 0xffff - (seqNum1 - seqNum2 - 1)
+ }
+ if dist == 0 {
+ return 0
+ } else if dist <= 0x8000 {
+ return -1
+ }
+ return 1
+}
+
// SubscribeNotification subscribes for receiving of the specified notification messages via provided Go channel.
// Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's
// buffer is full, the notifications will not be delivered into it.
diff --git a/api/api_test.go b/api/api_test.go
index a439986..a83b1cc 100644
--- a/api/api_test.go
+++ b/api/api_test.go
@@ -64,7 +64,7 @@ func TestRequestReplyTapConnect(t *testing.T) {
ctx.mockVpp.MockReply(&tap.TapConnectReply{
Retval: 10,
SwIfIndex: 1,
- })
+ }, false)
request := &tap.TapConnect{
TapName: []byte("test-tap-name"),
UseRandomMac: 1,
@@ -84,7 +84,7 @@ func TestRequestReplyTapModify(t *testing.T) {
ctx.mockVpp.MockReply(&tap.TapModifyReply{
Retval: 15,
SwIfIndex: 2,
- })
+ }, false)
request := &tap.TapModify{
TapName: []byte("test-tap-modify"),
UseRandomMac: 1,
@@ -104,7 +104,7 @@ func TestRequestReplyTapDelete(t *testing.T) {
ctx.mockVpp.MockReply(&tap.TapDeleteReply{
Retval: 20,
- })
+ }, false)
request := &tap.TapDelete{
SwIfIndex: 3,
}
@@ -123,7 +123,7 @@ func TestRequestReplySwInterfaceTapDump(t *testing.T) {
ctx.mockVpp.MockReply(&tap.SwInterfaceTapDetails{
SwIfIndex: 25,
DevName: byteName,
- })
+ }, false)
request := &tap.SwInterfaceTapDump{}
reply := &tap.SwInterfaceTapDetails{}
@@ -140,7 +140,7 @@ func TestRequestReplyMemifCreate(t *testing.T) {
ctx.mockVpp.MockReply(&memif.MemifCreateReply{
Retval: 22,
SwIfIndex: 4,
- })
+ }, false)
request := &memif.MemifCreate{
Role: 10,
ID: 12,
@@ -161,7 +161,7 @@ func TestRequestReplyMemifDelete(t *testing.T) {
ctx.mockVpp.MockReply(&memif.MemifDeleteReply{
Retval: 24,
- })
+ }, false)
request := &memif.MemifDelete{
SwIfIndex: 15,
}
@@ -180,7 +180,7 @@ func TestRequestReplyMemifDetails(t *testing.T) {
SwIfIndex: 25,
IfName: []byte("memif-name"),
Role: 0,
- })
+ }, false)
request := &memif.MemifDump{}
reply := &memif.MemifDetails{}
@@ -200,9 +200,9 @@ func TestMultiRequestReplySwInterfaceTapDump(t *testing.T) {
ctx.mockVpp.MockReply(&tap.SwInterfaceTapDetails{
SwIfIndex: uint32(i),
DevName: []byte("dev-name-test"),
- })
+ }, true)
}
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, true)
reqCtx := ctx.ch.SendMultiRequest(&tap.SwInterfaceTapDump{})
cnt := 0
@@ -226,9 +226,9 @@ func TestMultiRequestReplySwInterfaceMemifDump(t *testing.T) {
for i := 1; i <= 10; i++ {
ctx.mockVpp.MockReply(&memif.MemifDetails{
SwIfIndex: uint32(i),
- })
+ }, true)
}
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, true)
reqCtx := ctx.ch.SendMultiRequest(&memif.MemifDump{})
cnt := 0
@@ -257,7 +257,7 @@ func TestNotifications(t *testing.T) {
ctx.mockVpp.MockReply(&interfaces.SwInterfaceSetFlags{
SwIfIndex: 3,
AdminUpDown: 1,
- })
+ }, false)
ctx.mockVpp.SendMsg(0, []byte(""))
// receive the notification
@@ -292,7 +292,7 @@ func TestNotificationEvent(t *testing.T) {
ctx.mockVpp.MockReply(&interfaces.SwInterfaceEvent{
SwIfIndex: 2,
LinkUpDown: 1,
- })
+ }, false)
ctx.mockVpp.SendMsg(0, []byte(""))
// receive the notification
@@ -328,7 +328,7 @@ func TestSetReplyTimeout(t *testing.T) {
ctx.ch.SetReplyTimeout(time.Millisecond)
// first one request should work
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, false)
err := ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{})
Expect(err).ShouldNot(HaveOccurred())
@@ -348,9 +348,9 @@ func TestSetReplyTimeoutMultiRequest(t *testing.T) {
ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{
SwIfIndex: uint32(i),
InterfaceName: []byte("if-name-test"),
- })
+ }, true)
}
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, true)
cnt := 0
sendMultiRequest := func() error {
@@ -410,19 +410,19 @@ func TestMultiRequestDouble(t *testing.T) {
// mock reply
for i := 1; i <= 3; i++ {
- ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{
+ ctx.mockVpp.MockReplyWithSeqNum(&interfaces.SwInterfaceDetails{
SwIfIndex: uint32(i),
InterfaceName: []byte("if-name-test"),
- })
+ }, true, 1)
}
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{}, true, 1)
for i := 1; i <= 3; i++ {
- ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{
+ ctx.mockVpp.MockReplyWithSeqNum(&interfaces.SwInterfaceDetails{
SwIfIndex: uint32(i),
InterfaceName: []byte("if-name-test"),
- })
+ }, true, 2)
}
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{}, true,2)
cnt := 0
var sendMultiRequest = func() error {
@@ -457,7 +457,7 @@ func TestReceiveReplyAfterTimeout(t *testing.T) {
ctx.ch.SetReplyTimeout(time.Millisecond)
// first one request should work
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{}, false,1)
err := ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{})
Expect(err).ShouldNot(HaveOccurred())
@@ -466,10 +466,10 @@ func TestReceiveReplyAfterTimeout(t *testing.T) {
Expect(err.Error()).To(ContainSubstring("timeout"))
// simulating late reply
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{}, false,2)
// normal reply for next request
- ctx.mockVpp.MockReply(&tap.TapConnectReply{})
+ ctx.mockVpp.MockReplyWithSeqNum(&tap.TapConnectReply{}, false,3)
req := &tap.TapConnect{
TapName: []byte("test-tap-name"),
@@ -497,7 +497,7 @@ func TestReceiveReplyAfterTimeoutMultiRequest(t *testing.T) {
ctx.ch.SetReplyTimeout(time.Millisecond * 100)
// first one request should work
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{}, false, 1)
err := ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{})
Expect(err).ShouldNot(HaveOccurred())
@@ -525,15 +525,15 @@ func TestReceiveReplyAfterTimeoutMultiRequest(t *testing.T) {
// simulating late replies
for i := 1; i <= 3; i++ {
- ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{
+ ctx.mockVpp.MockReplyWithSeqNum(&interfaces.SwInterfaceDetails{
SwIfIndex: uint32(i),
InterfaceName: []byte("if-name-test"),
- })
+ }, true, 2)
}
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{}, true, 2)
// normal reply for next request
- ctx.mockVpp.MockReply(&tap.TapConnectReply{})
+ ctx.mockVpp.MockReplyWithSeqNum(&tap.TapConnectReply{}, false, 3)
req := &tap.TapConnect{
TapName: []byte("test-tap-name"),
@@ -551,12 +551,12 @@ func TestInvalidMessageID(t *testing.T) {
defer ctx.teardownTest()
// first one request should work
- ctx.mockVpp.MockReply(&vpe.ShowVersionReply{})
+ ctx.mockVpp.MockReply(&vpe.ShowVersionReply{}, false)
err := ctx.ch.SendRequest(&vpe.ShowVersion{}).ReceiveReply(&vpe.ShowVersionReply{})
Expect(err).ShouldNot(HaveOccurred())
// second should fail with error invalid message ID
- ctx.mockVpp.MockReply(&vpe.ShowVersionReply{})
+ ctx.mockVpp.MockReply(&vpe.ShowVersionReply{}, false)
err = ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{})
Expect(err).Should(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("invalid message ID"))