From f1bef4a3c66f4408afdeb64cda62ccd8562d0fc6 Mon Sep 17 00:00:00 2001 From: Vladimir Lavor Date: Tue, 3 Jul 2018 10:39:21 +0200 Subject: make api.Channel as interface Change-Id: I052d241ab09043b1195beebeee99df4d8536621f Signed-off-by: Vladimir Lavor --- Makefile | 1 - adapter/mock/mock_adapter.go | 27 +- api/api.go | 314 +++------------ api/api_test.go | 588 ---------------------------- api/doc.go | 94 +---- codec/doc.go | 2 + codec/msg_codec.go | 141 +++++++ core/channel.go | 276 +++++++++++++ core/channel_test.go | 587 +++++++++++++++++++++++++++ core/connection.go | 403 +++++++++++++++++++ core/connection_test.go | 543 +++++++++++++++++++++++++ core/core.go | 400 ------------------- core/core_test.go | 542 ------------------------- core/doc.go | 91 +++++ core/msg_codec.go | 159 -------- core/notification_handler.go | 7 +- core/request_handler.go | 18 +- examples/cmd/perf-bench/perf-bench.go | 16 +- examples/cmd/simple-client/simple_client.go | 22 +- examples/cmd/stats-client/stats_client.go | 6 +- 20 files changed, 2144 insertions(+), 2093 deletions(-) delete mode 100644 api/api_test.go create mode 100644 codec/doc.go create mode 100644 codec/msg_codec.go create mode 100644 core/channel.go create mode 100644 core/channel_test.go create mode 100644 core/connection.go create mode 100644 core/connection_test.go delete mode 100644 core/core.go delete mode 100644 core/core_test.go delete mode 100644 core/msg_codec.go diff --git a/Makefile b/Makefile index 4eed58d..cfc99f7 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,6 @@ build: test: @cd cmd/binapi-generator && go test -cover . - @cd api && go test -cover ./... @cd core && go test -cover . install: diff --git a/adapter/mock/mock_adapter.go b/adapter/mock/mock_adapter.go index 959fd86..a5cb62d 100644 --- a/adapter/mock/mock_adapter.go +++ b/adapter/mock/mock_adapter.go @@ -25,8 +25,8 @@ import ( "git.fd.io/govpp.git/adapter" "git.fd.io/govpp.git/adapter/mock/binapi" "git.fd.io/govpp.git/api" - "git.fd.io/govpp.git/core" + "git.fd.io/govpp.git/codec" "github.com/lunixbochs/struc" ) @@ -48,10 +48,10 @@ type VppAdapter struct { binAPITypes map[string]reflect.Type access sync.RWMutex - replies []reply // FIFO queue of messages - replyHandlers []ReplyHandler // callbacks that are able to calculate mock responses - repliesLock sync.Mutex // mutex for the queue - mode replyMode // mode in which the mock operates + replies []reply // FIFO queue of messages + replyHandlers []ReplyHandler // callbacks that are able to calculate mock responses + repliesLock sync.Mutex // mutex for the queue + mode replyMode // mode in which the mock operates } // defaultReply is a default reply message that mock adapter returns for a request. @@ -79,7 +79,7 @@ type MsgWithContext struct { Multipart bool /* set by mock adapter */ - hasCtx bool + hasCtx bool } // ReplyHandler is a type that allows to extend the behaviour of VPP mock. @@ -178,7 +178,7 @@ func (a *VppAdapter) ReplyBytes(request MessageDTO, reply api.Message) ([]byte, log.Println("ReplyBytes ", replyMsgID, " ", reply.GetMessageName(), " clientId: ", request.ClientID) buf := new(bytes.Buffer) - struc.Pack(buf, &core.VppReplyHeader{VlMsgID: replyMsgID, Context: request.ClientID}) + struc.Pack(buf, &codec.VppReplyHeader{VlMsgID: replyMsgID, Context: request.ClientID}) struc.Pack(buf, reply) return buf.Bytes(), nil @@ -238,7 +238,7 @@ func (a *VppAdapter) SendMsg(clientID uint32, data []byte) error { replyHandler := a.replyHandlers[i] buf := bytes.NewReader(data) - reqHeader := core.VppRequestHeader{} + reqHeader := codec.VppRequestHeader{} struc.Unpack(buf, &reqHeader) a.access.Lock() @@ -273,13 +273,13 @@ func (a *VppAdapter) SendMsg(clientID uint32, data []byte) error { context = setSeqNum(context, msg.SeqNum) } if msg.Msg.GetMessageType() == api.ReplyMessage { - struc.Pack(buf, &core.VppReplyHeader{VlMsgID: msgID, Context: context}) + struc.Pack(buf, &codec.VppReplyHeader{VlMsgID: msgID, Context: context}) } else if msg.Msg.GetMessageType() == api.EventMessage { - struc.Pack(buf, &core.VppEventHeader{VlMsgID: msgID, Context: context}) + struc.Pack(buf, &codec.VppEventHeader{VlMsgID: msgID, Context: context}) } else if msg.Msg.GetMessageType() == api.RequestMessage { - struc.Pack(buf, &core.VppRequestHeader{VlMsgID: msgID, Context: context}) + struc.Pack(buf, &codec.VppRequestHeader{VlMsgID: msgID, Context: context}) } else { - struc.Pack(buf, &core.VppOtherHeader{VlMsgID: msgID}) + struc.Pack(buf, &codec.VppOtherHeader{VlMsgID: msgID}) } struc.Pack(buf, msg.Msg) a.callback(context, msgID, buf.Bytes()) @@ -299,7 +299,7 @@ func (a *VppAdapter) SendMsg(clientID uint32, data []byte) error { // return default reply buf := new(bytes.Buffer) msgID := uint16(defaultReplyMsgID) - struc.Pack(buf, &core.VppReplyHeader{VlMsgID: msgID, Context: clientID}) + struc.Pack(buf, &codec.VppReplyHeader{VlMsgID: msgID, Context: clientID}) struc.Pack(buf, &defaultReply{}) a.callback(clientID, msgID, buf.Bytes()) } @@ -392,4 +392,3 @@ func setMultipart(context uint32, isMultipart bool) (newContext uint32) { } return context } - diff --git a/api/api.go b/api/api.go index 34e17c1..9c68ab9 100644 --- a/api/api.go +++ b/api/api.go @@ -15,11 +15,7 @@ package api import ( - "errors" - "fmt" "time" - - "github.com/sirupsen/logrus" ) // MessageType represents the type of a VPP message. @@ -61,11 +57,11 @@ type DataType interface { type ChannelProvider interface { // NewAPIChannel returns a new channel for communication with VPP via govpp core. // It uses default buffer sizes for the request and reply Go channels. - NewAPIChannel() (*Channel, error) + NewAPIChannel() (Channel, error) // NewAPIChannelBuffered returns a new channel for communication with VPP via govpp core. // It allows to specify custom buffer sizes for the request and reply Go channels. - NewAPIChannelBuffered() (*Channel, error) + NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (Channel, error) } // MessageDecoder provides functionality for decoding binary data to generated API messages. @@ -82,26 +78,57 @@ type MessageIdentifier interface { LookupByID(ID uint16) (string, error) } -// Channel is the main communication interface with govpp core. It contains two Go channels, one for sending the requests -// to VPP and one for receiving the replies from it. The user can access the Go channels directly, or use the helper -// methods provided inside of this package. Do not use the same channel from multiple goroutines concurrently, -// otherwise the responses could mix! Use multiple channels instead. -type Channel struct { - ID uint16 // channel ID - - ReqChan chan *VppRequest // channel for sending the requests to VPP, closing this channel releases all resources in the ChannelProvider - ReplyChan chan *VppReply // channel where VPP replies are delivered to - - NotifSubsChan chan *NotifSubscribeRequest // channel for sending notification subscribe requests - NotifSubsReplyChan chan error // channel where replies to notification subscribe requests are delivered to - - MsgDecoder MessageDecoder // used to decode binary data to generated API messages - MsgIdentifier MessageIdentifier // used to retrieve message ID of a message - - lastSeqNum uint16 // sequence number of the last sent request - - delayedReply *VppReply // reply already taken from ReplyChan, buffered for later delivery - replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout +// Channel provides methods for direct communication with VPP channel. +type Channel interface { + // SendRequest asynchronously sends a request to VPP. Returns a request context, that can be used to call ReceiveReply. + // In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply). + SendRequest(msg Message) RequestCtx + // SendMultiRequest asynchronously sends a multipart request (request to which multiple responses are expected) to VPP. + // Returns a multipart request context, that can be used to call ReceiveReply. + // In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply). + SendMultiRequest(msg Message) MultiRequestCtx + // SubscribeNotification subscribes for receiving of the specified notification messages via provided Go channel. + // Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's + // buffer is full, the notifications will not be delivered into it. + SubscribeNotification(notifChan chan Message, msgFactory func() Message) (*NotifSubscription, error) + // UnsubscribeNotification unsubscribes from receiving the notifications tied to the provided notification subscription. + UnsubscribeNotification(subscription *NotifSubscription) error + // CheckMessageCompatibility checks whether provided messages are compatible with the version of VPP + // which the library is connected to. + CheckMessageCompatibility(messages ...Message) error + // SetReplyTimeout sets the timeout for replies from VPP. It represents the maximum time the API waits for a reply + // from VPP before returning an error. + SetReplyTimeout(timeout time.Duration) + // GetRequestChannel returns request go channel of the VPP channel + GetRequestChannel() chan<- *VppRequest + // GetReplyChannel returns reply go channel of the VPP channel + GetReplyChannel() <-chan *VppReply + // GetNotificationChannel returns notification go channel of the VPP channel + GetNotificationChannel() chan<- *NotifSubscribeRequest + // GetNotificationReplyChannel returns notification reply go channel of the VPP channel + GetNotificationReplyChannel() <-chan error + // GetMessageDecoder returns message decoder instance + GetMessageDecoder() MessageDecoder + // GetID returns channel's ID + GetID() uint16 + // Close closes the API channel and releases all API channel-related resources in the ChannelProvider. + Close() +} + +// RequestCtx is helper interface which allows to receive reply on request context data +type RequestCtx interface { + // ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs). + // The reply will be decoded into the msg argument. Error will be returned if the response cannot be received or decoded. + ReceiveReply(msg Message) error +} + +// MultiRequestCtx is helper interface which allows to receive reply on multi-request context data +type MultiRequestCtx interface { + // ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs). + // The reply will be decoded into the msg argument. If the last reply has been already consumed, lastReplyReceived is + // set to true. Do not use the message itself if lastReplyReceived is true - it won't be filled with actual data. + // Error will be returned if the response cannot be received or decoded. + ReceiveReply(msg Message) (lastReplyReceived bool, err error) } // VppRequest is a request that will be sent to VPP. @@ -131,238 +158,3 @@ type NotifSubscription struct { NotifChan chan Message // channel where notification messages will be delivered to MsgFactory func() Message // function that returns a new instance of the specific message that is expected as a notification } - -// RequestCtx is a context of a ongoing request (simple one - only one response is expected). -type RequestCtx struct { - ch *Channel - seqNum uint16 -} - -// MultiRequestCtx is a context of a ongoing multipart request (multiple responses are expected). -type MultiRequestCtx struct { - ch *Channel - seqNum uint16 -} - -const defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout - -// NewChannelInternal returns a new channel structure. -// Note that this is just a raw channel not yet connected to VPP, it is not intended to be used directly. -// Use ChannelProvider to get an API channel ready for communication with VPP. -func NewChannelInternal(id uint16) *Channel { - return &Channel{ - ID: id, - replyTimeout: defaultReplyTimeout, - } -} - -// SetReplyTimeout sets the timeout for replies from VPP. It represents the maximum time the API waits for a reply -// from VPP before returning an error. -func (ch *Channel) SetReplyTimeout(timeout time.Duration) { - ch.replyTimeout = timeout -} - -// Close closes the API channel and releases all API channel-related resources in the ChannelProvider. -func (ch *Channel) Close() { - if ch.ReqChan != nil { - close(ch.ReqChan) - } -} - -// SendRequest asynchronously sends a request to VPP. Returns a request context, that can be used to call ReceiveReply. -// In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply). -func (ch *Channel) SendRequest(msg Message) *RequestCtx { - ch.lastSeqNum++ - ch.ReqChan <- &VppRequest{ - Message: msg, - SeqNum: ch.lastSeqNum, - } - return &RequestCtx{ch: ch, seqNum: ch.lastSeqNum} -} - -// ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs). -// The reply will be decoded into the msg argument. Error will be returned if the response cannot be received or decoded. -func (req *RequestCtx) ReceiveReply(msg Message) error { - if req == nil || req.ch == nil { - return errors.New("invalid request context") - } - - lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum) - - if lastReplyReceived { - err = errors.New("multipart reply recieved while a simple reply expected") - } - return err -} - -// SendMultiRequest asynchronously sends a multipart request (request to which multiple responses are expected) to VPP. -// Returns a multipart request context, that can be used to call ReceiveReply. -// In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply). -func (ch *Channel) SendMultiRequest(msg Message) *MultiRequestCtx { - ch.lastSeqNum++ - ch.ReqChan <- &VppRequest{ - Message: msg, - Multipart: true, - SeqNum: ch.lastSeqNum, - } - return &MultiRequestCtx{ch: ch, seqNum: ch.lastSeqNum} -} - -// ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs). -// The reply will be decoded into the msg argument. If the last reply has been already consumed, lastReplyReceived is -// set to true. Do not use the message itself if lastReplyReceived is true - it won't be filled with actual data. -// Error will be returned if the response cannot be received or decoded. -func (req *MultiRequestCtx) ReceiveReply(msg Message) (lastReplyReceived bool, err error) { - if req == nil || req.ch == nil { - return false, errors.New("invalid request context") - } - - return req.ch.receiveReplyInternal(msg, req.seqNum) -} - -// receiveReplyInternal receives a reply from the reply channel into the provided msg structure. -func (ch *Channel) receiveReplyInternal(msg Message, expSeqNum uint16) (lastReplyReceived bool, err error) { - var ignore bool - if msg == nil { - return false, errors.New("nil message passed in") - } - - if ch.delayedReply != nil { - // try the delayed reply - vppReply := ch.delayedReply - ch.delayedReply = nil - ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg) - if !ignore { - return lastReplyReceived, err - } - } - - timer := time.NewTimer(ch.replyTimeout) - for { - select { - // blocks until a reply comes to ReplyChan or until timeout expires - case vppReply := <-ch.ReplyChan: - ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg) - if ignore { - continue - } - return lastReplyReceived, err - - case <-timer.C: - err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout) - return false, err - } - } - return -} - -func (ch *Channel) processReply(reply *VppReply, expSeqNum uint16, msg Message) (ignore bool, lastReplyReceived bool, err error) { - // check the sequence number - cmpSeqNums := compareSeqNumbers(reply.SeqNum, expSeqNum) - if cmpSeqNums == -1 { - // reply received too late, ignore the message - logrus.WithField("sequence-number", reply.SeqNum).Warn( - "Received reply to an already closed binary API request") - ignore = true - return - } - if cmpSeqNums == 1 { - ch.delayedReply = reply - err = fmt.Errorf("missing binary API reply with sequence number: %d", expSeqNum) - return - } - - if reply.Error != nil { - err = reply.Error - return - } - if reply.LastReplyReceived { - lastReplyReceived = true - return - } - - // message checks - var expMsgID uint16 - expMsgID, err = ch.MsgIdentifier.GetMessageID(msg) - if err != nil { - err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to", - msg.GetMessageName(), msg.GetCrcString()) - return - } - - if reply.MessageID != expMsgID { - var msgNameCrc string - if nameCrc, err := ch.MsgIdentifier.LookupByID(reply.MessageID); err != nil { - msgNameCrc = err.Error() - } else { - msgNameCrc = nameCrc - } - - err = fmt.Errorf("received invalid message ID (seq-num=%d), expected %d (%s), but got %d (%s) "+ - "(check if multiple goroutines are not sharing single GoVPP channel)", - reply.SeqNum, expMsgID, msg.GetMessageName(), reply.MessageID, msgNameCrc) - return - } - - // decode the message - err = ch.MsgDecoder.DecodeMsg(reply.Data, msg) - return -} - -// compareSeqNumbers returns -1, 0, 1 if sequence number precedes, equals to, -// or succeeds seq. number . -// Since sequence numbers cycle in the finite set of size 2^16, the function -// must assume that the distance between compared sequence numbers is less than -// (2^16)/2 to determine the order. -func compareSeqNumbers(seqNum1, seqNum2 uint16) int { - // calculate distance from seqNum1 to seqNum2 - var dist uint16 - if seqNum1 <= seqNum2 { - dist = seqNum2 - seqNum1 - } else { - dist = 0xffff - (seqNum1 - seqNum2 - 1) - } - if dist == 0 { - return 0 - } else if dist <= 0x8000 { - return -1 - } - return 1 -} - -// SubscribeNotification subscribes for receiving of the specified notification messages via provided Go channel. -// Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's -// buffer is full, the notifications will not be delivered into it. -func (ch *Channel) SubscribeNotification(notifChan chan Message, msgFactory func() Message) (*NotifSubscription, error) { - subscription := &NotifSubscription{ - NotifChan: notifChan, - MsgFactory: msgFactory, - } - ch.NotifSubsChan <- &NotifSubscribeRequest{ - Subscription: subscription, - Subscribe: true, - } - return subscription, <-ch.NotifSubsReplyChan -} - -// UnsubscribeNotification unsubscribes from receiving the notifications tied to the provided notification subscription. -func (ch *Channel) UnsubscribeNotification(subscription *NotifSubscription) error { - ch.NotifSubsChan <- &NotifSubscribeRequest{ - Subscription: subscription, - Subscribe: false, - } - return <-ch.NotifSubsReplyChan -} - -// CheckMessageCompatibility checks whether provided messages are compatible with the version of VPP -// which the library is connected to. -func (ch *Channel) CheckMessageCompatibility(messages ...Message) error { - for _, msg := range messages { - _, err := ch.MsgIdentifier.GetMessageID(msg) - if err != nil { - return fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to", - msg.GetMessageName(), msg.GetCrcString()) - } - } - return nil -} diff --git a/api/api_test.go b/api/api_test.go deleted file mode 100644 index 7cbd9f0..0000000 --- a/api/api_test.go +++ /dev/null @@ -1,588 +0,0 @@ -// Copyright (c) 2017 Cisco and/or its affiliates. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package api_test - -import ( - "testing" - "time" - - "git.fd.io/govpp.git/adapter/mock" - "git.fd.io/govpp.git/api" - "git.fd.io/govpp.git/core" - "git.fd.io/govpp.git/core/bin_api/vpe" - "git.fd.io/govpp.git/examples/bin_api/interfaces" - "git.fd.io/govpp.git/examples/bin_api/memif" - "git.fd.io/govpp.git/examples/bin_api/tap" - - . "github.com/onsi/gomega" -) - -type testCtx struct { - mockVpp *mock.VppAdapter - conn *core.Connection - ch *api.Channel -} - -func setupTest(t *testing.T) *testCtx { - RegisterTestingT(t) - - ctx := &testCtx{ - mockVpp: &mock.VppAdapter{}, - } - - var err error - ctx.conn, err = core.Connect(ctx.mockVpp) - Expect(err).ShouldNot(HaveOccurred()) - - ctx.ch, err = ctx.conn.NewAPIChannel() - Expect(err).ShouldNot(HaveOccurred()) - - return ctx -} - -func (ctx *testCtx) teardownTest() { - ctx.ch.Close() - ctx.conn.Disconnect() -} - -func TestRequestReplyTapConnect(t *testing.T) { - ctx := setupTest(t) - defer ctx.teardownTest() - - ctx.mockVpp.MockReply(&tap.TapConnectReply{ - Retval: 10, - SwIfIndex: 1, - }) - request := &tap.TapConnect{ - TapName: []byte("test-tap-name"), - UseRandomMac: 1, - } - reply := &tap.TapConnectReply{} - - err := ctx.ch.SendRequest(request).ReceiveReply(reply) - Expect(err).ShouldNot(HaveOccurred()) - Expect(reply.Retval).To(BeEquivalentTo(10), "Incorrect retval value for TapConnectReply") - Expect(reply.SwIfIndex).To(BeEquivalentTo(1), "Incorrect SwIfIndex value for TapConnectReply") -} - -func TestRequestReplyTapModify(t *testing.T) { - ctx := setupTest(t) - defer ctx.teardownTest() - - ctx.mockVpp.MockReply(&tap.TapModifyReply{ - Retval: 15, - SwIfIndex: 2, - }) - request := &tap.TapModify{ - TapName: []byte("test-tap-modify"), - UseRandomMac: 1, - CustomDevInstance: 1, - } - reply := &tap.TapModifyReply{} - - err := ctx.ch.SendRequest(request).ReceiveReply(reply) - Expect(err).ShouldNot(HaveOccurred()) - Expect(reply.Retval).To(BeEquivalentTo(15), "Incorrect retval value for TapModifyReply") - Expect(reply.SwIfIndex).To(BeEquivalentTo(2), "Incorrect SwIfIndex value for TapModifyReply") -} - -func TestRequestReplyTapDelete(t *testing.T) { - ctx := setupTest(t) - defer ctx.teardownTest() - - ctx.mockVpp.MockReply(&tap.TapDeleteReply{ - Retval: 20, - }) - request := &tap.TapDelete{ - SwIfIndex: 3, - } - reply := &tap.TapDeleteReply{} - - err := ctx.ch.SendRequest(request).ReceiveReply(reply) - Expect(err).ShouldNot(HaveOccurred()) - Expect(reply.Retval).To(BeEquivalentTo(20), "Incorrect retval value for TapDeleteReply") -} - -func TestRequestReplySwInterfaceTapDump(t *testing.T) { - ctx := setupTest(t) - defer ctx.teardownTest() - - byteName := []byte("dev-name-test") - ctx.mockVpp.MockReply(&tap.SwInterfaceTapDetails{ - SwIfIndex: 25, - DevName: byteName, - }) - request := &tap.SwInterfaceTapDump{} - reply := &tap.SwInterfaceTapDetails{} - - err := ctx.ch.SendRequest(request).ReceiveReply(reply) - Expect(err).ShouldNot(HaveOccurred()) - Expect(reply.SwIfIndex).To(BeEquivalentTo(25), "Incorrect SwIfIndex value for SwInterfaceTapDetails") - Expect(reply.DevName).ToNot(BeNil(), "Incorrect DevName value for SwInterfaceTapDetails") -} - -func TestRequestReplyMemifCreate(t *testing.T) { - ctx := setupTest(t) - defer ctx.teardownTest() - - ctx.mockVpp.MockReply(&memif.MemifCreateReply{ - Retval: 22, - SwIfIndex: 4, - }) - request := &memif.MemifCreate{ - Role: 10, - ID: 12, - RingSize: 8000, - BufferSize: 50, - } - reply := &memif.MemifCreateReply{} - - err := ctx.ch.SendRequest(request).ReceiveReply(reply) - Expect(err).ShouldNot(HaveOccurred()) - Expect(reply.Retval).To(BeEquivalentTo(22), "Incorrect Retval value for MemifCreate") - Expect(reply.SwIfIndex).To(BeEquivalentTo(4), "Incorrect SwIfIndex value for MemifCreate") -} - -func TestRequestReplyMemifDelete(t *testing.T) { - ctx := setupTest(t) - defer ctx.teardownTest() - - ctx.mockVpp.MockReply(&memif.MemifDeleteReply{ - Retval: 24, - }) - request := &memif.MemifDelete{ - SwIfIndex: 15, - } - reply := &memif.MemifDeleteReply{} - - err := ctx.ch.SendRequest(request).ReceiveReply(reply) - Expect(err).ShouldNot(HaveOccurred()) - Expect(reply.Retval).To(BeEquivalentTo(24), "Incorrect Retval value for MemifDelete") -} - -func TestRequestReplyMemifDetails(t *testing.T) { - ctx := setupTest(t) - defer ctx.teardownTest() - - ctx.mockVpp.MockReply(&memif.MemifDetails{ - SwIfIndex: 25, - IfName: []byte("memif-name"), - Role: 0, - }) - request := &memif.MemifDump{} - reply := &memif.MemifDetails{} - - err := ctx.ch.SendRequest(request).ReceiveReply(reply) - Expect(err).ShouldNot(HaveOccurred()) - Expect(reply.SwIfIndex).To(BeEquivalentTo(25), "Incorrect SwIfIndex value for MemifDetails") - Expect(reply.IfName).ToNot(BeEmpty(), "MemifDetails IfName is empty byte array") - Expect(reply.Role).To(BeEquivalentTo(0), "Incorrect Role value for MemifDetails") -} - -func TestMultiRequestReplySwInterfaceTapDump(t *testing.T) { - ctx := setupTest(t) - defer ctx.teardownTest() - - // mock reply - msgs := []api.Message{} - for i := 1; i <= 10; i++ { - msgs = append(msgs, &tap.SwInterfaceTapDetails{ - SwIfIndex: uint32(i), - DevName: []byte("dev-name-test"), - }) - } - ctx.mockVpp.MockReply(msgs...) - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) - - reqCtx := ctx.ch.SendMultiRequest(&tap.SwInterfaceTapDump{}) - cnt := 0 - for { - msg := &tap.SwInterfaceTapDetails{} - stop, err := reqCtx.ReceiveReply(msg) - if stop { - break // break out of the loop - } - Expect(err).ShouldNot(HaveOccurred()) - cnt++ - } - Expect(cnt).To(BeEquivalentTo(10)) -} - -func TestMultiRequestReplySwInterfaceMemifDump(t *testing.T) { - ctx := setupTest(t) - defer ctx.teardownTest() - - // mock reply - msgs := []api.Message{} - for i := 1; i <= 10; i++ { - msgs = append(msgs, &memif.MemifDetails{ - SwIfIndex: uint32(i), - }) - } - ctx.mockVpp.MockReply(msgs...) - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) - - reqCtx := ctx.ch.SendMultiRequest(&memif.MemifDump{}) - cnt := 0 - for { - msg := &memif.MemifDetails{} - stop, err := reqCtx.ReceiveReply(msg) - if stop { - break // break out of the loop - } - Expect(err).ShouldNot(HaveOccurred()) - cnt++ - } - Expect(cnt).To(BeEquivalentTo(10)) -} - -func TestNotifications(t *testing.T) { - ctx := setupTest(t) - defer ctx.teardownTest() - - // subscribe for notification - notifChan := make(chan api.Message, 1) - subs, err := ctx.ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceSetFlags) - Expect(err).ShouldNot(HaveOccurred()) - - // mock the notification and force its delivery - ctx.mockVpp.MockReply(&interfaces.SwInterfaceSetFlags{ - SwIfIndex: 3, - AdminUpDown: 1, - }) - ctx.mockVpp.SendMsg(0, []byte("")) - - // receive the notification - var notif *interfaces.SwInterfaceSetFlags - Eventually(func() *interfaces.SwInterfaceSetFlags { - select { - case n := <-notifChan: - notif = n.(*interfaces.SwInterfaceSetFlags) - return notif - default: - return nil - } - }).ShouldNot(BeNil()) - - // verify the received notifications - Expect(notif.SwIfIndex).To(BeEquivalentTo(3), "Incorrect SwIfIndex value for SwInterfaceSetFlags") - Expect(notif.AdminUpDown).To(BeEquivalentTo(1), "Incorrect AdminUpDown value for SwInterfaceSetFlags") - - ctx.ch.UnsubscribeNotification(subs) -} - -func TestNotificationEvent(t *testing.T) { - ctx := setupTest(t) - defer ctx.teardownTest() - - // subscribe for notification - notifChan := make(chan api.Message, 1) - subs, err := ctx.ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceEvent) - Expect(err).ShouldNot(HaveOccurred()) - - // mock the notification and force its delivery - ctx.mockVpp.MockReply(&interfaces.SwInterfaceEvent{ - SwIfIndex: 2, - LinkUpDown: 1, - }) - ctx.mockVpp.SendMsg(0, []byte("")) - - // receive the notification - var notif *interfaces.SwInterfaceEvent - Eventually(func() *interfaces.SwInterfaceEvent { - select { - case n := <-notifChan: - notif = n.(*interfaces.SwInterfaceEvent) - return notif - default: - return nil - } - }).ShouldNot(BeNil()) - - // verify the received notifications - Expect(notif.SwIfIndex).To(BeEquivalentTo(2), "Incorrect SwIfIndex value for SwInterfaceSetFlags") - Expect(notif.LinkUpDown).To(BeEquivalentTo(1), "Incorrect LinkUpDown value for SwInterfaceSetFlags") - - ctx.ch.UnsubscribeNotification(subs) -} - -func TestCheckMessageCompatibility(t *testing.T) { - ctx := setupTest(t) - defer ctx.teardownTest() - - err := ctx.ch.CheckMessageCompatibility(&interfaces.SwInterfaceSetFlags{}) - Expect(err).ShouldNot(HaveOccurred()) -} -func TestSetReplyTimeout(t *testing.T) { - ctx := setupTest(t) - defer ctx.teardownTest() - - ctx.ch.SetReplyTimeout(time.Millisecond) - - // first one request should work - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) - err := ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{}) - Expect(err).ShouldNot(HaveOccurred()) - - // no other reply ready - expect timeout - err = ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{}) - Expect(err).Should(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("timeout")) -} - -func TestSetReplyTimeoutMultiRequest(t *testing.T) { - ctx := setupTest(t) - defer ctx.teardownTest() - - ctx.ch.SetReplyTimeout(time.Millisecond) - - msgs := []api.Message{} - for i := 1; i <= 3; i++ { - msgs = append(msgs, &interfaces.SwInterfaceDetails{ - SwIfIndex: uint32(i), - InterfaceName: []byte("if-name-test"), - }) - } - ctx.mockVpp.MockReply(msgs...) - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) - - cnt := 0 - sendMultiRequest := func() error { - reqCtx := ctx.ch.SendMultiRequest(&interfaces.SwInterfaceDump{}) - for { - msg := &interfaces.SwInterfaceDetails{} - stop, err := reqCtx.ReceiveReply(msg) - if stop { - break // break out of the loop - } - if err != nil { - return err - } - cnt++ - } - return nil - } - - // first one request should work - err := sendMultiRequest() - Expect(err).ShouldNot(HaveOccurred()) - - // no other reply ready - expect timeout - err = sendMultiRequest() - Expect(err).Should(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("timeout")) - - Expect(cnt).To(BeEquivalentTo(3)) -} - -func TestReceiveReplyNegative(t *testing.T) { - ctx := setupTest(t) - defer ctx.teardownTest() - - // invalid context 1 - reqCtx1 := &api.RequestCtx{} - err := reqCtx1.ReceiveReply(&vpe.ControlPingReply{}) - Expect(err).Should(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("invalid request context")) - - // invalid context 2 - reqCtx2 := &api.MultiRequestCtx{} - _, err = reqCtx2.ReceiveReply(&vpe.ControlPingReply{}) - Expect(err).Should(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("invalid request context")) - - // NU - reqCtx3 := &api.RequestCtx{} - err = reqCtx3.ReceiveReply(nil) - Expect(err).Should(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("invalid request context")) -} - -func TestMultiRequestDouble(t *testing.T) { - ctx := setupTest(t) - defer ctx.teardownTest() - - // mock reply - msgs := []mock.MsgWithContext{} - for i := 1; i <= 3; i++ { - msgs = append(msgs, mock.MsgWithContext{ - Msg: &interfaces.SwInterfaceDetails{ - SwIfIndex: uint32(i), - InterfaceName: []byte("if-name-test"), - }, - Multipart: true, - SeqNum: 1, - }) - } - msgs = append(msgs, mock.MsgWithContext{Msg: &vpe.ControlPingReply{}, Multipart: true, SeqNum: 1}) - - for i := 1; i <= 3; i++ { - msgs = append(msgs, - mock.MsgWithContext{ - Msg: &interfaces.SwInterfaceDetails{ - SwIfIndex: uint32(i), - InterfaceName: []byte("if-name-test"), - }, - Multipart: true, - SeqNum: 2, - }) - } - msgs = append(msgs, mock.MsgWithContext{Msg: &vpe.ControlPingReply{}, Multipart: true, SeqNum: 2}) - - ctx.mockVpp.MockReplyWithContext(msgs...) - - cnt := 0 - var sendMultiRequest = func() error { - reqCtx := ctx.ch.SendMultiRequest(&interfaces.SwInterfaceDump{}) - for { - msg := &interfaces.SwInterfaceDetails{} - stop, err := reqCtx.ReceiveReply(msg) - if stop { - break // break out of the loop - } - if err != nil { - return err - } - cnt++ - } - return nil - } - - err := sendMultiRequest() - Expect(err).ShouldNot(HaveOccurred()) - - err = sendMultiRequest() - Expect(err).ShouldNot(HaveOccurred()) - - Expect(cnt).To(BeEquivalentTo(6)) -} - -func TestReceiveReplyAfterTimeout(t *testing.T) { - ctx := setupTest(t) - defer ctx.teardownTest() - - ctx.ch.SetReplyTimeout(time.Millisecond) - - // first one request should work - ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{Msg: &vpe.ControlPingReply{}, SeqNum: 1}) - err := ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{}) - Expect(err).ShouldNot(HaveOccurred()) - - err = ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{}) - Expect(err).Should(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("timeout")) - - ctx.mockVpp.MockReplyWithContext( - // simulating late reply - mock.MsgWithContext{Msg: &vpe.ControlPingReply{}, SeqNum: 2}, - // normal reply for next request - mock.MsgWithContext{Msg: &tap.TapConnectReply{}, SeqNum: 3}) - - req := &tap.TapConnect{ - TapName: []byte("test-tap-name"), - UseRandomMac: 1, - } - reply := &tap.TapConnectReply{} - - // should succeed - err = ctx.ch.SendRequest(req).ReceiveReply(reply) - Expect(err).ShouldNot(HaveOccurred()) -} - -func TestReceiveReplyAfterTimeoutMultiRequest(t *testing.T) { - /* - TODO: fix mock adapter - This test will fail because mock adapter will stop sending replies - when it encounters control_ping_reply from multi request, - thus never sending reply for next request - */ - t.Skip() - - ctx := setupTest(t) - defer ctx.teardownTest() - - ctx.ch.SetReplyTimeout(time.Millisecond * 100) - - // first one request should work - ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{Msg: &vpe.ControlPingReply{}, SeqNum: 1}) - err := ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{}) - Expect(err).ShouldNot(HaveOccurred()) - - cnt := 0 - var sendMultiRequest = func() error { - reqCtx := ctx.ch.SendMultiRequest(&interfaces.SwInterfaceDump{}) - for { - msg := &interfaces.SwInterfaceDetails{} - stop, err := reqCtx.ReceiveReply(msg) - if stop { - break // break out of the loop - } - if err != nil { - return err - } - cnt++ - } - return nil - } - - err = sendMultiRequest() - Expect(err).Should(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("timeout")) - Expect(cnt).To(BeEquivalentTo(0)) - - // simulating late replies - msgs := []mock.MsgWithContext{} - for i := 1; i <= 3; i++ { - msgs = append(msgs, mock.MsgWithContext{ - Msg: &interfaces.SwInterfaceDetails{ - SwIfIndex: uint32(i), - InterfaceName: []byte("if-name-test"), - }, - Multipart: true, - SeqNum: 2, - }) - } - msgs = append(msgs, mock.MsgWithContext{Msg: &vpe.ControlPingReply{}, Multipart: true, SeqNum: 2}) - ctx.mockVpp.MockReplyWithContext(msgs...) - - // normal reply for next request - ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{Msg: &tap.TapConnectReply{}, SeqNum: 3}) - - req := &tap.TapConnect{ - TapName: []byte("test-tap-name"), - UseRandomMac: 1, - } - reply := &tap.TapConnectReply{} - - // should succeed - err = ctx.ch.SendRequest(req).ReceiveReply(reply) - Expect(err).ShouldNot(HaveOccurred()) -} - -func TestInvalidMessageID(t *testing.T) { - ctx := setupTest(t) - defer ctx.teardownTest() - - // first one request should work - ctx.mockVpp.MockReply(&vpe.ShowVersionReply{}) - err := ctx.ch.SendRequest(&vpe.ShowVersion{}).ReceiveReply(&vpe.ShowVersionReply{}) - Expect(err).ShouldNot(HaveOccurred()) - - // second should fail with error invalid message ID - ctx.mockVpp.MockReply(&vpe.ShowVersionReply{}) - err = ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{}) - Expect(err).Should(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("invalid message ID")) -} diff --git a/api/doc.go b/api/doc.go index e1abffe..044a820 100644 --- a/api/doc.go +++ b/api/doc.go @@ -1,94 +1,2 @@ -// Package api provides API for communication with govpp core using Go channels, -// without the need of importing the govpp core package itself. -// -// The API offers two ways of communication with govpp core: using Go channels, or using convenient function -// wrappers over the Go channels. The latter should be sufficient for most of the use cases. -// -// The entry point to the API is the Channel structure, that can be obtained from the existing connection using -// the NewAPIChannel or NewAPIChannelBuffered functions: -// -// conn, err := govpp.Connect() -// if err != nil { -// // handle error! -// } -// defer conn.Disconnect() -// -// ch, err := conn.NewAPIChannel() -// if err != nil { -// // handle error! -// } -// defer ch.Close() -// -// -// Simple Request-Reply API -// -// The simple version of the API is based on blocking SendRequest / ReceiveReply calls, where a single request -// message is sent to VPP and a single reply message is filled in when the reply comes from VPP: -// -// req := &acl.ACLPluginGetVersion{} -// reply := &acl.ACLPluginGetVersionReply{} -// -// err := ch.SendRequest(req).ReceiveReply(reply) -// // process the reply -// -// Note that if the reply message type that comes from VPP does not match with provided one, you'll get an error. -// -// -// Multipart Reply API -// -// If multiple messages are expected as a reply to a request, SendMultiRequest API must be used: -// -// req := &interfaces.SwInterfaceDump{} -// reqCtx := ch.SendMultiRequest(req) -// -// for { -// reply := &interfaces.SwInterfaceDetails{} -// stop, err := reqCtx.ReceiveReply(reply) -// if stop { -// break // break out of the loop -// } -// // process the reply -// } -// -// Note that if the last reply has been already consumed, stop boolean return value is set to true. -// Do not use the message itself if stop is true - it won't be filled with actual data. -// -// -// Go Channels API -// -// The blocking API introduced above may be not sufficient for some management applications that strongly -// rely on usage of Go channels. In this case, the API allows to access the underlying Go channels directly, e.g. -// the following replacement of the SendRequest / ReceiveReply API: -// -// req := &acl.ACLPluginGetVersion{} -// // send the request to the request go channel -// ch.ReqChan <- &api.VppRequest{Message: req} -// -// // receive a reply from the reply go channel -// vppReply := <-ch.ReplyChan -// -// // decode the message -// reply := &acl.ACLPluginGetVersionReply{} -// err := ch.MsgDecoder.DecodeMsg(vppReply.Data, reply) -// -// // process the reply -// -// -// Notifications API -// -// to subscribe for receiving of the specified notification messages via provided Go channel, use the -// SubscribeNotification API: -// -// // subscribe for specific notification message -// notifChan := make(chan api.Message, 100) -// subs, _ := ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceSetFlags) -// -// // receive one notification -// notif := (<-notifChan).(*interfaces.SwInterfaceSetFlags) -// -// ch.UnsubscribeNotification(subs) -// -// Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's -// buffer is full, the notifications will not be delivered into it. -// +// Package api defines interfaces required by every file generated with binapi-generator package api diff --git a/codec/doc.go b/codec/doc.go new file mode 100644 index 0000000..eb18e15 --- /dev/null +++ b/codec/doc.go @@ -0,0 +1,2 @@ +// Package codec provides methods allowing to encode and decode message structs to/from binary format accepted by VPP. +package codec diff --git a/codec/msg_codec.go b/codec/msg_codec.go new file mode 100644 index 0000000..7ba8771 --- /dev/null +++ b/codec/msg_codec.go @@ -0,0 +1,141 @@ +// Copyright (c) 2017 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package codec + +import ( + "bytes" + "errors" + "fmt" + "reflect" + + "git.fd.io/govpp.git/api" + "github.com/lunixbochs/struc" +) + +// MsgCodec provides encoding and decoding functionality of `api.Message` structs into/from +// binary format as accepted by VPP. +type MsgCodec struct{} + +// VppRequestHeader struct contains header fields implemented by all VPP requests. +type VppRequestHeader struct { + VlMsgID uint16 + ClientIndex uint32 + Context uint32 +} + +// VppReplyHeader struct contains header fields implemented by all VPP replies. +type VppReplyHeader struct { + VlMsgID uint16 + Context uint32 +} + +// VppEventHeader struct contains header fields implemented by all VPP events. +type VppEventHeader struct { + VlMsgID uint16 + Context uint32 +} + +// VppOtherHeader struct contains header fields implemented by other VPP messages (not requests nor replies). +type VppOtherHeader struct { + VlMsgID uint16 +} + +const ( + vppRequestHeaderSize = 10 // size of a VPP request header + vppReplyHeaderSize = 6 // size of a VPP reply header + vppEventHeaderSize = 6 // size of a VPP event header + vppOtherHeaderSize = 2 // size of the header of other VPP messages +) + +// EncodeMsg encodes provided `Message` structure into its binary-encoded data representation. +func (*MsgCodec) EncodeMsg(msg api.Message, msgID uint16) ([]byte, error) { + if msg == nil { + return nil, errors.New("nil message passed in") + } + + buf := new(bytes.Buffer) + + // encode message header + var header interface{} + if msg.GetMessageType() == api.RequestMessage { + header = &VppRequestHeader{VlMsgID: msgID} + } else if msg.GetMessageType() == api.ReplyMessage { + header = &VppReplyHeader{VlMsgID: msgID} + } else if msg.GetMessageType() == api.EventMessage { + header = &VppEventHeader{VlMsgID: msgID} + } else { + header = &VppOtherHeader{VlMsgID: msgID} + } + err := struc.Pack(buf, header) + if err != nil { + return nil, fmt.Errorf("unable to encode message: header: %v, error %v", header, err) + } + + // encode message content + if reflect.Indirect(reflect.ValueOf(msg)).NumField() > 0 { + err := struc.Pack(buf, msg) + if err != nil { + return nil, fmt.Errorf("unable to encode message: header %v, error %v", header, err) + } + } + + return buf.Bytes(), nil +} + +// DecodeMsg decodes binary-encoded data of a message into provided `Message` structure. +func (*MsgCodec) DecodeMsg(data []byte, msg api.Message) error { + if msg == nil { + return errors.New("nil message passed in") + } + + buf := bytes.NewReader(data) + + // check which header is expected + var header interface{} + if msg.GetMessageType() == api.RequestMessage { + header = &VppRequestHeader{} + } else if msg.GetMessageType() == api.ReplyMessage { + header = &VppReplyHeader{} + } else if msg.GetMessageType() == api.EventMessage { + header = &VppEventHeader{} + } else { + header = &VppOtherHeader{} + } + + // decode message header + err := struc.Unpack(buf, header) + if err != nil { + return fmt.Errorf("unable to decode message: data %v, error %v", data, err) + } + + // get rid of the message header + if msg.GetMessageType() == api.RequestMessage { + buf = bytes.NewReader(data[vppRequestHeaderSize:]) + } else if msg.GetMessageType() == api.ReplyMessage { + buf = bytes.NewReader(data[vppReplyHeaderSize:]) + } else if msg.GetMessageType() == api.EventMessage { + buf = bytes.NewReader(data[vppEventHeaderSize:]) + } else { + buf = bytes.NewReader(data[vppOtherHeaderSize:]) + } + + // decode message content + err = struc.Unpack(buf, msg) + if err != nil { + return fmt.Errorf("unable to decode message: data %v, error %v", data, err) + } + + return nil +} diff --git a/core/channel.go b/core/channel.go new file mode 100644 index 0000000..87b3e29 --- /dev/null +++ b/core/channel.go @@ -0,0 +1,276 @@ +// Copyright (c) 2018 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + "fmt" + "time" + + "errors" + + "git.fd.io/govpp.git/api" + "github.com/sirupsen/logrus" +) + +const defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout + +// requestCtxData is a context of a ongoing request (simple one - only one response is expected). +type requestCtxData struct { + ch *channel + seqNum uint16 +} + +// multiRequestCtxData is a context of a ongoing multipart request (multiple responses are expected). +type multiRequestCtxData struct { + ch *channel + seqNum uint16 +} + +func (req *requestCtxData) ReceiveReply(msg api.Message) error { + if req == nil || req.ch == nil { + return errors.New("invalid request context") + } + + lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum) + + if lastReplyReceived { + err = errors.New("multipart reply recieved while a simple reply expected") + } + return err +} + +func (req *multiRequestCtxData) ReceiveReply(msg api.Message) (lastReplyReceived bool, err error) { + if req == nil || req.ch == nil { + return false, errors.New("invalid request context") + } + + return req.ch.receiveReplyInternal(msg, req.seqNum) +} + +// channel is the main communication interface with govpp core. It contains four Go channels, one for sending the requests +// to VPP, one for receiving the replies from it and the same set for notifications. The user can access the Go channels +// via methods provided by Channel interface in this package. Do not use the same channel from multiple goroutines +// concurrently, otherwise the responses could mix! Use multiple channels instead. +type channel struct { + id uint16 // channel ID + + reqChan chan *api.VppRequest // channel for sending the requests to VPP, closing this channel releases all resources in the ChannelProvider + replyChan chan *api.VppReply // channel where VPP replies are delivered to + + notifSubsChan chan *api.NotifSubscribeRequest // channel for sending notification subscribe requests + notifSubsReplyChan chan error // channel where replies to notification subscribe requests are delivered to + + msgDecoder api.MessageDecoder // used to decode binary data to generated API messages + msgIdentifier api.MessageIdentifier // used to retrieve message ID of a message + + lastSeqNum uint16 // sequence number of the last sent request + + delayedReply *api.VppReply // reply already taken from ReplyChan, buffered for later delivery + replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout +} + +func (ch *channel) SendRequest(msg api.Message) api.RequestCtx { + ch.lastSeqNum++ + ch.reqChan <- &api.VppRequest{ + Message: msg, + SeqNum: ch.lastSeqNum, + } + return &requestCtxData{ch: ch, seqNum: ch.lastSeqNum} +} + +func (ch *channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx { + ch.lastSeqNum++ + ch.reqChan <- &api.VppRequest{ + Message: msg, + Multipart: true, + SeqNum: ch.lastSeqNum, + } + return &multiRequestCtxData{ch: ch, seqNum: ch.lastSeqNum} +} + +func (ch *channel) SubscribeNotification(notifChan chan api.Message, msgFactory func() api.Message) (*api.NotifSubscription, error) { + subscription := &api.NotifSubscription{ + NotifChan: notifChan, + MsgFactory: msgFactory, + } + ch.notifSubsChan <- &api.NotifSubscribeRequest{ + Subscription: subscription, + Subscribe: true, + } + return subscription, <-ch.notifSubsReplyChan +} + +func (ch *channel) UnsubscribeNotification(subscription *api.NotifSubscription) error { + ch.notifSubsChan <- &api.NotifSubscribeRequest{ + Subscription: subscription, + Subscribe: false, + } + return <-ch.notifSubsReplyChan +} + +func (ch *channel) CheckMessageCompatibility(messages ...api.Message) error { + for _, msg := range messages { + _, err := ch.msgIdentifier.GetMessageID(msg) + if err != nil { + return fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to", + msg.GetMessageName(), msg.GetCrcString()) + } + } + return nil +} + +func (ch *channel) SetReplyTimeout(timeout time.Duration) { + ch.replyTimeout = timeout +} + +func (ch *channel) GetRequestChannel() chan<- *api.VppRequest { + return ch.reqChan +} + +func (ch *channel) GetReplyChannel() <-chan *api.VppReply { + return ch.replyChan +} + +func (ch *channel) GetNotificationChannel() chan<- *api.NotifSubscribeRequest { + return ch.notifSubsChan +} + +func (ch *channel) GetNotificationReplyChannel() <-chan error { + return ch.notifSubsReplyChan +} + +func (ch *channel) GetMessageDecoder() api.MessageDecoder { + return ch.msgDecoder +} + +func (ch *channel) GetID() uint16 { + return ch.id +} + +func (ch *channel) Close() { + if ch.reqChan != nil { + close(ch.reqChan) + } +} + +// receiveReplyInternal receives a reply from the reply channel into the provided msg structure. +func (ch *channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (lastReplyReceived bool, err error) { + var ignore bool + if msg == nil { + return false, errors.New("nil message passed in") + } + + if ch.delayedReply != nil { + // try the delayed reply + vppReply := ch.delayedReply + ch.delayedReply = nil + ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg) + if !ignore { + return lastReplyReceived, err + } + } + + timer := time.NewTimer(ch.replyTimeout) + for { + select { + // blocks until a reply comes to ReplyChan or until timeout expires + case vppReply := <-ch.replyChan: + ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg) + if ignore { + continue + } + return lastReplyReceived, err + + case <-timer.C: + err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout) + return false, err + } + } + return +} + +func (ch *channel) processReply(reply *api.VppReply, expSeqNum uint16, msg api.Message) (ignore bool, lastReplyReceived bool, err error) { + // check the sequence number + cmpSeqNums := compareSeqNumbers(reply.SeqNum, expSeqNum) + if cmpSeqNums == -1 { + // reply received too late, ignore the message + logrus.WithField("sequence-number", reply.SeqNum).Warn( + "Received reply to an already closed binary API request") + ignore = true + return + } + if cmpSeqNums == 1 { + ch.delayedReply = reply + err = fmt.Errorf("missing binary API reply with sequence number: %d", expSeqNum) + return + } + + if reply.Error != nil { + err = reply.Error + return + } + if reply.LastReplyReceived { + lastReplyReceived = true + return + } + + // message checks + var expMsgID uint16 + expMsgID, err = ch.msgIdentifier.GetMessageID(msg) + if err != nil { + err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to", + msg.GetMessageName(), msg.GetCrcString()) + return + } + + if reply.MessageID != expMsgID { + var msgNameCrc string + if nameCrc, err := ch.msgIdentifier.LookupByID(reply.MessageID); err != nil { + msgNameCrc = err.Error() + } else { + msgNameCrc = nameCrc + } + + err = fmt.Errorf("received invalid message ID (seq-num=%d), expected %d (%s), but got %d (%s) "+ + "(check if multiple goroutines are not sharing single GoVPP channel)", + reply.SeqNum, expMsgID, msg.GetMessageName(), reply.MessageID, msgNameCrc) + return + } + + // decode the message + err = ch.msgDecoder.DecodeMsg(reply.Data, msg) + return +} + +// compareSeqNumbers returns -1, 0, 1 if sequence number precedes, equals to, +// or succeeds seq. number . +// Since sequence numbers cycle in the finite set of size 2^16, the function +// must assume that the distance between compared sequence numbers is less than +// (2^16)/2 to determine the order. +func compareSeqNumbers(seqNum1, seqNum2 uint16) int { + // calculate distance from seqNum1 to seqNum2 + var dist uint16 + if seqNum1 <= seqNum2 { + dist = seqNum2 - seqNum1 + } else { + dist = 0xffff - (seqNum1 - seqNum2 - 1) + } + if dist == 0 { + return 0 + } else if dist <= 0x8000 { + return -1 + } + return 1 +} diff --git a/core/channel_test.go b/core/channel_test.go new file mode 100644 index 0000000..d573f29 --- /dev/null +++ b/core/channel_test.go @@ -0,0 +1,587 @@ +// Copyright (c) 2017 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + "testing" + "time" + + "git.fd.io/govpp.git/adapter/mock" + "git.fd.io/govpp.git/core/bin_api/vpe" + "git.fd.io/govpp.git/examples/bin_api/interfaces" + "git.fd.io/govpp.git/examples/bin_api/memif" + "git.fd.io/govpp.git/examples/bin_api/tap" + + "git.fd.io/govpp.git/api" + . "github.com/onsi/gomega" +) + +type testCtx struct { + mockVpp *mock.VppAdapter + conn *Connection + ch api.Channel +} + +func setupTest(t *testing.T) *testCtx { + RegisterTestingT(t) + + ctx := &testCtx{ + mockVpp: &mock.VppAdapter{}, + } + + var err error + ctx.conn, err = Connect(ctx.mockVpp) + Expect(err).ShouldNot(HaveOccurred()) + + ctx.ch, err = ctx.conn.NewAPIChannel() + Expect(err).ShouldNot(HaveOccurred()) + + return ctx +} + +func (ctx *testCtx) teardownTest() { + ctx.ch.Close() + ctx.conn.Disconnect() +} + +func TestRequestReplyTapConnect(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + ctx.mockVpp.MockReply(&tap.TapConnectReply{ + Retval: 10, + SwIfIndex: 1, + }) + request := &tap.TapConnect{ + TapName: []byte("test-tap-name"), + UseRandomMac: 1, + } + reply := &tap.TapConnectReply{} + + err := ctx.ch.SendRequest(request).ReceiveReply(reply) + Expect(err).ShouldNot(HaveOccurred()) + Expect(reply.Retval).To(BeEquivalentTo(10), "Incorrect retval value for TapConnectReply") + Expect(reply.SwIfIndex).To(BeEquivalentTo(1), "Incorrect SwIfIndex value for TapConnectReply") +} + +func TestRequestReplyTapModify(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + ctx.mockVpp.MockReply(&tap.TapModifyReply{ + Retval: 15, + SwIfIndex: 2, + }) + request := &tap.TapModify{ + TapName: []byte("test-tap-modify"), + UseRandomMac: 1, + CustomDevInstance: 1, + } + reply := &tap.TapModifyReply{} + + err := ctx.ch.SendRequest(request).ReceiveReply(reply) + Expect(err).ShouldNot(HaveOccurred()) + Expect(reply.Retval).To(BeEquivalentTo(15), "Incorrect retval value for TapModifyReply") + Expect(reply.SwIfIndex).To(BeEquivalentTo(2), "Incorrect SwIfIndex value for TapModifyReply") +} + +func TestRequestReplyTapDelete(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + ctx.mockVpp.MockReply(&tap.TapDeleteReply{ + Retval: 20, + }) + request := &tap.TapDelete{ + SwIfIndex: 3, + } + reply := &tap.TapDeleteReply{} + + err := ctx.ch.SendRequest(request).ReceiveReply(reply) + Expect(err).ShouldNot(HaveOccurred()) + Expect(reply.Retval).To(BeEquivalentTo(20), "Incorrect retval value for TapDeleteReply") +} + +func TestRequestReplySwInterfaceTapDump(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + byteName := []byte("dev-name-test") + ctx.mockVpp.MockReply(&tap.SwInterfaceTapDetails{ + SwIfIndex: 25, + DevName: byteName, + }) + request := &tap.SwInterfaceTapDump{} + reply := &tap.SwInterfaceTapDetails{} + + err := ctx.ch.SendRequest(request).ReceiveReply(reply) + Expect(err).ShouldNot(HaveOccurred()) + Expect(reply.SwIfIndex).To(BeEquivalentTo(25), "Incorrect SwIfIndex value for SwInterfaceTapDetails") + Expect(reply.DevName).ToNot(BeNil(), "Incorrect DevName value for SwInterfaceTapDetails") +} + +func TestRequestReplyMemifCreate(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + ctx.mockVpp.MockReply(&memif.MemifCreateReply{ + Retval: 22, + SwIfIndex: 4, + }) + request := &memif.MemifCreate{ + Role: 10, + ID: 12, + RingSize: 8000, + BufferSize: 50, + } + reply := &memif.MemifCreateReply{} + + err := ctx.ch.SendRequest(request).ReceiveReply(reply) + Expect(err).ShouldNot(HaveOccurred()) + Expect(reply.Retval).To(BeEquivalentTo(22), "Incorrect Retval value for MemifCreate") + Expect(reply.SwIfIndex).To(BeEquivalentTo(4), "Incorrect SwIfIndex value for MemifCreate") +} + +func TestRequestReplyMemifDelete(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + ctx.mockVpp.MockReply(&memif.MemifDeleteReply{ + Retval: 24, + }) + request := &memif.MemifDelete{ + SwIfIndex: 15, + } + reply := &memif.MemifDeleteReply{} + + err := ctx.ch.SendRequest(request).ReceiveReply(reply) + Expect(err).ShouldNot(HaveOccurred()) + Expect(reply.Retval).To(BeEquivalentTo(24), "Incorrect Retval value for MemifDelete") +} + +func TestRequestReplyMemifDetails(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + ctx.mockVpp.MockReply(&memif.MemifDetails{ + SwIfIndex: 25, + IfName: []byte("memif-name"), + Role: 0, + }) + request := &memif.MemifDump{} + reply := &memif.MemifDetails{} + + err := ctx.ch.SendRequest(request).ReceiveReply(reply) + Expect(err).ShouldNot(HaveOccurred()) + Expect(reply.SwIfIndex).To(BeEquivalentTo(25), "Incorrect SwIfIndex value for MemifDetails") + Expect(reply.IfName).ToNot(BeEmpty(), "MemifDetails IfName is empty byte array") + Expect(reply.Role).To(BeEquivalentTo(0), "Incorrect Role value for MemifDetails") +} + +func TestMultiRequestReplySwInterfaceTapDump(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + // mock reply + var msgs []api.Message + for i := 1; i <= 10; i++ { + msgs = append(msgs, &tap.SwInterfaceTapDetails{ + SwIfIndex: uint32(i), + DevName: []byte("dev-name-test"), + }) + } + ctx.mockVpp.MockReply(msgs...) + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + + reqCtx := ctx.ch.SendMultiRequest(&tap.SwInterfaceTapDump{}) + cnt := 0 + for { + msg := &tap.SwInterfaceTapDetails{} + stop, err := reqCtx.ReceiveReply(msg) + if stop { + break // break out of the loop + } + Expect(err).ShouldNot(HaveOccurred()) + cnt++ + } + Expect(cnt).To(BeEquivalentTo(10)) +} + +func TestMultiRequestReplySwInterfaceMemifDump(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + // mock reply + var msgs []api.Message + for i := 1; i <= 10; i++ { + msgs = append(msgs, &memif.MemifDetails{ + SwIfIndex: uint32(i), + }) + } + ctx.mockVpp.MockReply(msgs...) + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + + reqCtx := ctx.ch.SendMultiRequest(&memif.MemifDump{}) + cnt := 0 + for { + msg := &memif.MemifDetails{} + stop, err := reqCtx.ReceiveReply(msg) + if stop { + break // break out of the loop + } + Expect(err).ShouldNot(HaveOccurred()) + cnt++ + } + Expect(cnt).To(BeEquivalentTo(10)) +} + +func TestNotifications(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + // subscribe for notification + notifChan := make(chan api.Message, 1) + subs, err := ctx.ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceSetFlags) + Expect(err).ShouldNot(HaveOccurred()) + + // mock the notification and force its delivery + ctx.mockVpp.MockReply(&interfaces.SwInterfaceSetFlags{ + SwIfIndex: 3, + AdminUpDown: 1, + }) + ctx.mockVpp.SendMsg(0, []byte("")) + + // receive the notification + var notif *interfaces.SwInterfaceSetFlags + Eventually(func() *interfaces.SwInterfaceSetFlags { + select { + case n := <-notifChan: + notif = n.(*interfaces.SwInterfaceSetFlags) + return notif + default: + return nil + } + }).ShouldNot(BeNil()) + + // verify the received notifications + Expect(notif.SwIfIndex).To(BeEquivalentTo(3), "Incorrect SwIfIndex value for SwInterfaceSetFlags") + Expect(notif.AdminUpDown).To(BeEquivalentTo(1), "Incorrect AdminUpDown value for SwInterfaceSetFlags") + + ctx.ch.UnsubscribeNotification(subs) +} + +func TestNotificationEvent(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + // subscribe for notification + notifChan := make(chan api.Message, 1) + subs, err := ctx.ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceEvent) + Expect(err).ShouldNot(HaveOccurred()) + + // mock the notification and force its delivery + ctx.mockVpp.MockReply(&interfaces.SwInterfaceEvent{ + SwIfIndex: 2, + LinkUpDown: 1, + }) + ctx.mockVpp.SendMsg(0, []byte("")) + + // receive the notification + var notif *interfaces.SwInterfaceEvent + Eventually(func() *interfaces.SwInterfaceEvent { + select { + case n := <-notifChan: + notif = n.(*interfaces.SwInterfaceEvent) + return notif + default: + return nil + } + }).ShouldNot(BeNil()) + + // verify the received notifications + Expect(notif.SwIfIndex).To(BeEquivalentTo(2), "Incorrect SwIfIndex value for SwInterfaceSetFlags") + Expect(notif.LinkUpDown).To(BeEquivalentTo(1), "Incorrect LinkUpDown value for SwInterfaceSetFlags") + + ctx.ch.UnsubscribeNotification(subs) +} + +func TestCheckMessageCompatibility(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + err := ctx.ch.CheckMessageCompatibility(&interfaces.SwInterfaceSetFlags{}) + Expect(err).ShouldNot(HaveOccurred()) +} +func TestSetReplyTimeout(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + ctx.ch.SetReplyTimeout(time.Millisecond) + + // first one request should work + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + err := ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{}) + Expect(err).ShouldNot(HaveOccurred()) + + // no other reply ready - expect timeout + err = ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{}) + Expect(err).Should(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("timeout")) +} + +func TestSetReplyTimeoutMultiRequest(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + ctx.ch.SetReplyTimeout(time.Millisecond) + + var msgs []api.Message + for i := 1; i <= 3; i++ { + msgs = append(msgs, &interfaces.SwInterfaceDetails{ + SwIfIndex: uint32(i), + InterfaceName: []byte("if-name-test"), + }) + } + ctx.mockVpp.MockReply(msgs...) + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + + cnt := 0 + sendMultiRequest := func() error { + reqCtx := ctx.ch.SendMultiRequest(&interfaces.SwInterfaceDump{}) + for { + msg := &interfaces.SwInterfaceDetails{} + stop, err := reqCtx.ReceiveReply(msg) + if stop { + break // break out of the loop + } + if err != nil { + return err + } + cnt++ + } + return nil + } + + // first one request should work + err := sendMultiRequest() + Expect(err).ShouldNot(HaveOccurred()) + + // no other reply ready - expect timeout + err = sendMultiRequest() + Expect(err).Should(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("timeout")) + + Expect(cnt).To(BeEquivalentTo(3)) +} + +func TestReceiveReplyNegative(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + // invalid context 1 + reqCtx1 := &requestCtxData{} + err := reqCtx1.ReceiveReply(&vpe.ControlPingReply{}) + Expect(err).Should(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("invalid request context")) + + // invalid context 2 + reqCtx2 := &multiRequestCtxData{} + _, err = reqCtx2.ReceiveReply(&vpe.ControlPingReply{}) + Expect(err).Should(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("invalid request context")) + + // NU + reqCtx3 := &requestCtxData{} + err = reqCtx3.ReceiveReply(nil) + Expect(err).Should(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("invalid request context")) +} + +func TestMultiRequestDouble(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + // mock reply + var msgs []mock.MsgWithContext + for i := 1; i <= 3; i++ { + msgs = append(msgs, mock.MsgWithContext{ + Msg: &interfaces.SwInterfaceDetails{ + SwIfIndex: uint32(i), + InterfaceName: []byte("if-name-test"), + }, + Multipart: true, + SeqNum: 1, + }) + } + msgs = append(msgs, mock.MsgWithContext{Msg: &vpe.ControlPingReply{}, Multipart: true, SeqNum: 1}) + + for i := 1; i <= 3; i++ { + msgs = append(msgs, + mock.MsgWithContext{ + Msg: &interfaces.SwInterfaceDetails{ + SwIfIndex: uint32(i), + InterfaceName: []byte("if-name-test"), + }, + Multipart: true, + SeqNum: 2, + }) + } + msgs = append(msgs, mock.MsgWithContext{Msg: &vpe.ControlPingReply{}, Multipart: true, SeqNum: 2}) + + ctx.mockVpp.MockReplyWithContext(msgs...) + + cnt := 0 + var sendMultiRequest = func() error { + reqCtx := ctx.ch.SendMultiRequest(&interfaces.SwInterfaceDump{}) + for { + msg := &interfaces.SwInterfaceDetails{} + stop, err := reqCtx.ReceiveReply(msg) + if stop { + break // break out of the loop + } + if err != nil { + return err + } + cnt++ + } + return nil + } + + err := sendMultiRequest() + Expect(err).ShouldNot(HaveOccurred()) + + err = sendMultiRequest() + Expect(err).ShouldNot(HaveOccurred()) + + Expect(cnt).To(BeEquivalentTo(6)) +} + +func TestReceiveReplyAfterTimeout(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + ctx.ch.SetReplyTimeout(time.Millisecond) + + // first one request should work + ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{Msg: &vpe.ControlPingReply{}, SeqNum: 1}) + err := ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{}) + Expect(err).ShouldNot(HaveOccurred()) + + err = ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{}) + Expect(err).Should(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("timeout")) + + ctx.mockVpp.MockReplyWithContext( + // simulating late reply + mock.MsgWithContext{Msg: &vpe.ControlPingReply{}, SeqNum: 2}, + // normal reply for next request + mock.MsgWithContext{Msg: &tap.TapConnectReply{}, SeqNum: 3}) + + req := &tap.TapConnect{ + TapName: []byte("test-tap-name"), + UseRandomMac: 1, + } + reply := &tap.TapConnectReply{} + + // should succeed + err = ctx.ch.SendRequest(req).ReceiveReply(reply) + Expect(err).ShouldNot(HaveOccurred()) +} + +func TestReceiveReplyAfterTimeoutMultiRequest(t *testing.T) { + /* + TODO: fix mock adapter + This test will fail because mock adapter will stop sending replies + when it encounters control_ping_reply from multi request, + thus never sending reply for next request + */ + t.Skip() + + ctx := setupTest(t) + defer ctx.teardownTest() + + ctx.ch.SetReplyTimeout(time.Millisecond * 100) + + // first one request should work + ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{Msg: &vpe.ControlPingReply{}, SeqNum: 1}) + err := ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{}) + Expect(err).ShouldNot(HaveOccurred()) + + cnt := 0 + var sendMultiRequest = func() error { + reqCtx := ctx.ch.SendMultiRequest(&interfaces.SwInterfaceDump{}) + for { + msg := &interfaces.SwInterfaceDetails{} + stop, err := reqCtx.ReceiveReply(msg) + if stop { + break // break out of the loop + } + if err != nil { + return err + } + cnt++ + } + return nil + } + + err = sendMultiRequest() + Expect(err).Should(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("timeout")) + Expect(cnt).To(BeEquivalentTo(0)) + + // simulating late replies + var msgs []mock.MsgWithContext + for i := 1; i <= 3; i++ { + msgs = append(msgs, mock.MsgWithContext{ + Msg: &interfaces.SwInterfaceDetails{ + SwIfIndex: uint32(i), + InterfaceName: []byte("if-name-test"), + }, + Multipart: true, + SeqNum: 2, + }) + } + msgs = append(msgs, mock.MsgWithContext{Msg: &vpe.ControlPingReply{}, Multipart: true, SeqNum: 2}) + ctx.mockVpp.MockReplyWithContext(msgs...) + + // normal reply for next request + ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{Msg: &tap.TapConnectReply{}, SeqNum: 3}) + + req := &tap.TapConnect{ + TapName: []byte("test-tap-name"), + UseRandomMac: 1, + } + reply := &tap.TapConnectReply{} + + // should succeed + err = ctx.ch.SendRequest(req).ReceiveReply(reply) + Expect(err).ShouldNot(HaveOccurred()) +} + +func TestInvalidMessageID(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + // first one request should work + ctx.mockVpp.MockReply(&vpe.ShowVersionReply{}) + err := ctx.ch.SendRequest(&vpe.ShowVersion{}).ReceiveReply(&vpe.ShowVersionReply{}) + Expect(err).ShouldNot(HaveOccurred()) + + // second should fail with error invalid message ID + ctx.mockVpp.MockReply(&vpe.ShowVersionReply{}) + err = ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{}) + Expect(err).Should(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("invalid message ID")) +} diff --git a/core/connection.go b/core/connection.go new file mode 100644 index 0000000..a44d0c4 --- /dev/null +++ b/core/connection.go @@ -0,0 +1,403 @@ +// Copyright (c) 2017 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api + +package core + +import ( + "errors" + "os" + "sync" + "sync/atomic" + "time" + + logger "github.com/sirupsen/logrus" + + "git.fd.io/govpp.git/adapter" + "git.fd.io/govpp.git/api" + "git.fd.io/govpp.git/codec" + "git.fd.io/govpp.git/core/bin_api/vpe" +) + +var ( + msgControlPing api.Message = &vpe.ControlPing{} + msgControlPingReply api.Message = &vpe.ControlPingReply{} +) + +const ( + requestChannelBufSize = 100 // default size of the request channel buffers + replyChannelBufSize = 100 // default size of the reply channel buffers + notificationChannelBufSize = 100 // default size of the notification channel buffers +) + +var ( + healthCheckProbeInterval = time.Second * 1 // default health check probe interval + healthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe + healthCheckThreshold = 1 // number of failed healthProbe until the error is reported +) + +// ConnectionState holds the current state of the connection to VPP. +type ConnectionState int + +const ( + // Connected connection state means that the connection to VPP has been successfully established. + Connected ConnectionState = iota + + // Disconnected connection state means that the connection to VPP has been lost. + Disconnected +) + +// ConnectionEvent is a notification about change in the VPP connection state. +type ConnectionEvent struct { + // Timestamp holds the time when the event has been generated. + Timestamp time.Time + + // State holds the new state of the connection to VPP at the time when the event has been generated. + State ConnectionState +} + +// Connection represents a shared memory connection to VPP via vppAdapter. +type Connection struct { + vpp adapter.VppAdapter // VPP adapter + connected uint32 // non-zero if the adapter is connected to VPP + codec *codec.MsgCodec // message codec + + msgIDsLock sync.RWMutex // lock for the message IDs map + msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC + + channelsLock sync.RWMutex // lock for the channels map + channels map[uint16]*channel // map of all API channels indexed by the channel ID + + notifSubscriptionsLock sync.RWMutex // lock for the subscriptions map + notifSubscriptions map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID + + maxChannelID uint32 // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations) + pingReqID uint16 // ID if the ControlPing message + pingReplyID uint16 // ID of the ControlPingReply message + + lastReplyLock sync.Mutex // lock for the last reply + lastReply time.Time // time of the last received reply from VPP +} + +var ( + log *logger.Logger // global logger + conn *Connection // global handle to the Connection (used in the message receive callback) + connLock sync.RWMutex // lock for the global connection +) + +// init initializes global logger, which logs debug level messages to stdout. +func init() { + log = logger.New() + log.Out = os.Stdout + log.Level = logger.DebugLevel +} + +// SetLogger sets global logger to provided one. +func SetLogger(l *logger.Logger) { + log = l +} + +// SetHealthCheckProbeInterval sets health check probe interval. +// Beware: Function is not thread-safe. It is recommended to setup this parameter +// before connecting to vpp. +func SetHealthCheckProbeInterval(interval time.Duration) { + healthCheckProbeInterval = interval +} + +// SetHealthCheckReplyTimeout sets timeout for reply to a health check probe. +// If reply arrives after the timeout, check is considered as failed. +// Beware: Function is not thread-safe. It is recommended to setup this parameter +// before connecting to vpp. +func SetHealthCheckReplyTimeout(timeout time.Duration) { + healthCheckReplyTimeout = timeout +} + +// SetHealthCheckThreshold sets the number of failed healthProbe checks until the error is reported. +// Beware: Function is not thread-safe. It is recommended to setup this parameter +// before connecting to vpp. +func SetHealthCheckThreshold(threshold int) { + healthCheckThreshold = threshold +} + +// SetControlPingMessages sets the messages for ControlPing and ControlPingReply +func SetControlPingMessages(controPing, controlPingReply api.Message) { + msgControlPing = controPing + msgControlPingReply = controlPingReply +} + +// Connect connects to VPP using specified VPP adapter and returns the connection handle. +// This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed. +func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) { + // create new connection handle + c, err := newConnection(vppAdapter) + if err != nil { + return nil, err + } + + // blocking attempt to connect to VPP + err = c.connectVPP() + if err != nil { + return nil, err + } + + return conn, nil +} + +// AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle +// and ConnectionState channel. This call does not block until connection is established, it +// returns immediately. The caller is supposed to watch the returned ConnectionState channel for +// Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect. +func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) { + // create new connection handle + c, err := newConnection(vppAdapter) + if err != nil { + return nil, nil, err + } + + // asynchronously attempt to connect to VPP + connChan := make(chan ConnectionEvent, notificationChannelBufSize) + go c.connectLoop(connChan) + + return conn, connChan, nil +} + +// Disconnect disconnects from VPP and releases all connection-related resources. +func (c *Connection) Disconnect() { + if c == nil { + return + } + connLock.Lock() + defer connLock.Unlock() + + if c != nil && c.vpp != nil { + c.disconnectVPP() + } + conn = nil +} + +// newConnection returns new connection handle. +func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) { + connLock.Lock() + defer connLock.Unlock() + + if conn != nil { + return nil, errors.New("only one connection per process is supported") + } + + conn = &Connection{ + vpp: vppAdapter, + codec: &codec.MsgCodec{}, + channels: make(map[uint16]*channel), + msgIDs: make(map[string]uint16), + notifSubscriptions: make(map[uint16][]*api.NotifSubscription), + } + + conn.vpp.SetMsgCallback(msgCallback) + return conn, nil +} + +// connectVPP performs one blocking attempt to connect to VPP. +func (c *Connection) connectVPP() error { + log.Debug("Connecting to VPP...") + + // blocking connect + err := c.vpp.Connect() + if err != nil { + log.Warn(err) + return err + } + + // store control ping IDs + if c.pingReqID, err = c.GetMessageID(msgControlPing); err != nil { + c.vpp.Disconnect() + return err + } + if c.pingReplyID, err = c.GetMessageID(msgControlPingReply); err != nil { + c.vpp.Disconnect() + return err + } + + // store connected state + atomic.StoreUint32(&c.connected, 1) + + log.Info("Connected to VPP.") + return nil +} + +// disconnectVPP disconnects from VPP in case it is connected. +func (c *Connection) disconnectVPP() { + if atomic.CompareAndSwapUint32(&c.connected, 1, 0) { + c.vpp.Disconnect() + } +} + +// connectLoop attempts to connect to VPP until it succeeds. +// Then it continues with healthCheckLoop. +func (c *Connection) connectLoop(connChan chan ConnectionEvent) { + // loop until connected + for { + if err := c.vpp.WaitReady(); err != nil { + log.Warnf("wait ready failed: %v", err) + } + if err := c.connectVPP(); err == nil { + // signal connected event + connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected} + break + } else { + log.Errorf("connecting to VPP failed: %v", err) + time.Sleep(time.Second) + } + } + + // we are now connected, continue with health check loop + c.healthCheckLoop(connChan) +} + +// healthCheckLoop checks whether connection to VPP is alive. In case of disconnect, +// it continues with connectLoop and tries to reconnect. +func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { + // create a separate API channel for health check probes + ch, err := conn.newAPIChannelBuffered(1, 1) + if err != nil { + log.Error("Failed to create health check API channel, health check will be disabled:", err) + return + } + + var sinceLastReply time.Duration + var failedChecks int + + // send health check probes until an error or timeout occurs + for { + // sleep until next health check probe period + time.Sleep(healthCheckProbeInterval) + + if atomic.LoadUint32(&c.connected) == 0 { + // Disconnect has been called in the meantime, return the healthcheck - reconnect loop + log.Debug("Disconnected on request, exiting health check loop.") + return + } + + // try draining probe replies from previous request before sending next one + select { + case <-ch.replyChan: + log.Debug("drained old probe reply from reply channel") + default: + } + + // send the control ping request + ch.reqChan <- &api.VppRequest{Message: msgControlPing} + + for { + // expect response within timeout period + select { + case vppReply := <-ch.replyChan: + err = vppReply.Error + + case <-time.After(healthCheckReplyTimeout): + err = ErrProbeTimeout + + // check if time since last reply from any other + // channel is less than health check reply timeout + conn.lastReplyLock.Lock() + sinceLastReply = time.Since(c.lastReply) + conn.lastReplyLock.Unlock() + + if sinceLastReply < healthCheckReplyTimeout { + log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply) + continue + } + } + break + } + + if err == ErrProbeTimeout { + failedChecks++ + log.Warnf("VPP health check probe timed out after %v (%d. timeout)", healthCheckReplyTimeout, failedChecks) + if failedChecks > healthCheckThreshold { + // in case of exceeded treshold disconnect + log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", healthCheckThreshold) + connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected} + break + } + } else if err != nil { + // in case of error disconnect + log.Errorf("VPP health check probe failed: %v", err) + connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected} + break + } else if failedChecks > 0 { + failedChecks = 0 + log.Infof("VPP health check probe OK") + } + } + + // cleanup + ch.Close() + c.disconnectVPP() + + // we are now disconnected, start connect loop + c.connectLoop(connChan) +} + +func (c *Connection) NewAPIChannel() (api.Channel, error) { + return c.newAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize) +} + +func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) { + return c.newAPIChannelBuffered(reqChanBufSize, replyChanBufSize) +} + +// NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core. +// It allows to specify custom buffer sizes for the request and reply Go channels. +func (c *Connection) newAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*channel, error) { + if c == nil { + return nil, errors.New("nil connection passed in") + } + + chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff) + ch := &channel{ + id: chID, + replyTimeout: defaultReplyTimeout, + } + ch.msgDecoder = c.codec + ch.msgIdentifier = c + + // create the communication channels + ch.reqChan = make(chan *api.VppRequest, reqChanBufSize) + ch.replyChan = make(chan *api.VppReply, replyChanBufSize) + ch.notifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize) + ch.notifSubsReplyChan = make(chan error, replyChanBufSize) + + // store API channel within the client + c.channelsLock.Lock() + c.channels[chID] = ch + c.channelsLock.Unlock() + + // start watching on the request channel + go c.watchRequests(ch) + + return ch, nil +} + +// releaseAPIChannel releases API channel that needs to be closed. +func (c *Connection) releaseAPIChannel(ch *channel) { + log.WithFields(logger.Fields{ + "ID": ch.id, + }).Debug("API channel closed.") + + // delete the channel from channels map + c.channelsLock.Lock() + delete(c.channels, ch.id) + c.channelsLock.Unlock() +} diff --git a/core/connection_test.go b/core/connection_test.go new file mode 100644 index 0000000..b7c3aa0 --- /dev/null +++ b/core/connection_test.go @@ -0,0 +1,543 @@ +// Copyright (c) 2017 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core_test + +import ( + "testing" + + "git.fd.io/govpp.git/adapter/mock" + "git.fd.io/govpp.git/api" + "git.fd.io/govpp.git/core" + "git.fd.io/govpp.git/core/bin_api/vpe" + "git.fd.io/govpp.git/examples/bin_api/interfaces" + "git.fd.io/govpp.git/examples/bin_api/stats" + + "git.fd.io/govpp.git/codec" + . "github.com/onsi/gomega" +) + +type testCtx struct { + mockVpp *mock.VppAdapter + conn *core.Connection + ch api.Channel +} + +func setupTest(t *testing.T, bufferedChan bool) *testCtx { + RegisterTestingT(t) + + ctx := &testCtx{} + ctx.mockVpp = &mock.VppAdapter{} + + var err error + ctx.conn, err = core.Connect(ctx.mockVpp) + Expect(err).ShouldNot(HaveOccurred()) + + if bufferedChan { + ctx.ch, err = ctx.conn.NewAPIChannelBuffered(100, 100) + } else { + ctx.ch, err = ctx.conn.NewAPIChannel() + } + Expect(err).ShouldNot(HaveOccurred()) + + return ctx +} + +func (ctx *testCtx) teardownTest() { + ctx.ch.Close() + ctx.conn.Disconnect() +} + +func TestSimpleRequest(t *testing.T) { + ctx := setupTest(t, false) + defer ctx.teardownTest() + + ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: -5}) + + req := &vpe.ControlPing{} + reply := &vpe.ControlPingReply{} + + // send the request and receive a reply + ctx.ch.GetRequestChannel() <- &api.VppRequest{Message: req} + vppReply := <-ctx.ch.GetReplyChannel() + + Expect(vppReply).ShouldNot(BeNil()) + Expect(vppReply.Error).ShouldNot(HaveOccurred()) + + // decode the message + err := ctx.ch.GetMessageDecoder().DecodeMsg(vppReply.Data, reply) + Expect(err).ShouldNot(HaveOccurred()) + + Expect(reply.Retval).To(BeEquivalentTo(-5)) +} + +func TestMultiRequest(t *testing.T) { + ctx := setupTest(t, false) + defer ctx.teardownTest() + + msgs := []api.Message{} + for m := 0; m < 10; m++ { + msgs = append(msgs, &interfaces.SwInterfaceDetails{}) + } + ctx.mockVpp.MockReply(msgs...) + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + + // send multipart request + ctx.ch.GetRequestChannel() <- &api.VppRequest{Message: &interfaces.SwInterfaceDump{}, Multipart: true} + + cnt := 0 + for { + // receive a reply + vppReply := <-ctx.ch.GetReplyChannel() + if vppReply.LastReplyReceived { + break // break out of the loop + } + Expect(vppReply.Error).ShouldNot(HaveOccurred()) + + // decode the message + reply := &interfaces.SwInterfaceDetails{} + err := ctx.ch.GetMessageDecoder().DecodeMsg(vppReply.Data, reply) + Expect(err).ShouldNot(HaveOccurred()) + cnt++ + } + + Expect(cnt).To(BeEquivalentTo(10)) +} + +func TestNotifications(t *testing.T) { + ctx := setupTest(t, false) + defer ctx.teardownTest() + + // subscribe for notification + notifChan := make(chan api.Message, 1) + subscription := &api.NotifSubscription{ + NotifChan: notifChan, + MsgFactory: interfaces.NewSwInterfaceSetFlags, + } + ctx.ch.GetNotificationChannel() <- &api.NotifSubscribeRequest{ + Subscription: subscription, + Subscribe: true, + } + err := <-ctx.ch.GetNotificationReplyChannel() + Expect(err).ShouldNot(HaveOccurred()) + + // mock the notification and force its delivery + ctx.mockVpp.MockReply(&interfaces.SwInterfaceSetFlags{ + SwIfIndex: 3, + AdminUpDown: 1, + }) + ctx.mockVpp.SendMsg(0, []byte{0}) + + // receive the notification + notif := (<-notifChan).(*interfaces.SwInterfaceSetFlags) + + Expect(notif.SwIfIndex).To(BeEquivalentTo(3)) + + // unsubscribe notification + ctx.ch.GetNotificationChannel() <- &api.NotifSubscribeRequest{ + Subscription: subscription, + Subscribe: false, + } + err = <-ctx.ch.GetNotificationReplyChannel() + Expect(err).ShouldNot(HaveOccurred()) +} + +func TestNilConnection(t *testing.T) { + RegisterTestingT(t) + var conn *core.Connection + + ch, err := conn.NewAPIChannel() + Expect(ch).Should(BeNil()) + Expect(err).Should(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("nil")) + + ch, err = conn.NewAPIChannelBuffered(1, 1) + Expect(ch).Should(BeNil()) + Expect(err).Should(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("nil")) +} + +func TestDoubleConnection(t *testing.T) { + ctx := setupTest(t, false) + defer ctx.teardownTest() + + conn, err := core.Connect(ctx.mockVpp) + Expect(err).Should(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("only one connection per process")) + Expect(conn).Should(BeNil()) +} + +func TestAsyncConnection(t *testing.T) { + ctx := setupTest(t, false) + defer ctx.teardownTest() + + ctx.conn.Disconnect() + conn, ch, err := core.AsyncConnect(ctx.mockVpp) + ctx.conn = conn + + Expect(err).ShouldNot(HaveOccurred()) + Expect(conn).ShouldNot(BeNil()) + + ev := <-ch + Expect(ev.State).Should(BeEquivalentTo(core.Connected)) +} + +func TestFullBuffer(t *testing.T) { + ctx := setupTest(t, false) + defer ctx.teardownTest() + + // close the default API channel + ctx.ch.Close() + + // create a new channel with limited buffer sizes + var err error + ctx.ch, err = ctx.conn.NewAPIChannelBuffered(10, 1) + Expect(err).ShouldNot(HaveOccurred()) + + // send multiple requests, only one reply should be read + for i := 0; i < 20; i++ { + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + ctx.ch.GetRequestChannel() <- &api.VppRequest{Message: &vpe.ControlPing{}} + } + + vppReply := <-ctx.ch.GetReplyChannel() + Expect(vppReply).ShouldNot(BeNil()) + + var received bool + select { + case <-ctx.ch.GetReplyChannel(): + received = true // this should not happen + default: + received = false // no reply to be received + } + Expect(received).Should(BeFalse(), "A reply has been recieved, should had been ignored.") +} + +func TestCodec(t *testing.T) { + RegisterTestingT(t) + + msgCodec := &codec.MsgCodec{} + + // request + data, err := msgCodec.EncodeMsg(&interfaces.CreateLoopback{MacAddress: []byte{1, 2, 3, 4, 5, 6}}, 11) + Expect(err).ShouldNot(HaveOccurred()) + Expect(data).ShouldNot(BeEmpty()) + + msg1 := &interfaces.CreateLoopback{} + err = msgCodec.DecodeMsg(data, msg1) + Expect(err).ShouldNot(HaveOccurred()) + Expect(msg1.MacAddress).To(BeEquivalentTo([]byte{1, 2, 3, 4, 5, 6})) + + // reply + data, err = msgCodec.EncodeMsg(&vpe.ControlPingReply{Retval: 55}, 22) + Expect(err).ShouldNot(HaveOccurred()) + Expect(data).ShouldNot(BeEmpty()) + + msg2 := &vpe.ControlPingReply{} + err = msgCodec.DecodeMsg(data, msg2) + Expect(err).ShouldNot(HaveOccurred()) + Expect(msg2.Retval).To(BeEquivalentTo(55)) + + // other + data, err = msgCodec.EncodeMsg(&stats.VnetIP4FibCounters{VrfID: 77}, 33) + Expect(err).ShouldNot(HaveOccurred()) + Expect(data).ShouldNot(BeEmpty()) + + msg3 := &stats.VnetIP4FibCounters{} + err = msgCodec.DecodeMsg(data, msg3) + Expect(err).ShouldNot(HaveOccurred()) + Expect(msg3.VrfID).To(BeEquivalentTo(77)) +} + +func TestCodecNegative(t *testing.T) { + RegisterTestingT(t) + + msgCodec := &codec.MsgCodec{} + + // nil message for encoding + data, err := msgCodec.EncodeMsg(nil, 15) + Expect(err).Should(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("nil message")) + Expect(data).Should(BeNil()) + + // nil message for decoding + err = msgCodec.DecodeMsg(data, nil) + Expect(err).Should(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("nil message")) + + // nil data for decoding + err = msgCodec.DecodeMsg(nil, &vpe.ControlPingReply{}) + Expect(err).Should(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("EOF")) +} + +func TestSimpleRequestsWithSequenceNumbers(t *testing.T) { + ctx := setupTest(t, false) + defer ctx.teardownTest() + + var reqCtx []api.RequestCtx + for i := 0; i < 10; i++ { + ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: int32(i)}) + req := &vpe.ControlPing{} + reqCtx = append(reqCtx, ctx.ch.SendRequest(req)) + } + + for i := 0; i < 10; i++ { + reply := &vpe.ControlPingReply{} + err := reqCtx[i].ReceiveReply(reply) + Expect(err).ShouldNot(HaveOccurred()) + Expect(reply.Retval).To(BeEquivalentTo(i)) + } +} + +func TestMultiRequestsWithSequenceNumbers(t *testing.T) { + ctx := setupTest(t, false) + defer ctx.teardownTest() + + msgs := []api.Message{} + for i := 0; i < 10; i++ { + msgs = append(msgs, &interfaces.SwInterfaceDetails{SwIfIndex: uint32(i)}) + } + ctx.mockVpp.MockReply(msgs...) + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + + // send multipart request + reqCtx := ctx.ch.SendMultiRequest(&interfaces.SwInterfaceDump{}) + + cnt := 0 + for { + Expect(cnt < 11).To(BeTrue()) + + // receive a reply + reply := &interfaces.SwInterfaceDetails{} + lastReplyReceived, err := reqCtx.ReceiveReply(reply) + + if lastReplyReceived { + break // break out of the loop + } + + Expect(err).ShouldNot(HaveOccurred()) + Expect(reply.SwIfIndex).To(BeEquivalentTo(cnt)) + + cnt++ + } + + Expect(cnt).To(BeEquivalentTo(10)) +} + +func TestSimpleRequestWithTimeout(t *testing.T) { + ctx := setupTest(t, true) + defer ctx.teardownTest() + + // reply for a previous timeouted requests to be ignored + ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{ + Msg: &vpe.ControlPingReply{Retval: 1}, + SeqNum: 0, + }) + + // send reply later + req1 := &vpe.ControlPing{} + reqCtx1 := ctx.ch.SendRequest(req1) + + reply := &vpe.ControlPingReply{} + err := reqCtx1.ReceiveReply(reply) + Expect(err).ToNot(BeNil()) + Expect(err.Error()).To(HavePrefix("no reply received within the timeout period")) + + ctx.mockVpp.MockReplyWithContext( + // reply for the previous request + mock.MsgWithContext{ + Msg: &vpe.ControlPingReply{Retval: 1}, + SeqNum: 1, + }, + // reply for the next request + mock.MsgWithContext{ + Msg: &vpe.ControlPingReply{Retval: 2}, + SeqNum: 2, + }) + + // next request + req2 := &vpe.ControlPing{} + reqCtx2 := ctx.ch.SendRequest(req2) + + // second request should ignore the first reply and return the second one + reply = &vpe.ControlPingReply{} + err = reqCtx2.ReceiveReply(reply) + Expect(err).To(BeNil()) + Expect(reply.Retval).To(BeEquivalentTo(2)) +} + +func TestSimpleRequestsWithMissingReply(t *testing.T) { + ctx := setupTest(t, false) + defer ctx.teardownTest() + + // request without reply + req1 := &vpe.ControlPing{} + reqCtx1 := ctx.ch.SendRequest(req1) + + // another request without reply + req2 := &vpe.ControlPing{} + reqCtx2 := ctx.ch.SendRequest(req2) + + // third request with reply + ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{ + Msg: &vpe.ControlPingReply{Retval: 3}, + SeqNum: 3, + }) + req3 := &vpe.ControlPing{} + reqCtx3 := ctx.ch.SendRequest(req3) + + // the first two should fail, but not consume reply for the 3rd + reply := &vpe.ControlPingReply{} + err := reqCtx1.ReceiveReply(reply) + Expect(err).ToNot(BeNil()) + Expect(err.Error()).To(Equal("missing binary API reply with sequence number: 1")) + + reply = &vpe.ControlPingReply{} + err = reqCtx2.ReceiveReply(reply) + Expect(err).ToNot(BeNil()) + Expect(err.Error()).To(Equal("missing binary API reply with sequence number: 2")) + + // the second request should succeed + reply = &vpe.ControlPingReply{} + err = reqCtx3.ReceiveReply(reply) + Expect(err).To(BeNil()) + Expect(reply.Retval).To(BeEquivalentTo(3)) +} + +func TestMultiRequestsWithErrors(t *testing.T) { + ctx := setupTest(t, false) + defer ctx.teardownTest() + + // replies for a previous timeouted requests to be ignored + msgs := []mock.MsgWithContext{} + msgs = append(msgs, + mock.MsgWithContext{ + Msg: &vpe.ControlPingReply{Retval: 1}, + SeqNum: 0xffff - 1, + }, + mock.MsgWithContext{ + Msg: &vpe.ControlPingReply{Retval: 1}, + SeqNum: 0xffff, + }, + mock.MsgWithContext{ + Msg: &vpe.ControlPingReply{Retval: 1}, + SeqNum: 0, + }) + + for i := 0; i < 10; i++ { + msgs = append(msgs, + mock.MsgWithContext{ + Msg: &interfaces.SwInterfaceDetails{SwIfIndex: uint32(i)}, + SeqNum: 1, + Multipart: true, + }) + } + // missing finalizing control ping + + // reply for a next request + msgs = append(msgs, + mock.MsgWithContext{ + Msg: &vpe.ControlPingReply{Retval: 2}, + SeqNum: 2, + Multipart: false, + }) + + // queue replies + ctx.mockVpp.MockReplyWithContext(msgs...) + + // send multipart request + reqCtx := ctx.ch.SendMultiRequest(&interfaces.SwInterfaceDump{}) + + for i := 0; i < 10; i++ { + // receive multi-part replies + reply := &interfaces.SwInterfaceDetails{} + lastReplyReceived, err := reqCtx.ReceiveReply(reply) + + Expect(lastReplyReceived).To(BeFalse()) + Expect(err).ShouldNot(HaveOccurred()) + Expect(reply.SwIfIndex).To(BeEquivalentTo(i)) + } + + // missing closing control ping + reply := &interfaces.SwInterfaceDetails{} + _, err := reqCtx.ReceiveReply(reply) + Expect(err).ToNot(BeNil()) + Expect(err.Error()).To(Equal("missing binary API reply with sequence number: 1")) + + // try again - still fails and nothing consumed + _, err = reqCtx.ReceiveReply(reply) + Expect(err).ToNot(BeNil()) + Expect(err.Error()).To(Equal("missing binary API reply with sequence number: 1")) + + // reply for the second request has not been consumed + reqCtx2 := ctx.ch.SendRequest(&vpe.ControlPing{}) + reply2 := &vpe.ControlPingReply{} + err = reqCtx2.ReceiveReply(reply2) + Expect(err).To(BeNil()) + Expect(reply2.Retval).To(BeEquivalentTo(2)) +} + +func TestRequestsOrdering(t *testing.T) { + ctx := setupTest(t, false) + defer ctx.teardownTest() + + // the orderings of SendRequest and ReceiveReply calls should match, otherwise + // some replies will get thrown away + + // first request + ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: 1}) + req1 := &vpe.ControlPing{} + reqCtx1 := ctx.ch.SendRequest(req1) + + // second request + ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: 2}) + req2 := &vpe.ControlPing{} + reqCtx2 := ctx.ch.SendRequest(req2) + + // if reply for the second request is read first, the reply for the first + // request gets thrown away. + reply2 := &vpe.ControlPingReply{} + err := reqCtx2.ReceiveReply(reply2) + Expect(err).To(BeNil()) + Expect(reply2.Retval).To(BeEquivalentTo(2)) + + // first request has already been considered closed + reply1 := &vpe.ControlPingReply{} + err = reqCtx1.ReceiveReply(reply1) + Expect(err).ToNot(BeNil()) + Expect(err.Error()).To(HavePrefix("no reply received within the timeout period")) +} + +func TestCycleOverSetOfSequenceNumbers(t *testing.T) { + ctx := setupTest(t, true) + defer ctx.teardownTest() + + numIters := 0xffff + 100 + reqCtx := make(map[int]api.RequestCtx) + + for i := 0; i < numIters+30; /* receiver is 30 reqs behind */ i++ { + if i < numIters { + ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: int32(i)}) + req := &vpe.ControlPing{} + reqCtx[i] = ctx.ch.SendRequest(req) + } + if i > 30 { + reply := &vpe.ControlPingReply{} + err := reqCtx[i-30].ReceiveReply(reply) + Expect(err).ShouldNot(HaveOccurred()) + Expect(reply.Retval).To(BeEquivalentTo(i - 30)) + } + } +} diff --git a/core/core.go b/core/core.go deleted file mode 100644 index 052eb0b..0000000 --- a/core/core.go +++ /dev/null @@ -1,400 +0,0 @@ -// Copyright (c) 2017 Cisco and/or its affiliates. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api - -package core - -import ( - "errors" - "os" - "sync" - "sync/atomic" - "time" - - logger "github.com/sirupsen/logrus" - - "git.fd.io/govpp.git/adapter" - "git.fd.io/govpp.git/api" - "git.fd.io/govpp.git/core/bin_api/vpe" -) - -var ( - msgControlPing api.Message = &vpe.ControlPing{} - msgControlPingReply api.Message = &vpe.ControlPingReply{} -) - -const ( - requestChannelBufSize = 100 // default size of the request channel buffers - replyChannelBufSize = 100 // default size of the reply channel buffers - notificationChannelBufSize = 100 // default size of the notification channel buffers -) - -var ( - healthCheckProbeInterval = time.Second * 1 // default health check probe interval - healthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe - healthCheckThreshold = 1 // number of failed healthProbe until the error is reported -) - -// ConnectionState holds the current state of the connection to VPP. -type ConnectionState int - -const ( - // Connected connection state means that the connection to VPP has been successfully established. - Connected ConnectionState = iota - - // Disconnected connection state means that the connection to VPP has been lost. - Disconnected -) - -// ConnectionEvent is a notification about change in the VPP connection state. -type ConnectionEvent struct { - // Timestamp holds the time when the event has been generated. - Timestamp time.Time - - // State holds the new state of the connection to VPP at the time when the event has been generated. - State ConnectionState -} - -// Connection represents a shared memory connection to VPP via vppAdapter. -type Connection struct { - vpp adapter.VppAdapter // VPP adapter - connected uint32 // non-zero if the adapter is connected to VPP - codec *MsgCodec // message codec - - msgIDsLock sync.RWMutex // lock for the message IDs map - msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC - - channelsLock sync.RWMutex // lock for the channels map - channels map[uint16]*api.Channel // map of all API channels indexed by the channel ID - - notifSubscriptionsLock sync.RWMutex // lock for the subscriptions map - notifSubscriptions map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID - - maxChannelID uint32 // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations) - pingReqID uint16 // ID if the ControlPing message - pingReplyID uint16 // ID of the ControlPingReply message - - lastReplyLock sync.Mutex // lock for the last reply - lastReply time.Time // time of the last received reply from VPP -} - -var ( - log *logger.Logger // global logger - conn *Connection // global handle to the Connection (used in the message receive callback) - connLock sync.RWMutex // lock for the global connection -) - -// init initializes global logger, which logs debug level messages to stdout. -func init() { - log = logger.New() - log.Out = os.Stdout - log.Level = logger.DebugLevel -} - -// SetLogger sets global logger to provided one. -func SetLogger(l *logger.Logger) { - log = l -} - -// SetHealthCheckProbeInterval sets health check probe interval. -// Beware: Function is not thread-safe. It is recommended to setup this parameter -// before connecting to vpp. -func SetHealthCheckProbeInterval(interval time.Duration) { - healthCheckProbeInterval = interval -} - -// SetHealthCheckReplyTimeout sets timeout for reply to a health check probe. -// If reply arrives after the timeout, check is considered as failed. -// Beware: Function is not thread-safe. It is recommended to setup this parameter -// before connecting to vpp. -func SetHealthCheckReplyTimeout(timeout time.Duration) { - healthCheckReplyTimeout = timeout -} - -// SetHealthCheckThreshold sets the number of failed healthProbe checks until the error is reported. -// Beware: Function is not thread-safe. It is recommended to setup this parameter -// before connecting to vpp. -func SetHealthCheckThreshold(threshold int) { - healthCheckThreshold = threshold -} - -// SetControlPingMessages sets the messages for ControlPing and ControlPingReply -func SetControlPingMessages(controPing, controlPingReply api.Message) { - msgControlPing = controPing - msgControlPingReply = controlPingReply -} - -// Connect connects to VPP using specified VPP adapter and returns the connection handle. -// This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed. -func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) { - // create new connection handle - c, err := newConnection(vppAdapter) - if err != nil { - return nil, err - } - - // blocking attempt to connect to VPP - err = c.connectVPP() - if err != nil { - return nil, err - } - - return conn, nil -} - -// AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle -// and ConnectionState channel. This call does not block until connection is established, it -// returns immediately. The caller is supposed to watch the returned ConnectionState channel for -// Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect. -func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) { - // create new connection handle - c, err := newConnection(vppAdapter) - if err != nil { - return nil, nil, err - } - - // asynchronously attempt to connect to VPP - connChan := make(chan ConnectionEvent, notificationChannelBufSize) - go c.connectLoop(connChan) - - return conn, connChan, nil -} - -// Disconnect disconnects from VPP and releases all connection-related resources. -func (c *Connection) Disconnect() { - if c == nil { - return - } - connLock.Lock() - defer connLock.Unlock() - - if c != nil && c.vpp != nil { - c.disconnectVPP() - } - conn = nil -} - -// newConnection returns new connection handle. -func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) { - connLock.Lock() - defer connLock.Unlock() - - if conn != nil { - return nil, errors.New("only one connection per process is supported") - } - - conn = &Connection{ - vpp: vppAdapter, - codec: &MsgCodec{}, - channels: make(map[uint16]*api.Channel), - msgIDs: make(map[string]uint16), - notifSubscriptions: make(map[uint16][]*api.NotifSubscription), - } - - conn.vpp.SetMsgCallback(msgCallback) - return conn, nil -} - -// connectVPP performs one blocking attempt to connect to VPP. -func (c *Connection) connectVPP() error { - log.Debug("Connecting to VPP...") - - // blocking connect - err := c.vpp.Connect() - if err != nil { - log.Warn(err) - return err - } - - // store control ping IDs - if c.pingReqID, err = c.GetMessageID(msgControlPing); err != nil { - c.vpp.Disconnect() - return err - } - if c.pingReplyID, err = c.GetMessageID(msgControlPingReply); err != nil { - c.vpp.Disconnect() - return err - } - - // store connected state - atomic.StoreUint32(&c.connected, 1) - - log.Info("Connected to VPP.") - return nil -} - -// disconnectVPP disconnects from VPP in case it is connected. -func (c *Connection) disconnectVPP() { - if atomic.CompareAndSwapUint32(&c.connected, 1, 0) { - c.vpp.Disconnect() - } -} - -// connectLoop attempts to connect to VPP until it succeeds. -// Then it continues with healthCheckLoop. -func (c *Connection) connectLoop(connChan chan ConnectionEvent) { - // loop until connected - for { - if err := c.vpp.WaitReady(); err != nil { - log.Warnf("wait ready failed: %v", err) - } - if err := c.connectVPP(); err == nil { - // signal connected event - connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected} - break - } else { - log.Errorf("connecting to VPP failed: %v", err) - time.Sleep(time.Second) - } - } - - // we are now connected, continue with health check loop - c.healthCheckLoop(connChan) -} - -// healthCheckLoop checks whether connection to VPP is alive. In case of disconnect, -// it continues with connectLoop and tries to reconnect. -func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { - // create a separate API channel for health check probes - ch, err := conn.NewAPIChannelBuffered(1, 1) - if err != nil { - log.Error("Failed to create health check API channel, health check will be disabled:", err) - return - } - - var sinceLastReply time.Duration - var failedChecks int - - // send health check probes until an error or timeout occurs - for { - // sleep until next health check probe period - time.Sleep(healthCheckProbeInterval) - - if atomic.LoadUint32(&c.connected) == 0 { - // Disconnect has been called in the meantime, return the healthcheck - reconnect loop - log.Debug("Disconnected on request, exiting health check loop.") - return - } - - // try draining probe replies from previous request before sending next one - select { - case <-ch.ReplyChan: - log.Debug("drained old probe reply from reply channel") - default: - } - - // send the control ping request - ch.ReqChan <- &api.VppRequest{Message: msgControlPing} - - for { - // expect response within timeout period - select { - case vppReply := <-ch.ReplyChan: - err = vppReply.Error - - case <-time.After(healthCheckReplyTimeout): - err = ErrProbeTimeout - - // check if time since last reply from any other - // channel is less than health check reply timeout - conn.lastReplyLock.Lock() - sinceLastReply = time.Since(c.lastReply) - conn.lastReplyLock.Unlock() - - if sinceLastReply < healthCheckReplyTimeout { - log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply) - continue - } - } - break - } - - if err == ErrProbeTimeout { - failedChecks++ - log.Warnf("VPP health check probe timed out after %v (%d. timeout)", healthCheckReplyTimeout, failedChecks) - if failedChecks > healthCheckThreshold { - // in case of exceeded treshold disconnect - log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", healthCheckThreshold) - connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected} - break - } - } else if err != nil { - // in case of error disconnect - log.Errorf("VPP health check probe failed: %v", err) - connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected} - break - } else if failedChecks > 0 { - failedChecks = 0 - log.Infof("VPP health check probe OK") - } - } - - // cleanup - ch.Close() - c.disconnectVPP() - - // we are now disconnected, start connect loop - c.connectLoop(connChan) -} - -// NewAPIChannel returns a new API channel for communication with VPP via govpp core. -// It uses default buffer sizes for the request and reply Go channels. -func (c *Connection) NewAPIChannel() (*api.Channel, error) { - if c == nil { - return nil, errors.New("nil connection passed in") - } - return c.NewAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize) -} - -// NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core. -// It allows to specify custom buffer sizes for the request and reply Go channels. -func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*api.Channel, error) { - if c == nil { - return nil, errors.New("nil connection passed in") - } - - chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff) - ch := api.NewChannelInternal(chID) - ch.MsgDecoder = c.codec - ch.MsgIdentifier = c - - // create the communication channels - ch.ReqChan = make(chan *api.VppRequest, reqChanBufSize) - ch.ReplyChan = make(chan *api.VppReply, replyChanBufSize) - ch.NotifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize) - ch.NotifSubsReplyChan = make(chan error, replyChanBufSize) - - // store API channel within the client - c.channelsLock.Lock() - c.channels[chID] = ch - c.channelsLock.Unlock() - - // start watching on the request channel - go c.watchRequests(ch) - - return ch, nil -} - -// releaseAPIChannel releases API channel that needs to be closed. -func (c *Connection) releaseAPIChannel(ch *api.Channel) { - log.WithFields(logger.Fields{ - "ID": ch.ID, - }).Debug("API channel closed.") - - // delete the channel from channels map - c.channelsLock.Lock() - delete(c.channels, ch.ID) - c.channelsLock.Unlock() -} diff --git a/core/core_test.go b/core/core_test.go deleted file mode 100644 index e4fbf63..0000000 --- a/core/core_test.go +++ /dev/null @@ -1,542 +0,0 @@ -// Copyright (c) 2017 Cisco and/or its affiliates. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package core_test - -import ( - "testing" - - "git.fd.io/govpp.git/adapter/mock" - "git.fd.io/govpp.git/api" - "git.fd.io/govpp.git/core" - "git.fd.io/govpp.git/core/bin_api/vpe" - "git.fd.io/govpp.git/examples/bin_api/interfaces" - "git.fd.io/govpp.git/examples/bin_api/stats" - - . "github.com/onsi/gomega" -) - -type testCtx struct { - mockVpp *mock.VppAdapter - conn *core.Connection - ch *api.Channel -} - -func setupTest(t *testing.T, bufferedChan bool) *testCtx { - RegisterTestingT(t) - - ctx := &testCtx{} - ctx.mockVpp = &mock.VppAdapter{} - - var err error - ctx.conn, err = core.Connect(ctx.mockVpp) - Expect(err).ShouldNot(HaveOccurred()) - - if bufferedChan { - ctx.ch, err = ctx.conn.NewAPIChannelBuffered(100, 100) - } else { - ctx.ch, err = ctx.conn.NewAPIChannel() - } - Expect(err).ShouldNot(HaveOccurred()) - - return ctx -} - -func (ctx *testCtx) teardownTest() { - ctx.ch.Close() - ctx.conn.Disconnect() -} - -func TestSimpleRequest(t *testing.T) { - ctx := setupTest(t, false) - defer ctx.teardownTest() - - ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: -5}) - - req := &vpe.ControlPing{} - reply := &vpe.ControlPingReply{} - - // send the request and receive a reply - ctx.ch.ReqChan <- &api.VppRequest{Message: req} - vppReply := <-ctx.ch.ReplyChan - - Expect(vppReply).ShouldNot(BeNil()) - Expect(vppReply.Error).ShouldNot(HaveOccurred()) - - // decode the message - err := ctx.ch.MsgDecoder.DecodeMsg(vppReply.Data, reply) - Expect(err).ShouldNot(HaveOccurred()) - - Expect(reply.Retval).To(BeEquivalentTo(-5)) -} - -func TestMultiRequest(t *testing.T) { - ctx := setupTest(t, false) - defer ctx.teardownTest() - - msgs := []api.Message{} - for m := 0; m < 10; m++ { - msgs = append(msgs, &interfaces.SwInterfaceDetails{}) - } - ctx.mockVpp.MockReply(msgs...) - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) - - // send multipart request - ctx.ch.ReqChan <- &api.VppRequest{Message: &interfaces.SwInterfaceDump{}, Multipart: true} - - cnt := 0 - for { - // receive a reply - vppReply := <-ctx.ch.ReplyChan - if vppReply.LastReplyReceived { - break // break out of the loop - } - Expect(vppReply.Error).ShouldNot(HaveOccurred()) - - // decode the message - reply := &interfaces.SwInterfaceDetails{} - err := ctx.ch.MsgDecoder.DecodeMsg(vppReply.Data, reply) - Expect(err).ShouldNot(HaveOccurred()) - cnt++ - } - - Expect(cnt).To(BeEquivalentTo(10)) -} - -func TestNotifications(t *testing.T) { - ctx := setupTest(t, false) - defer ctx.teardownTest() - - // subscribe for notification - notifChan := make(chan api.Message, 1) - subscription := &api.NotifSubscription{ - NotifChan: notifChan, - MsgFactory: interfaces.NewSwInterfaceSetFlags, - } - ctx.ch.NotifSubsChan <- &api.NotifSubscribeRequest{ - Subscription: subscription, - Subscribe: true, - } - err := <-ctx.ch.NotifSubsReplyChan - Expect(err).ShouldNot(HaveOccurred()) - - // mock the notification and force its delivery - ctx.mockVpp.MockReply(&interfaces.SwInterfaceSetFlags{ - SwIfIndex: 3, - AdminUpDown: 1, - }) - ctx.mockVpp.SendMsg(0, []byte{0}) - - // receive the notification - notif := (<-notifChan).(*interfaces.SwInterfaceSetFlags) - - Expect(notif.SwIfIndex).To(BeEquivalentTo(3)) - - // unsubscribe notification - ctx.ch.NotifSubsChan <- &api.NotifSubscribeRequest{ - Subscription: subscription, - Subscribe: false, - } - err = <-ctx.ch.NotifSubsReplyChan - Expect(err).ShouldNot(HaveOccurred()) -} - -func TestNilConnection(t *testing.T) { - RegisterTestingT(t) - var conn *core.Connection - - ch, err := conn.NewAPIChannel() - Expect(ch).Should(BeNil()) - Expect(err).Should(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("nil")) - - ch, err = conn.NewAPIChannelBuffered(1, 1) - Expect(ch).Should(BeNil()) - Expect(err).Should(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("nil")) -} - -func TestDoubleConnection(t *testing.T) { - ctx := setupTest(t, false) - defer ctx.teardownTest() - - conn, err := core.Connect(ctx.mockVpp) - Expect(err).Should(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("only one connection per process")) - Expect(conn).Should(BeNil()) -} - -func TestAsyncConnection(t *testing.T) { - ctx := setupTest(t, false) - defer ctx.teardownTest() - - ctx.conn.Disconnect() - conn, ch, err := core.AsyncConnect(ctx.mockVpp) - ctx.conn = conn - - Expect(err).ShouldNot(HaveOccurred()) - Expect(conn).ShouldNot(BeNil()) - - ev := <-ch - Expect(ev.State).Should(BeEquivalentTo(core.Connected)) -} - -func TestFullBuffer(t *testing.T) { - ctx := setupTest(t, false) - defer ctx.teardownTest() - - // close the default API channel - ctx.ch.Close() - - // create a new channel with limited buffer sizes - var err error - ctx.ch, err = ctx.conn.NewAPIChannelBuffered(10, 1) - Expect(err).ShouldNot(HaveOccurred()) - - // send multiple requests, only one reply should be read - for i := 0; i < 20; i++ { - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) - ctx.ch.ReqChan <- &api.VppRequest{Message: &vpe.ControlPing{}} - } - - vppReply := <-ctx.ch.ReplyChan - Expect(vppReply).ShouldNot(BeNil()) - - var received bool - select { - case <-ctx.ch.ReplyChan: - received = true // this should not happen - default: - received = false // no reply to be received - } - Expect(received).Should(BeFalse(), "A reply has been recieved, should had been ignored.") -} - -func TestCodec(t *testing.T) { - RegisterTestingT(t) - - codec := &core.MsgCodec{} - - // request - data, err := codec.EncodeMsg(&interfaces.CreateLoopback{MacAddress: []byte{1, 2, 3, 4, 5, 6}}, 11) - Expect(err).ShouldNot(HaveOccurred()) - Expect(data).ShouldNot(BeEmpty()) - - msg1 := &interfaces.CreateLoopback{} - err = codec.DecodeMsg(data, msg1) - Expect(err).ShouldNot(HaveOccurred()) - Expect(msg1.MacAddress).To(BeEquivalentTo([]byte{1, 2, 3, 4, 5, 6})) - - // reply - data, err = codec.EncodeMsg(&vpe.ControlPingReply{Retval: 55}, 22) - Expect(err).ShouldNot(HaveOccurred()) - Expect(data).ShouldNot(BeEmpty()) - - msg2 := &vpe.ControlPingReply{} - err = codec.DecodeMsg(data, msg2) - Expect(err).ShouldNot(HaveOccurred()) - Expect(msg2.Retval).To(BeEquivalentTo(55)) - - // other - data, err = codec.EncodeMsg(&stats.VnetIP4FibCounters{VrfID: 77}, 33) - Expect(err).ShouldNot(HaveOccurred()) - Expect(data).ShouldNot(BeEmpty()) - - msg3 := &stats.VnetIP4FibCounters{} - err = codec.DecodeMsg(data, msg3) - Expect(err).ShouldNot(HaveOccurred()) - Expect(msg3.VrfID).To(BeEquivalentTo(77)) -} - -func TestCodecNegative(t *testing.T) { - RegisterTestingT(t) - - codec := &core.MsgCodec{} - - // nil message for encoding - data, err := codec.EncodeMsg(nil, 15) - Expect(err).Should(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("nil message")) - Expect(data).Should(BeNil()) - - // nil message for decoding - err = codec.DecodeMsg(data, nil) - Expect(err).Should(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("nil message")) - - // nil data for decoding - err = codec.DecodeMsg(nil, &vpe.ControlPingReply{}) - Expect(err).Should(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("EOF")) -} - -func TestSimpleRequestsWithSequenceNumbers(t *testing.T) { - ctx := setupTest(t, false) - defer ctx.teardownTest() - - var reqCtx []*api.RequestCtx - for i := 0; i < 10; i++ { - ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: int32(i)}) - req := &vpe.ControlPing{} - reqCtx = append(reqCtx, ctx.ch.SendRequest(req)) - } - - for i := 0; i < 10; i++ { - reply := &vpe.ControlPingReply{} - err := reqCtx[i].ReceiveReply(reply) - Expect(err).ShouldNot(HaveOccurred()) - Expect(reply.Retval).To(BeEquivalentTo(i)) - } -} - -func TestMultiRequestsWithSequenceNumbers(t *testing.T) { - ctx := setupTest(t, false) - defer ctx.teardownTest() - - msgs := []api.Message{} - for i := 0; i < 10; i++ { - msgs = append(msgs, &interfaces.SwInterfaceDetails{SwIfIndex: uint32(i)}) - } - ctx.mockVpp.MockReply(msgs...) - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) - - // send multipart request - reqCtx := ctx.ch.SendMultiRequest(&interfaces.SwInterfaceDump{}) - - cnt := 0 - for { - Expect(cnt < 11).To(BeTrue()) - - // receive a reply - reply := &interfaces.SwInterfaceDetails{} - lastReplyReceived, err := reqCtx.ReceiveReply(reply) - - if lastReplyReceived { - break // break out of the loop - } - - Expect(err).ShouldNot(HaveOccurred()) - Expect(reply.SwIfIndex).To(BeEquivalentTo(cnt)) - - cnt++ - } - - Expect(cnt).To(BeEquivalentTo(10)) -} - -func TestSimpleRequestWithTimeout(t *testing.T) { - ctx := setupTest(t, true) - defer ctx.teardownTest() - - // reply for a previous timeouted requests to be ignored - ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{ - Msg: &vpe.ControlPingReply{Retval: 1}, - SeqNum: 0, - }) - - // send reply later - req1 := &vpe.ControlPing{} - reqCtx1 := ctx.ch.SendRequest(req1) - - reply := &vpe.ControlPingReply{} - err := reqCtx1.ReceiveReply(reply) - Expect(err).ToNot(BeNil()) - Expect(err.Error()).To(HavePrefix("no reply received within the timeout period")) - - ctx.mockVpp.MockReplyWithContext( - // reply for the previous request - mock.MsgWithContext{ - Msg: &vpe.ControlPingReply{Retval: 1}, - SeqNum: 1, - }, - // reply for the next request - mock.MsgWithContext{ - Msg: &vpe.ControlPingReply{Retval: 2}, - SeqNum: 2, - }) - - // next request - req2 := &vpe.ControlPing{} - reqCtx2 := ctx.ch.SendRequest(req2) - - // second request should ignore the first reply and return the second one - reply = &vpe.ControlPingReply{} - err = reqCtx2.ReceiveReply(reply) - Expect(err).To(BeNil()) - Expect(reply.Retval).To(BeEquivalentTo(2)) -} - -func TestSimpleRequestsWithMissingReply(t *testing.T) { - ctx := setupTest(t, false) - defer ctx.teardownTest() - - // request without reply - req1 := &vpe.ControlPing{} - reqCtx1 := ctx.ch.SendRequest(req1) - - // another request without reply - req2 := &vpe.ControlPing{} - reqCtx2 := ctx.ch.SendRequest(req2) - - // third request with reply - ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{ - Msg: &vpe.ControlPingReply{Retval: 3}, - SeqNum: 3, - }) - req3 := &vpe.ControlPing{} - reqCtx3 := ctx.ch.SendRequest(req3) - - // the first two should fail, but not consume reply for the 3rd - reply := &vpe.ControlPingReply{} - err := reqCtx1.ReceiveReply(reply) - Expect(err).ToNot(BeNil()) - Expect(err.Error()).To(Equal("missing binary API reply with sequence number: 1")) - - reply = &vpe.ControlPingReply{} - err = reqCtx2.ReceiveReply(reply) - Expect(err).ToNot(BeNil()) - Expect(err.Error()).To(Equal("missing binary API reply with sequence number: 2")) - - // the second request should succeed - reply = &vpe.ControlPingReply{} - err = reqCtx3.ReceiveReply(reply) - Expect(err).To(BeNil()) - Expect(reply.Retval).To(BeEquivalentTo(3)) -} - -func TestMultiRequestsWithErrors(t *testing.T) { - ctx := setupTest(t, false) - defer ctx.teardownTest() - - // replies for a previous timeouted requests to be ignored - msgs := []mock.MsgWithContext{} - msgs = append(msgs, - mock.MsgWithContext{ - Msg: &vpe.ControlPingReply{Retval: 1}, - SeqNum: 0xffff - 1, - }, - mock.MsgWithContext{ - Msg: &vpe.ControlPingReply{Retval: 1}, - SeqNum: 0xffff, - }, - mock.MsgWithContext{ - Msg: &vpe.ControlPingReply{Retval: 1}, - SeqNum: 0, - }) - - for i := 0; i < 10; i++ { - msgs = append(msgs, - mock.MsgWithContext{ - Msg: &interfaces.SwInterfaceDetails{SwIfIndex: uint32(i)}, - SeqNum: 1, - Multipart: true, - }) - } - // missing finalizing control ping - - // reply for a next request - msgs = append(msgs, - mock.MsgWithContext{ - Msg: &vpe.ControlPingReply{Retval: 2}, - SeqNum: 2, - Multipart: false, - }) - - // queue replies - ctx.mockVpp.MockReplyWithContext(msgs...) - - // send multipart request - reqCtx := ctx.ch.SendMultiRequest(&interfaces.SwInterfaceDump{}) - - for i := 0; i < 10; i++ { - // receive multi-part replies - reply := &interfaces.SwInterfaceDetails{} - lastReplyReceived, err := reqCtx.ReceiveReply(reply) - - Expect(lastReplyReceived).To(BeFalse()) - Expect(err).ShouldNot(HaveOccurred()) - Expect(reply.SwIfIndex).To(BeEquivalentTo(i)) - } - - // missing closing control ping - reply := &interfaces.SwInterfaceDetails{} - _, err := reqCtx.ReceiveReply(reply) - Expect(err).ToNot(BeNil()) - Expect(err.Error()).To(Equal("missing binary API reply with sequence number: 1")) - - // try again - still fails and nothing consumed - _, err = reqCtx.ReceiveReply(reply) - Expect(err).ToNot(BeNil()) - Expect(err.Error()).To(Equal("missing binary API reply with sequence number: 1")) - - // reply for the second request has not been consumed - reqCtx2 := ctx.ch.SendRequest(&vpe.ControlPing{}) - reply2 := &vpe.ControlPingReply{} - err = reqCtx2.ReceiveReply(reply2) - Expect(err).To(BeNil()) - Expect(reply2.Retval).To(BeEquivalentTo(2)) -} - -func TestRequestsOrdering(t *testing.T) { - ctx := setupTest(t, false) - defer ctx.teardownTest() - - // the orderings of SendRequest and ReceiveReply calls should match, otherwise - // some replies will get thrown away - - // first request - ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: 1}) - req1 := &vpe.ControlPing{} - reqCtx1 := ctx.ch.SendRequest(req1) - - // second request - ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: 2}) - req2 := &vpe.ControlPing{} - reqCtx2 := ctx.ch.SendRequest(req2) - - // if reply for the second request is read first, the reply for the first - // request gets thrown away. - reply2 := &vpe.ControlPingReply{} - err := reqCtx2.ReceiveReply(reply2) - Expect(err).To(BeNil()) - Expect(reply2.Retval).To(BeEquivalentTo(2)) - - // first request has already been considered closed - reply1 := &vpe.ControlPingReply{} - err = reqCtx1.ReceiveReply(reply1) - Expect(err).ToNot(BeNil()) - Expect(err.Error()).To(HavePrefix("no reply received within the timeout period")) -} - -func TestCycleOverSetOfSequenceNumbers(t *testing.T) { - ctx := setupTest(t, true) - defer ctx.teardownTest() - - numIters := 0xffff + 100 - reqCtx := make(map[int]*api.RequestCtx) - - for i := 0; i < numIters+30; /* receiver is 30 reqs behind */ i++ { - if i < numIters { - ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: int32(i)}) - req := &vpe.ControlPing{} - reqCtx[i] = ctx.ch.SendRequest(req) - } - if i > 30 { - reply := &vpe.ControlPingReply{} - err := reqCtx[i-30].ReceiveReply(reply) - Expect(err).ShouldNot(HaveOccurred()) - Expect(reply.Retval).To(BeEquivalentTo(i - 30)) - } - } -} diff --git a/core/doc.go b/core/doc.go index a4ecd50..5b0b40e 100644 --- a/core/doc.go +++ b/core/doc.go @@ -17,4 +17,95 @@ // defer ch.Close() // // Note that one application can open only one connection, that can serve multiple API channels. +// +// The API offers two ways of communication with govpp core: using Go channels, or using convenient function +// wrappers over the Go channels. The latter should be sufficient for most of the use cases. +// +// The entry point to the API is the Channel structure, that can be obtained from the existing connection using +// the NewAPIChannel or NewAPIChannelBuffered functions: +// +// conn, err := govpp.Connect() +// if err != nil { +// // handle error! +// } +// defer conn.Disconnect() +// +// ch, err := conn.NewAPIChannel() +// if err != nil { +// // handle error! +// } +// defer ch.Close() +// +// +// Simple Request-Reply API +// +// The simple version of the API is based on blocking SendRequest / ReceiveReply calls, where a single request +// message is sent to VPP and a single reply message is filled in when the reply comes from VPP: +// +// req := &acl.ACLPluginGetVersion{} +// reply := &acl.ACLPluginGetVersionReply{} +// +// err := ch.SendRequest(req).ReceiveReply(reply) +// // process the reply +// +// Note that if the reply message type that comes from VPP does not match with provided one, you'll get an error. +// +// +// Multipart Reply API +// +// If multiple messages are expected as a reply to a request, SendMultiRequest API must be used: +// +// req := &interfaces.SwInterfaceDump{} +// reqCtx := ch.SendMultiRequest(req) +// +// for { +// reply := &interfaces.SwInterfaceDetails{} +// stop, err := reqCtx.ReceiveReply(reply) +// if stop { +// break // break out of the loop +// } +// // process the reply +// } +// +// Note that if the last reply has been already consumed, stop boolean return value is set to true. +// Do not use the message itself if stop is true - it won't be filled with actual data. +// +// +// Go Channels API +// +// The blocking API introduced above may be not sufficient for some management applications that strongly +// rely on usage of Go channels. In this case, the API allows to access the underlying Go channels directly, e.g. +// the following replacement of the SendRequest / ReceiveReply API: +// +// req := &acl.ACLPluginGetVersion{} +// // send the request to the request go channel +// ch.GetRequestChannel <- &api.VppRequest{Message: req} +// +// // receive a reply from the reply go channel +// vppReply := <-ch.GetReplyChannel +// +// // decode the message +// reply := &acl.ACLPluginGetVersionReply{} +// err := ch.MsgDecoder.DecodeMsg(vppReply.Data, reply) +// +// // process the reply +// +// +// Notifications API +// +// to subscribe for receiving of the specified notification messages via provided Go channel, use the +// SubscribeNotification API: +// +// // subscribe for specific notification message +// notifChan := make(chan api.Message, 100) +// subs, _ := ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceSetFlags) +// +// // receive one notification +// notif := (<-notifChan).(*interfaces.SwInterfaceSetFlags) +// +// ch.UnsubscribeNotification(subs) +// +// Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's +// buffer is full, the notifications will not be delivered into it. +// package core diff --git a/core/msg_codec.go b/core/msg_codec.go deleted file mode 100644 index e32916b..0000000 --- a/core/msg_codec.go +++ /dev/null @@ -1,159 +0,0 @@ -// Copyright (c) 2017 Cisco and/or its affiliates. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package core - -import ( - "bytes" - "errors" - "fmt" - "reflect" - - "github.com/lunixbochs/struc" - logger "github.com/sirupsen/logrus" - - "git.fd.io/govpp.git/api" -) - -// MsgCodec provides encoding and decoding functionality of `api.Message` structs into/from -// binary format as accepted by VPP. -type MsgCodec struct{} - -// VppRequestHeader struct contains header fields implemented by all VPP requests. -type VppRequestHeader struct { - VlMsgID uint16 - ClientIndex uint32 - Context uint32 -} - -// VppReplyHeader struct contains header fields implemented by all VPP replies. -type VppReplyHeader struct { - VlMsgID uint16 - Context uint32 -} - -// VppEventHeader struct contains header fields implemented by all VPP events. -type VppEventHeader struct { - VlMsgID uint16 - Context uint32 -} - -// VppOtherHeader struct contains header fields implemented by other VPP messages (not requests nor replies). -type VppOtherHeader struct { - VlMsgID uint16 -} - -const ( - vppRequestHeaderSize = 10 // size of a VPP request header - vppReplyHeaderSize = 6 // size of a VPP reply header - vppEventHeaderSize = 6 // size of a VPP event header - vppOtherHeaderSize = 2 // size of the header of other VPP messages -) - -// EncodeMsg encodes provided `Message` structure into its binary-encoded data representation. -func (*MsgCodec) EncodeMsg(msg api.Message, msgID uint16) ([]byte, error) { - if msg == nil { - return nil, errors.New("nil message passed in") - } - - buf := new(bytes.Buffer) - - // encode message header - var header interface{} - if msg.GetMessageType() == api.RequestMessage { - header = &VppRequestHeader{VlMsgID: msgID} - } else if msg.GetMessageType() == api.ReplyMessage { - header = &VppReplyHeader{VlMsgID: msgID} - } else if msg.GetMessageType() == api.EventMessage { - header = &VppEventHeader{VlMsgID: msgID} - } else { - header = &VppOtherHeader{VlMsgID: msgID} - } - err := struc.Pack(buf, header) - if err != nil { - log.WithFields(logger.Fields{ - "error": err, - "header": header, - }).Error("Unable to encode the message header: ", err) - return nil, fmt.Errorf("unable to encode the message header: %v", err) - } - - // encode message content - if reflect.Indirect(reflect.ValueOf(msg)).NumField() > 0 { - err := struc.Pack(buf, msg) - if err != nil { - log.WithFields(logger.Fields{ - "error": err, - "message": msg, - }).Error("Unable to encode the message: ", err) - return nil, fmt.Errorf("unable to encode the message: %v", err) - } - } - - return buf.Bytes(), nil -} - -// DecodeMsg decodes binary-encoded data of a message into provided `Message` structure. -func (*MsgCodec) DecodeMsg(data []byte, msg api.Message) error { - if msg == nil { - return errors.New("nil message passed in") - } - - buf := bytes.NewReader(data) - - // check which header is expected - var header interface{} - if msg.GetMessageType() == api.RequestMessage { - header = &VppRequestHeader{} - } else if msg.GetMessageType() == api.ReplyMessage { - header = &VppReplyHeader{} - } else if msg.GetMessageType() == api.EventMessage { - header = &VppEventHeader{} - } else { - header = &VppOtherHeader{} - } - - // decode message header - err := struc.Unpack(buf, header) - if err != nil { - log.WithFields(logger.Fields{ - "error": err, - "data": data, - }).Error("Unable to decode header of the message.") - return fmt.Errorf("unable to decode the message header: %v", err) - } - - // get rid of the message header - if msg.GetMessageType() == api.RequestMessage { - buf = bytes.NewReader(data[vppRequestHeaderSize:]) - } else if msg.GetMessageType() == api.ReplyMessage { - buf = bytes.NewReader(data[vppReplyHeaderSize:]) - } else if msg.GetMessageType() == api.EventMessage { - buf = bytes.NewReader(data[vppEventHeaderSize:]) - } else { - buf = bytes.NewReader(data[vppOtherHeaderSize:]) - } - - // decode message content - err = struc.Unpack(buf, msg) - if err != nil { - log.WithFields(logger.Fields{ - "error": err, - "data": buf, - }).Error("Unable to decode the message.") - return fmt.Errorf("unable to decode the message: %v", err) - } - - return nil -} diff --git a/core/notification_handler.go b/core/notification_handler.go index 89c16a4..c0e8687 100644 --- a/core/notification_handler.go +++ b/core/notification_handler.go @@ -18,13 +18,12 @@ import ( "fmt" "reflect" - logger "github.com/sirupsen/logrus" - "git.fd.io/govpp.git/api" + logger "github.com/sirupsen/logrus" ) // processNotifSubscribeRequest processes a notification subscribe request. -func (c *Connection) processNotifSubscribeRequest(ch *api.Channel, req *api.NotifSubscribeRequest) error { +func (c *Connection) processNotifSubscribeRequest(ch *channel, req *api.NotifSubscribeRequest) error { var err error // subscribe / unsubscribe @@ -36,7 +35,7 @@ func (c *Connection) processNotifSubscribeRequest(ch *api.Channel, req *api.Noti // send the reply into the go channel select { - case ch.NotifSubsReplyChan <- err: + case ch.notifSubsReplyChan <- err: // reply sent successfully default: // unable to write into the channel without blocking diff --git a/core/request_handler.go b/core/request_handler.go index 3bec38d..8681963 100644 --- a/core/request_handler.go +++ b/core/request_handler.go @@ -31,10 +31,10 @@ var ( ) // watchRequests watches for requests on the request API channel and forwards them as messages to VPP. -func (c *Connection) watchRequests(ch *api.Channel) { +func (c *Connection) watchRequests(ch *channel) { for { select { - case req, ok := <-ch.ReqChan: + case req, ok := <-ch.reqChan: // new request on the request channel if !ok { // after closing the request channel, release API channel and return @@ -43,7 +43,7 @@ func (c *Connection) watchRequests(ch *api.Channel) { } c.processRequest(ch, req) - case req := <-ch.NotifSubsChan: + case req := <-ch.notifSubsChan: // new request on the notification subscribe channel c.processNotifSubscribeRequest(ch, req) } @@ -51,7 +51,7 @@ func (c *Connection) watchRequests(ch *api.Channel) { } // processRequest processes a single request received on the request channel. -func (c *Connection) processRequest(ch *api.Channel, req *api.VppRequest) error { +func (c *Connection) processRequest(ch *channel, req *api.VppRequest) error { // check whether we are connected to VPP if atomic.LoadUint32(&c.connected) == 0 { err := ErrNotConnected @@ -78,7 +78,7 @@ func (c *Connection) processRequest(ch *api.Channel, req *api.VppRequest) error if err != nil { err = fmt.Errorf("unable to encode the messge: %v", err) log.WithFields(logger.Fields{ - "channel": ch.ID, + "channel": ch.id, "msg_id": msgID, "seq_num": req.SeqNum, }).Error(err) @@ -88,7 +88,7 @@ func (c *Connection) processRequest(ch *api.Channel, req *api.VppRequest) error if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled log.WithFields(logger.Fields{ - "channel": ch.ID, + "channel": ch.id, "msg_id": msgID, "msg_size": len(data), "msg_name": req.Message.GetMessageName(), @@ -97,7 +97,7 @@ func (c *Connection) processRequest(ch *api.Channel, req *api.VppRequest) error } // send the request to VPP - context := packRequestContext(ch.ID, req.Multipart, req.SeqNum) + context := packRequestContext(ch.id, req.Multipart, req.SeqNum) err = c.vpp.SendMsg(context, data) if err != nil { err = fmt.Errorf("unable to send the message: %v", err) @@ -189,9 +189,9 @@ func msgCallback(context uint32, msgID uint16, data []byte) { // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise // it logs the error and do not send the message. -func sendReply(ch *api.Channel, reply *api.VppReply) { +func sendReply(ch *channel, reply *api.VppReply) { select { - case ch.ReplyChan <- reply: + case ch.replyChan <- reply: // reply sent successfully case <-time.After(time.Millisecond * 100): // receiver still not ready diff --git a/examples/cmd/perf-bench/perf-bench.go b/examples/cmd/perf-bench/perf-bench.go index f3ff752..b1f4dcf 100644 --- a/examples/cmd/perf-bench/perf-bench.go +++ b/examples/cmd/perf-bench/perf-bench.go @@ -95,7 +95,7 @@ func main() { fmt.Printf("Requests per second: %.0f\n", float64(cnt)/elapsed.Seconds()) } -func syncTest(ch *api.Channel, cnt int) { +func syncTest(ch api.Channel, cnt int) { fmt.Printf("Running synchronous perf test with %d requests...\n", cnt) for i := 0; i < cnt; i++ { @@ -110,7 +110,7 @@ func syncTest(ch *api.Channel, cnt int) { } } -func asyncTest(ch *api.Channel, cnt int) { +func asyncTest(ch api.Channel, cnt int) { fmt.Printf("Running asynchronous perf test with %d requests...\n", cnt) // start a new go routine that reads the replies @@ -125,20 +125,20 @@ func asyncTest(ch *api.Channel, cnt int) { wg.Wait() } -func sendAsyncRequests(ch *api.Channel, cnt int) { +func sendAsyncRequests(ch api.Channel, cnt int) { for i := 0; i < cnt; i++ { - ch.ReqChan <- &api.VppRequest{ + ch.GetRequestChannel() <- &api.VppRequest{ Message: &vpe.ControlPing{}, } } } -func readAsyncReplies(ch *api.Channel, expectedCnt int, wg *sync.WaitGroup) { +func readAsyncReplies(ch api.Channel, expectedCnt int, wg *sync.WaitGroup) { cnt := 0 for { // receive a reply - reply := <-ch.ReplyChan + reply := <-ch.GetReplyChannel() if reply.Error != nil { log.Println("Error in reply:", reply.Error) os.Exit(1) @@ -146,7 +146,7 @@ func readAsyncReplies(ch *api.Channel, expectedCnt int, wg *sync.WaitGroup) { // decode the message msg := &vpe.ControlPingReply{} - err := ch.MsgDecoder.DecodeMsg(reply.Data, msg) + err := ch.GetMessageDecoder().DecodeMsg(reply.Data, msg) if reply.Error != nil { log.Println("Error by decoding:", err) os.Exit(1) @@ -159,4 +159,4 @@ func readAsyncReplies(ch *api.Channel, expectedCnt int, wg *sync.WaitGroup) { return } } -} \ No newline at end of file +} diff --git a/examples/cmd/simple-client/simple_client.go b/examples/cmd/simple-client/simple_client.go index 67dc14b..7b7dbcd 100644 --- a/examples/cmd/simple-client/simple_client.go +++ b/examples/cmd/simple-client/simple_client.go @@ -66,7 +66,7 @@ func main() { // compatibilityCheck shows how an management application can check whether generated API messages are // compatible with the version of VPP which the library is connected to. -func compatibilityCheck(ch *api.Channel) { +func compatibilityCheck(ch api.Channel) { err := ch.CheckMessageCompatibility( &interfaces.SwInterfaceDump{}, &interfaces.SwInterfaceDetails{}, @@ -78,7 +78,7 @@ func compatibilityCheck(ch *api.Channel) { } // aclVersion is the simplest API example - one empty request message and one reply message. -func aclVersion(ch *api.Channel) { +func aclVersion(ch api.Channel) { req := &acl.ACLPluginGetVersion{} reply := &acl.ACLPluginGetVersionReply{} @@ -92,7 +92,7 @@ func aclVersion(ch *api.Channel) { } // aclConfig is another simple API example - in this case, the request contains structured data. -func aclConfig(ch *api.Channel) { +func aclConfig(ch api.Channel) { req := &acl.ACLAddReplace{ ACLIndex: ^uint32(0), Tag: []byte("access list 1"), @@ -127,7 +127,7 @@ func aclConfig(ch *api.Channel) { } // aclDump shows an example where SendRequest and ReceiveReply are not chained together. -func aclDump(ch *api.Channel) { +func aclDump(ch api.Channel) { req := &acl.ACLDump{} reply := &acl.ACLDetails{} @@ -143,17 +143,17 @@ func aclDump(ch *api.Channel) { // tapConnect example shows how the Go channels in the API channel can be accessed directly instead // of using SendRequest and ReceiveReply wrappers. -func tapConnect(ch *api.Channel) { +func tapConnect(ch api.Channel) { req := &tap.TapConnect{ TapName: []byte("testtap"), UseRandomMac: 1, } // send the request to the request go channel - ch.ReqChan <- &api.VppRequest{Message: req} + ch.GetRequestChannel() <- &api.VppRequest{Message: req} // receive a reply from the reply go channel - vppReply := <-ch.ReplyChan + vppReply := <-ch.GetReplyChannel() if vppReply.Error != nil { fmt.Println("Error:", vppReply.Error) return @@ -161,7 +161,7 @@ func tapConnect(ch *api.Channel) { // decode the message reply := &tap.TapConnectReply{} - err := ch.MsgDecoder.DecodeMsg(vppReply.Data, reply) + err := ch.GetMessageDecoder().DecodeMsg(vppReply.Data, reply) if err != nil { fmt.Println("Error:", err) @@ -171,7 +171,7 @@ func tapConnect(ch *api.Channel) { } // interfaceDump shows an example of multipart request (multiple replies are expected). -func interfaceDump(ch *api.Channel) { +func interfaceDump(ch api.Channel) { req := &interfaces.SwInterfaceDump{} reqCtx := ch.SendMultiRequest(req) @@ -191,7 +191,7 @@ func interfaceDump(ch *api.Channel) { // interfaceNotifications shows the usage of notification API. Note that for notifications, // you are supposed to create your own Go channel with your preferred buffer size. If the channel's // buffer is full, the notifications will not be delivered into it. -func interfaceNotifications(ch *api.Channel) { +func interfaceNotifications(ch api.Channel) { // subscribe for specific notification message notifChan := make(chan api.Message, 100) subs, _ := ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceSetFlags) @@ -218,4 +218,4 @@ func interfaceNotifications(ch *api.Channel) { // unsubscribe from delivery of the notifications ch.UnsubscribeNotification(subs) -} \ No newline at end of file +} diff --git a/examples/cmd/stats-client/stats_client.go b/examples/cmd/stats-client/stats_client.go index 17c7956..5f9966f 100644 --- a/examples/cmd/stats-client/stats_client.go +++ b/examples/cmd/stats-client/stats_client.go @@ -101,7 +101,7 @@ loop: } // subscribeNotifications subscribes for interface counters notifications. -func subscribeNotifications(ch *api.Channel) (*api.NotifSubscription, *api.NotifSubscription, chan api.Message) { +func subscribeNotifications(ch api.Channel) (*api.NotifSubscription, *api.NotifSubscription, chan api.Message) { notifChan := make(chan api.Message, 100) simpleCountersSubs, _ := ch.SubscribeNotification(notifChan, interfaces.NewVnetInterfaceSimpleCounters) @@ -111,7 +111,7 @@ func subscribeNotifications(ch *api.Channel) (*api.NotifSubscription, *api.Notif } // requestStatistics requests interface counters notifications from VPP. -func requestStatistics(ch *api.Channel) { +func requestStatistics(ch api.Channel) { ch.SendRequest(&stats.WantStats{ Pid: uint32(os.Getpid()), EnableDisable: 1, @@ -141,4 +141,4 @@ func processCombinedCounters(counters *interfaces.VnetInterfaceCombinedCounters) counters.FirstSwIfIndex+i, counterNames[counters.VnetCounterType], counters.Data[i].Packets, counterNames[counters.VnetCounterType], counters.Data[i].Bytes) } -} \ No newline at end of file +} -- cgit 1.2.3-korg