diff options
Diffstat (limited to 'api')
-rw-r--r-- | api/api.go | 171 | ||||
-rw-r--r-- | api/api_test.go | 64 |
2 files changed, 145 insertions, 90 deletions
@@ -87,6 +87,8 @@ type MessageIdentifier interface { // methods provided inside of this package. Do not use the same channel from multiple goroutines concurrently, // otherwise the responses could mix! Use multiple channels instead. type Channel struct { + ID uint16 // channel ID + ReqChan chan *VppRequest // channel for sending the requests to VPP, closing this channel releases all resources in the ChannelProvider ReplyChan chan *VppReply // channel where VPP replies are delivered to @@ -96,13 +98,15 @@ type Channel struct { MsgDecoder MessageDecoder // used to decode binary data to generated API messages MsgIdentifier MessageIdentifier // used to retrieve message ID of a message + lastSeqNum uint16 // sequence number of the last sent request + + delayedReply *VppReply // reply already taken from ReplyChan, buffered for later delivery replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout - metadata interface{} // opaque metadata of the API channel - lastTimedOut bool // wether last request timed out } // VppRequest is a request that will be sent to VPP. type VppRequest struct { + SeqNum uint16 // sequence number Message Message // binary API message to be send to VPP Multipart bool // true if multipart response is expected, false otherwise } @@ -110,6 +114,7 @@ type VppRequest struct { // VppReply is a reply received from VPP. type VppReply struct { MessageID uint16 // ID of the message + SeqNum uint16 // sequence number Data []byte // encoded data with the message - MessageDecoder can be used for decoding LastReplyReceived bool // in case of multipart replies, true if the last reply has been already received and this one should be ignored Error error // in case of error, data is nil and this member contains error description @@ -130,30 +135,27 @@ type NotifSubscription struct { // RequestCtx is a context of a ongoing request (simple one - only one response is expected). type RequestCtx struct { ch *Channel + seqNum uint16 } // MultiRequestCtx is a context of a ongoing multipart request (multiple responses are expected). type MultiRequestCtx struct { ch *Channel + seqNum uint16 } const defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout -// NewChannelInternal returns a new channel structure with metadata field filled in with the provided argument. +// NewChannelInternal returns a new channel structure. // Note that this is just a raw channel not yet connected to VPP, it is not intended to be used directly. // Use ChannelProvider to get an API channel ready for communication with VPP. -func NewChannelInternal(metadata interface{}) *Channel { +func NewChannelInternal(id uint16) *Channel { return &Channel{ + ID: id, replyTimeout: defaultReplyTimeout, - metadata: metadata, } } -// Metadata returns the metadata stored within the channel structure by the NewChannelInternal call. -func (ch *Channel) Metadata() interface{} { - return ch.metadata -} - // SetReplyTimeout sets the timeout for replies from VPP. It represents the maximum time the API waits for a reply // from VPP before returning an error. func (ch *Channel) SetReplyTimeout(timeout time.Duration) { @@ -170,10 +172,12 @@ func (ch *Channel) Close() { // SendRequest asynchronously sends a request to VPP. Returns a request context, that can be used to call ReceiveReply. // In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply). func (ch *Channel) SendRequest(msg Message) *RequestCtx { + ch.lastSeqNum++ ch.ReqChan <- &VppRequest{ Message: msg, + SeqNum: ch.lastSeqNum, } - return &RequestCtx{ch: ch} + return &RequestCtx{ch: ch, seqNum: ch.lastSeqNum} } // ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs). @@ -183,7 +187,7 @@ func (req *RequestCtx) ReceiveReply(msg Message) error { return errors.New("invalid request context") } - lastReplyReceived, err := req.ch.receiveReplyInternal(msg) + lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum) if lastReplyReceived { err = errors.New("multipart reply recieved while a simple reply expected") @@ -195,79 +199,56 @@ func (req *RequestCtx) ReceiveReply(msg Message) error { // Returns a multipart request context, that can be used to call ReceiveReply. // In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply). func (ch *Channel) SendMultiRequest(msg Message) *MultiRequestCtx { + ch.lastSeqNum++ ch.ReqChan <- &VppRequest{ Message: msg, Multipart: true, + SeqNum: ch.lastSeqNum, } - return &MultiRequestCtx{ch: ch} + return &MultiRequestCtx{ch: ch, seqNum: ch.lastSeqNum} } // ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs). -// The reply will be decoded into the msg argument. If the last reply has been already consumed, LastReplyReceived is -// set to true. Do not use the message itself if LastReplyReceived is true - it won't be filled with actual data. +// The reply will be decoded into the msg argument. If the last reply has been already consumed, lastReplyReceived is +// set to true. Do not use the message itself if lastReplyReceived is true - it won't be filled with actual data. // Error will be returned if the response cannot be received or decoded. -func (req *MultiRequestCtx) ReceiveReply(msg Message) (LastReplyReceived bool, err error) { +func (req *MultiRequestCtx) ReceiveReply(msg Message) (lastReplyReceived bool, err error) { if req == nil || req.ch == nil { return false, errors.New("invalid request context") } - return req.ch.receiveReplyInternal(msg) + return req.ch.receiveReplyInternal(msg, req.seqNum) } // receiveReplyInternal receives a reply from the reply channel into the provided msg structure. -func (ch *Channel) receiveReplyInternal(msg Message) (LastReplyReceived bool, err error) { +func (ch *Channel) receiveReplyInternal(msg Message, expSeqNum uint16) (lastReplyReceived bool, err error) { + var ignore bool if msg == nil { return false, errors.New("nil message passed in") } + if ch.delayedReply != nil { + // try the delayed reply + vppReply := ch.delayedReply + ch.delayedReply = nil + ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg) + if !ignore { + return lastReplyReceived, err + } + } + timer := time.NewTimer(ch.replyTimeout) for { select { // blocks until a reply comes to ReplyChan or until timeout expires case vppReply := <-ch.ReplyChan: - if vppReply.Error != nil { - err = vppReply.Error - return - } - if vppReply.LastReplyReceived { - LastReplyReceived = true - return + ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg) + if ignore { + continue } - - // message checks - expMsgID, err := ch.MsgIdentifier.GetMessageID(msg) - if err != nil { - err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to", - msg.GetMessageName(), msg.GetCrcString()) - return false, err - } - - if vppReply.MessageID != expMsgID { - var msgNameCrc string - if nameCrc, err := ch.MsgIdentifier.LookupByID(vppReply.MessageID); err != nil { - msgNameCrc = err.Error() - } else { - msgNameCrc = nameCrc - } - - if ch.lastTimedOut { - logrus.Warnf("received invalid message ID, expected %d (%s), but got %d (%s) (probably timed out reply from previous request)", - expMsgID, msg.GetMessageName(), vppReply.MessageID, msgNameCrc) - continue - } - - err = fmt.Errorf("received invalid message ID, expected %d (%s), but got %d (%s) (check if multiple goroutines are not sharing single GoVPP channel)", - expMsgID, msg.GetMessageName(), vppReply.MessageID, msgNameCrc) - return false, err - } - - ch.lastTimedOut = false - // decode the message - err = ch.MsgDecoder.DecodeMsg(vppReply.Data, msg) - return false, err + return lastReplyReceived, err case <-timer.C: - ch.lastTimedOut = true err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout) return false, err } @@ -275,6 +256,80 @@ func (ch *Channel) receiveReplyInternal(msg Message) (LastReplyReceived bool, er return } +func (ch *Channel) processReply(reply *VppReply, expSeqNum uint16, msg Message) (ignore bool, lastReplyReceived bool, err error) { + // check the sequence number + cmpSeqNums := compareSeqNumbers(reply.SeqNum, expSeqNum) + if cmpSeqNums == -1 { + // reply received too late, ignore the message + logrus.WithField("sequence-number", reply.SeqNum).Warn( + "Received reply to an already closed binary API request") + ignore = true + return + } + if cmpSeqNums == 1 { + ch.delayedReply = reply + err = fmt.Errorf("missing binary API reply with sequence number: %d", expSeqNum) + return + } + + if reply.Error != nil { + err = reply.Error + return + } + if reply.LastReplyReceived { + lastReplyReceived = true + return + } + + // message checks + var expMsgID uint16 + expMsgID, err = ch.MsgIdentifier.GetMessageID(msg) + if err != nil { + err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to", + msg.GetMessageName(), msg.GetCrcString()) + return + } + + if reply.MessageID != expMsgID { + var msgNameCrc string + if nameCrc, err := ch.MsgIdentifier.LookupByID(reply.MessageID); err != nil { + msgNameCrc = err.Error() + } else { + msgNameCrc = nameCrc + } + + err = fmt.Errorf("received invalid message ID (seq-num=%d), expected %d (%s), but got %d (%s) " + + "(check if multiple goroutines are not sharing single GoVPP channel)", + reply.SeqNum, expMsgID, msg.GetMessageName(), reply.MessageID, msgNameCrc) + return + } + + // decode the message + err = ch.MsgDecoder.DecodeMsg(reply.Data, msg) + return +} + +// compareSeqNumbers returns -1, 0, 1 if sequence number <seqNum1> precedes, equals to, +// or succeeds seq. number <seqNum2>. +// Since sequence numbers cycle in the finite set of size 2^16, the function +// must assume that the distance between compared sequence numbers is less than +// (2^16)/2 to determine the order. +func compareSeqNumbers(seqNum1, seqNum2 uint16) int { + // calculate distance from seqNum1 to seqNum2 + var dist uint16 + if seqNum1 <= seqNum2 { + dist = seqNum2 - seqNum1 + } else { + dist = 0xffff - (seqNum1 - seqNum2 - 1) + } + if dist == 0 { + return 0 + } else if dist <= 0x8000 { + return -1 + } + return 1 +} + // SubscribeNotification subscribes for receiving of the specified notification messages via provided Go channel. // Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's // buffer is full, the notifications will not be delivered into it. diff --git a/api/api_test.go b/api/api_test.go index a439986..a83b1cc 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -64,7 +64,7 @@ func TestRequestReplyTapConnect(t *testing.T) { ctx.mockVpp.MockReply(&tap.TapConnectReply{ Retval: 10, SwIfIndex: 1, - }) + }, false) request := &tap.TapConnect{ TapName: []byte("test-tap-name"), UseRandomMac: 1, @@ -84,7 +84,7 @@ func TestRequestReplyTapModify(t *testing.T) { ctx.mockVpp.MockReply(&tap.TapModifyReply{ Retval: 15, SwIfIndex: 2, - }) + }, false) request := &tap.TapModify{ TapName: []byte("test-tap-modify"), UseRandomMac: 1, @@ -104,7 +104,7 @@ func TestRequestReplyTapDelete(t *testing.T) { ctx.mockVpp.MockReply(&tap.TapDeleteReply{ Retval: 20, - }) + }, false) request := &tap.TapDelete{ SwIfIndex: 3, } @@ -123,7 +123,7 @@ func TestRequestReplySwInterfaceTapDump(t *testing.T) { ctx.mockVpp.MockReply(&tap.SwInterfaceTapDetails{ SwIfIndex: 25, DevName: byteName, - }) + }, false) request := &tap.SwInterfaceTapDump{} reply := &tap.SwInterfaceTapDetails{} @@ -140,7 +140,7 @@ func TestRequestReplyMemifCreate(t *testing.T) { ctx.mockVpp.MockReply(&memif.MemifCreateReply{ Retval: 22, SwIfIndex: 4, - }) + }, false) request := &memif.MemifCreate{ Role: 10, ID: 12, @@ -161,7 +161,7 @@ func TestRequestReplyMemifDelete(t *testing.T) { ctx.mockVpp.MockReply(&memif.MemifDeleteReply{ Retval: 24, - }) + }, false) request := &memif.MemifDelete{ SwIfIndex: 15, } @@ -180,7 +180,7 @@ func TestRequestReplyMemifDetails(t *testing.T) { SwIfIndex: 25, IfName: []byte("memif-name"), Role: 0, - }) + }, false) request := &memif.MemifDump{} reply := &memif.MemifDetails{} @@ -200,9 +200,9 @@ func TestMultiRequestReplySwInterfaceTapDump(t *testing.T) { ctx.mockVpp.MockReply(&tap.SwInterfaceTapDetails{ SwIfIndex: uint32(i), DevName: []byte("dev-name-test"), - }) + }, true) } - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, true) reqCtx := ctx.ch.SendMultiRequest(&tap.SwInterfaceTapDump{}) cnt := 0 @@ -226,9 +226,9 @@ func TestMultiRequestReplySwInterfaceMemifDump(t *testing.T) { for i := 1; i <= 10; i++ { ctx.mockVpp.MockReply(&memif.MemifDetails{ SwIfIndex: uint32(i), - }) + }, true) } - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, true) reqCtx := ctx.ch.SendMultiRequest(&memif.MemifDump{}) cnt := 0 @@ -257,7 +257,7 @@ func TestNotifications(t *testing.T) { ctx.mockVpp.MockReply(&interfaces.SwInterfaceSetFlags{ SwIfIndex: 3, AdminUpDown: 1, - }) + }, false) ctx.mockVpp.SendMsg(0, []byte("")) // receive the notification @@ -292,7 +292,7 @@ func TestNotificationEvent(t *testing.T) { ctx.mockVpp.MockReply(&interfaces.SwInterfaceEvent{ SwIfIndex: 2, LinkUpDown: 1, - }) + }, false) ctx.mockVpp.SendMsg(0, []byte("")) // receive the notification @@ -328,7 +328,7 @@ func TestSetReplyTimeout(t *testing.T) { ctx.ch.SetReplyTimeout(time.Millisecond) // first one request should work - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, false) err := ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{}) Expect(err).ShouldNot(HaveOccurred()) @@ -348,9 +348,9 @@ func TestSetReplyTimeoutMultiRequest(t *testing.T) { ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{ SwIfIndex: uint32(i), InterfaceName: []byte("if-name-test"), - }) + }, true) } - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, true) cnt := 0 sendMultiRequest := func() error { @@ -410,19 +410,19 @@ func TestMultiRequestDouble(t *testing.T) { // mock reply for i := 1; i <= 3; i++ { - ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{ + ctx.mockVpp.MockReplyWithSeqNum(&interfaces.SwInterfaceDetails{ SwIfIndex: uint32(i), InterfaceName: []byte("if-name-test"), - }) + }, true, 1) } - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{}, true, 1) for i := 1; i <= 3; i++ { - ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{ + ctx.mockVpp.MockReplyWithSeqNum(&interfaces.SwInterfaceDetails{ SwIfIndex: uint32(i), InterfaceName: []byte("if-name-test"), - }) + }, true, 2) } - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{}, true,2) cnt := 0 var sendMultiRequest = func() error { @@ -457,7 +457,7 @@ func TestReceiveReplyAfterTimeout(t *testing.T) { ctx.ch.SetReplyTimeout(time.Millisecond) // first one request should work - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{}, false,1) err := ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{}) Expect(err).ShouldNot(HaveOccurred()) @@ -466,10 +466,10 @@ func TestReceiveReplyAfterTimeout(t *testing.T) { Expect(err.Error()).To(ContainSubstring("timeout")) // simulating late reply - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{}, false,2) // normal reply for next request - ctx.mockVpp.MockReply(&tap.TapConnectReply{}) + ctx.mockVpp.MockReplyWithSeqNum(&tap.TapConnectReply{}, false,3) req := &tap.TapConnect{ TapName: []byte("test-tap-name"), @@ -497,7 +497,7 @@ func TestReceiveReplyAfterTimeoutMultiRequest(t *testing.T) { ctx.ch.SetReplyTimeout(time.Millisecond * 100) // first one request should work - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{}, false, 1) err := ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{}) Expect(err).ShouldNot(HaveOccurred()) @@ -525,15 +525,15 @@ func TestReceiveReplyAfterTimeoutMultiRequest(t *testing.T) { // simulating late replies for i := 1; i <= 3; i++ { - ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{ + ctx.mockVpp.MockReplyWithSeqNum(&interfaces.SwInterfaceDetails{ SwIfIndex: uint32(i), InterfaceName: []byte("if-name-test"), - }) + }, true, 2) } - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{}, true, 2) // normal reply for next request - ctx.mockVpp.MockReply(&tap.TapConnectReply{}) + ctx.mockVpp.MockReplyWithSeqNum(&tap.TapConnectReply{}, false, 3) req := &tap.TapConnect{ TapName: []byte("test-tap-name"), @@ -551,12 +551,12 @@ func TestInvalidMessageID(t *testing.T) { defer ctx.teardownTest() // first one request should work - ctx.mockVpp.MockReply(&vpe.ShowVersionReply{}) + ctx.mockVpp.MockReply(&vpe.ShowVersionReply{}, false) err := ctx.ch.SendRequest(&vpe.ShowVersion{}).ReceiveReply(&vpe.ShowVersionReply{}) Expect(err).ShouldNot(HaveOccurred()) // second should fail with error invalid message ID - ctx.mockVpp.MockReply(&vpe.ShowVersionReply{}) + ctx.mockVpp.MockReply(&vpe.ShowVersionReply{}, false) err = ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{}) Expect(err).Should(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("invalid message ID")) |