diff options
-rw-r--r-- | adapter/vppapiclient/vppapiclient_adapter.go | 3 | ||||
-rw-r--r-- | api/api.go | 67 | ||||
-rw-r--r-- | api/api_test.go | 103 |
3 files changed, 142 insertions, 31 deletions
diff --git a/adapter/vppapiclient/vppapiclient_adapter.go b/adapter/vppapiclient/vppapiclient_adapter.go index 1cdaf72..10ec53b 100644 --- a/adapter/vppapiclient/vppapiclient_adapter.go +++ b/adapter/vppapiclient/vppapiclient_adapter.go @@ -77,7 +77,6 @@ govpp_get_msg_index(char *name_and_crc) import "C" import ( - "errors" "fmt" "os" "reflect" @@ -130,7 +129,7 @@ func (a *vppAPIClientAdapter) GetMsgID(msgName string, msgCrc string) (uint16, e msgID := uint16(C.govpp_get_msg_index(nameAndCrc)) if msgID == ^uint16(0) { - return msgID, errors.New("unkonwn message") + return msgID, fmt.Errorf("unknown message: %v (crc: %v)", msgName, msgCrc) } return msgID, nil @@ -18,6 +18,8 @@ import ( "errors" "fmt" "time" + + "github.com/sirupsen/logrus" ) // MessageType represents the type of a VPP message. @@ -61,7 +63,7 @@ type ChannelProvider interface { // It uses default buffer sizes for the request and reply Go channels. NewAPIChannel() (*Channel, error) - // NewAPIChannel returns a new channel for communication with VPP via govpp core. + // NewAPIChannelBuffered returns a new channel for communication with VPP via govpp core. // It allows to specify custom buffer sizes for the request and reply Go channels. NewAPIChannelBuffered() (*Channel, error) } @@ -94,6 +96,7 @@ type Channel struct { replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout metadata interface{} // opaque metadata of the API channel + lastTimedOut bool // wether last request timed out } // VppRequest is a request that will be sent to VPP. @@ -214,34 +217,46 @@ func (ch *Channel) receiveReplyInternal(msg Message) (LastReplyReceived bool, er if msg == nil { return false, errors.New("nil message passed in") } - select { - // blocks until a reply comes to ReplyChan or until timeout expires - case vppReply := <-ch.ReplyChan: - if vppReply.Error != nil { - err = vppReply.Error - return - } - if vppReply.LastReplyReceived { - LastReplyReceived = true - return - } - // message checks - expMsgID, err := ch.MsgIdentifier.GetMessageID(msg) - if err != nil { - err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to", - msg.GetMessageName(), msg.GetCrcString()) + + timer := time.NewTimer(ch.replyTimeout) + for { + select { + // blocks until a reply comes to ReplyChan or until timeout expires + case vppReply := <-ch.ReplyChan: + if vppReply.Error != nil { + err = vppReply.Error + return + } + if vppReply.LastReplyReceived { + LastReplyReceived = true + return + } + // message checks + expMsgID, err := ch.MsgIdentifier.GetMessageID(msg) + if err != nil { + err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to", + msg.GetMessageName(), msg.GetCrcString()) + return false, err + } + if vppReply.MessageID != expMsgID { + if ch.lastTimedOut { + logrus.Warnf("received invalid message ID, expected %d (%s), but got %d (probably timed out reply from previous request)", + expMsgID, msg.GetMessageName(), vppReply.MessageID) + continue + } + err = fmt.Errorf("received invalid message ID, expected %d (%s), but got %d (check if multiple goroutines are not sharing single GoVPP channel)", + expMsgID, msg.GetMessageName(), vppReply.MessageID) + return false, err + } + ch.lastTimedOut = false + // decode the message + err = ch.MsgDecoder.DecodeMsg(vppReply.Data, msg) return false, err - } - if vppReply.MessageID != expMsgID { - err = fmt.Errorf("received invalid message ID, expected %d (%s), but got %d (check if multiple goroutines are not sharing single GoVPP channel)", - expMsgID, msg.GetMessageName(), vppReply.MessageID) + case <-timer.C: + ch.lastTimedOut = true + err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout) return false, err } - // decode the message - err = ch.MsgDecoder.DecodeMsg(vppReply.Data, msg) - - case <-time.After(ch.replyTimeout): - err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout) } return } diff --git a/api/api_test.go b/api/api_test.go index 9af6e71..62541ab 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -49,8 +49,6 @@ func setupTest(t *testing.T) *testCtx { ctx.ch, err = ctx.conn.NewAPIChannel() Expect(err).ShouldNot(HaveOccurred()) - ctx.ch.SetReplyTimeout(time.Millisecond) - return ctx } @@ -328,6 +326,8 @@ func TestSetReplyTimeout(t *testing.T) { ctx := setupTest(t) defer ctx.teardownTest() + ctx.ch.SetReplyTimeout(time.Millisecond) + // first one request should work ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) err := ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{}) @@ -343,6 +343,8 @@ func TestSetReplyTimeoutMultiRequest(t *testing.T) { ctx := setupTest(t) defer ctx.teardownTest() + ctx.ch.SetReplyTimeout(time.Millisecond) + for i := 1; i <= 3; i++ { ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{ SwIfIndex: uint32(i), @@ -424,7 +426,7 @@ func TestMultiRequestDouble(t *testing.T) { ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) cnt := 0 - sendMultiRequest := func() error { + var sendMultiRequest = func() error { reqCtx := ctx.ch.SendMultiRequest(&interfaces.SwInterfaceDump{}) for { msg := &interfaces.SwInterfaceDetails{} @@ -448,3 +450,98 @@ func TestMultiRequestDouble(t *testing.T) { Expect(cnt).To(BeEquivalentTo(6)) } + +func TestReceiveReplyAfterTimeout(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + ctx.ch.SetReplyTimeout(time.Millisecond) + + // first one request should work + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + err := ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{}) + Expect(err).ShouldNot(HaveOccurred()) + + err = ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{}) + Expect(err).Should(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("timeout")) + + // simulating late reply + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + + // normal reply for next request + ctx.mockVpp.MockReply(&tap.TapConnectReply{}) + + req := &tap.TapConnect{ + TapName: []byte("test-tap-name"), + UseRandomMac: 1, + } + reply := &tap.TapConnectReply{} + + // should succeed + err = ctx.ch.SendRequest(req).ReceiveReply(reply) + Expect(err).ShouldNot(HaveOccurred()) +} + +/* + TODO: fix mock adapter + This test will fail because mock adapter will stop sending replies + when it encounters control_ping_reply from multi request, + thus never sending reply for next request + +func TestReceiveReplyAfterTimeoutMultiRequest(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + ctx.ch.SetReplyTimeout(time.Millisecond * 100) + + // first one request should work + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + err := ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{}) + Expect(err).ShouldNot(HaveOccurred()) + + cnt := 0 + var sendMultiRequest = func() error { + reqCtx := ctx.ch.SendMultiRequest(&interfaces.SwInterfaceDump{}) + for { + msg := &interfaces.SwInterfaceDetails{} + stop, err := reqCtx.ReceiveReply(msg) + if stop { + break // break out of the loop + } + if err != nil { + return err + } + cnt++ + } + return nil + } + + err = sendMultiRequest() + Expect(err).Should(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("timeout")) + Expect(cnt).To(BeEquivalentTo(0)) + + // simulating late replies + for i := 1; i <= 3; i++ { + ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{ + SwIfIndex: uint32(i), + InterfaceName: []byte("if-name-test"), + }) + } + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + + // normal reply for next request + ctx.mockVpp.MockReply(&tap.TapConnectReply{}) + + req := &tap.TapConnect{ + TapName: []byte("test-tap-name"), + UseRandomMac: 1, + } + reply := &tap.TapConnectReply{} + + // should succeed + err = ctx.ch.SendRequest(req).ReceiveReply(reply) + Expect(err).ShouldNot(HaveOccurred()) +} +*/ |