diff options
author | Vladimir Lavor <vlavor@cisco.com> | 2018-07-03 10:39:21 +0200 |
---|---|---|
committer | Vladimir Lavor <vlavor@cisco.com> | 2018-07-06 13:18:01 +0200 |
commit | f1bef4a3c66f4408afdeb64cda62ccd8562d0fc6 (patch) | |
tree | 5767c18051f97362a00b1a4dfe90ec9480247032 /core | |
parent | 5276b9439d0f902e125a5113bfa4f1b6622aea18 (diff) |
make api.Channel as interface
Change-Id: I052d241ab09043b1195beebeee99df4d8536621f
Signed-off-by: Vladimir Lavor <vlavor@cisco.com>
Diffstat (limited to 'core')
-rw-r--r-- | core/channel.go | 276 | ||||
-rw-r--r-- | core/channel_test.go | 587 | ||||
-rw-r--r-- | core/connection.go (renamed from core/core.go) | 57 | ||||
-rw-r--r-- | core/connection_test.go (renamed from core/core_test.go) | 55 | ||||
-rw-r--r-- | core/doc.go | 91 | ||||
-rw-r--r-- | core/msg_codec.go | 159 | ||||
-rw-r--r-- | core/notification_handler.go | 7 | ||||
-rw-r--r-- | core/request_handler.go | 18 |
8 files changed, 1024 insertions, 226 deletions
diff --git a/core/channel.go b/core/channel.go new file mode 100644 index 0000000..87b3e29 --- /dev/null +++ b/core/channel.go @@ -0,0 +1,276 @@ +// Copyright (c) 2018 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + "fmt" + "time" + + "errors" + + "git.fd.io/govpp.git/api" + "github.com/sirupsen/logrus" +) + +const defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout + +// requestCtxData is a context of a ongoing request (simple one - only one response is expected). +type requestCtxData struct { + ch *channel + seqNum uint16 +} + +// multiRequestCtxData is a context of a ongoing multipart request (multiple responses are expected). +type multiRequestCtxData struct { + ch *channel + seqNum uint16 +} + +func (req *requestCtxData) ReceiveReply(msg api.Message) error { + if req == nil || req.ch == nil { + return errors.New("invalid request context") + } + + lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum) + + if lastReplyReceived { + err = errors.New("multipart reply recieved while a simple reply expected") + } + return err +} + +func (req *multiRequestCtxData) ReceiveReply(msg api.Message) (lastReplyReceived bool, err error) { + if req == nil || req.ch == nil { + return false, errors.New("invalid request context") + } + + return req.ch.receiveReplyInternal(msg, req.seqNum) +} + +// channel is the main communication interface with govpp core. It contains four Go channels, one for sending the requests +// to VPP, one for receiving the replies from it and the same set for notifications. The user can access the Go channels +// via methods provided by Channel interface in this package. Do not use the same channel from multiple goroutines +// concurrently, otherwise the responses could mix! Use multiple channels instead. +type channel struct { + id uint16 // channel ID + + reqChan chan *api.VppRequest // channel for sending the requests to VPP, closing this channel releases all resources in the ChannelProvider + replyChan chan *api.VppReply // channel where VPP replies are delivered to + + notifSubsChan chan *api.NotifSubscribeRequest // channel for sending notification subscribe requests + notifSubsReplyChan chan error // channel where replies to notification subscribe requests are delivered to + + msgDecoder api.MessageDecoder // used to decode binary data to generated API messages + msgIdentifier api.MessageIdentifier // used to retrieve message ID of a message + + lastSeqNum uint16 // sequence number of the last sent request + + delayedReply *api.VppReply // reply already taken from ReplyChan, buffered for later delivery + replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout +} + +func (ch *channel) SendRequest(msg api.Message) api.RequestCtx { + ch.lastSeqNum++ + ch.reqChan <- &api.VppRequest{ + Message: msg, + SeqNum: ch.lastSeqNum, + } + return &requestCtxData{ch: ch, seqNum: ch.lastSeqNum} +} + +func (ch *channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx { + ch.lastSeqNum++ + ch.reqChan <- &api.VppRequest{ + Message: msg, + Multipart: true, + SeqNum: ch.lastSeqNum, + } + return &multiRequestCtxData{ch: ch, seqNum: ch.lastSeqNum} +} + +func (ch *channel) SubscribeNotification(notifChan chan api.Message, msgFactory func() api.Message) (*api.NotifSubscription, error) { + subscription := &api.NotifSubscription{ + NotifChan: notifChan, + MsgFactory: msgFactory, + } + ch.notifSubsChan <- &api.NotifSubscribeRequest{ + Subscription: subscription, + Subscribe: true, + } + return subscription, <-ch.notifSubsReplyChan +} + +func (ch *channel) UnsubscribeNotification(subscription *api.NotifSubscription) error { + ch.notifSubsChan <- &api.NotifSubscribeRequest{ + Subscription: subscription, + Subscribe: false, + } + return <-ch.notifSubsReplyChan +} + +func (ch *channel) CheckMessageCompatibility(messages ...api.Message) error { + for _, msg := range messages { + _, err := ch.msgIdentifier.GetMessageID(msg) + if err != nil { + return fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to", + msg.GetMessageName(), msg.GetCrcString()) + } + } + return nil +} + +func (ch *channel) SetReplyTimeout(timeout time.Duration) { + ch.replyTimeout = timeout +} + +func (ch *channel) GetRequestChannel() chan<- *api.VppRequest { + return ch.reqChan +} + +func (ch *channel) GetReplyChannel() <-chan *api.VppReply { + return ch.replyChan +} + +func (ch *channel) GetNotificationChannel() chan<- *api.NotifSubscribeRequest { + return ch.notifSubsChan +} + +func (ch *channel) GetNotificationReplyChannel() <-chan error { + return ch.notifSubsReplyChan +} + +func (ch *channel) GetMessageDecoder() api.MessageDecoder { + return ch.msgDecoder +} + +func (ch *channel) GetID() uint16 { + return ch.id +} + +func (ch *channel) Close() { + if ch.reqChan != nil { + close(ch.reqChan) + } +} + +// receiveReplyInternal receives a reply from the reply channel into the provided msg structure. +func (ch *channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (lastReplyReceived bool, err error) { + var ignore bool + if msg == nil { + return false, errors.New("nil message passed in") + } + + if ch.delayedReply != nil { + // try the delayed reply + vppReply := ch.delayedReply + ch.delayedReply = nil + ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg) + if !ignore { + return lastReplyReceived, err + } + } + + timer := time.NewTimer(ch.replyTimeout) + for { + select { + // blocks until a reply comes to ReplyChan or until timeout expires + case vppReply := <-ch.replyChan: + ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg) + if ignore { + continue + } + return lastReplyReceived, err + + case <-timer.C: + err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout) + return false, err + } + } + return +} + +func (ch *channel) processReply(reply *api.VppReply, expSeqNum uint16, msg api.Message) (ignore bool, lastReplyReceived bool, err error) { + // check the sequence number + cmpSeqNums := compareSeqNumbers(reply.SeqNum, expSeqNum) + if cmpSeqNums == -1 { + // reply received too late, ignore the message + logrus.WithField("sequence-number", reply.SeqNum).Warn( + "Received reply to an already closed binary API request") + ignore = true + return + } + if cmpSeqNums == 1 { + ch.delayedReply = reply + err = fmt.Errorf("missing binary API reply with sequence number: %d", expSeqNum) + return + } + + if reply.Error != nil { + err = reply.Error + return + } + if reply.LastReplyReceived { + lastReplyReceived = true + return + } + + // message checks + var expMsgID uint16 + expMsgID, err = ch.msgIdentifier.GetMessageID(msg) + if err != nil { + err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to", + msg.GetMessageName(), msg.GetCrcString()) + return + } + + if reply.MessageID != expMsgID { + var msgNameCrc string + if nameCrc, err := ch.msgIdentifier.LookupByID(reply.MessageID); err != nil { + msgNameCrc = err.Error() + } else { + msgNameCrc = nameCrc + } + + err = fmt.Errorf("received invalid message ID (seq-num=%d), expected %d (%s), but got %d (%s) "+ + "(check if multiple goroutines are not sharing single GoVPP channel)", + reply.SeqNum, expMsgID, msg.GetMessageName(), reply.MessageID, msgNameCrc) + return + } + + // decode the message + err = ch.msgDecoder.DecodeMsg(reply.Data, msg) + return +} + +// compareSeqNumbers returns -1, 0, 1 if sequence number <seqNum1> precedes, equals to, +// or succeeds seq. number <seqNum2>. +// Since sequence numbers cycle in the finite set of size 2^16, the function +// must assume that the distance between compared sequence numbers is less than +// (2^16)/2 to determine the order. +func compareSeqNumbers(seqNum1, seqNum2 uint16) int { + // calculate distance from seqNum1 to seqNum2 + var dist uint16 + if seqNum1 <= seqNum2 { + dist = seqNum2 - seqNum1 + } else { + dist = 0xffff - (seqNum1 - seqNum2 - 1) + } + if dist == 0 { + return 0 + } else if dist <= 0x8000 { + return -1 + } + return 1 +} diff --git a/core/channel_test.go b/core/channel_test.go new file mode 100644 index 0000000..d573f29 --- /dev/null +++ b/core/channel_test.go @@ -0,0 +1,587 @@ +// Copyright (c) 2017 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + "testing" + "time" + + "git.fd.io/govpp.git/adapter/mock" + "git.fd.io/govpp.git/core/bin_api/vpe" + "git.fd.io/govpp.git/examples/bin_api/interfaces" + "git.fd.io/govpp.git/examples/bin_api/memif" + "git.fd.io/govpp.git/examples/bin_api/tap" + + "git.fd.io/govpp.git/api" + . "github.com/onsi/gomega" +) + +type testCtx struct { + mockVpp *mock.VppAdapter + conn *Connection + ch api.Channel +} + +func setupTest(t *testing.T) *testCtx { + RegisterTestingT(t) + + ctx := &testCtx{ + mockVpp: &mock.VppAdapter{}, + } + + var err error + ctx.conn, err = Connect(ctx.mockVpp) + Expect(err).ShouldNot(HaveOccurred()) + + ctx.ch, err = ctx.conn.NewAPIChannel() + Expect(err).ShouldNot(HaveOccurred()) + + return ctx +} + +func (ctx *testCtx) teardownTest() { + ctx.ch.Close() + ctx.conn.Disconnect() +} + +func TestRequestReplyTapConnect(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + ctx.mockVpp.MockReply(&tap.TapConnectReply{ + Retval: 10, + SwIfIndex: 1, + }) + request := &tap.TapConnect{ + TapName: []byte("test-tap-name"), + UseRandomMac: 1, + } + reply := &tap.TapConnectReply{} + + err := ctx.ch.SendRequest(request).ReceiveReply(reply) + Expect(err).ShouldNot(HaveOccurred()) + Expect(reply.Retval).To(BeEquivalentTo(10), "Incorrect retval value for TapConnectReply") + Expect(reply.SwIfIndex).To(BeEquivalentTo(1), "Incorrect SwIfIndex value for TapConnectReply") +} + +func TestRequestReplyTapModify(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + ctx.mockVpp.MockReply(&tap.TapModifyReply{ + Retval: 15, + SwIfIndex: 2, + }) + request := &tap.TapModify{ + TapName: []byte("test-tap-modify"), + UseRandomMac: 1, + CustomDevInstance: 1, + } + reply := &tap.TapModifyReply{} + + err := ctx.ch.SendRequest(request).ReceiveReply(reply) + Expect(err).ShouldNot(HaveOccurred()) + Expect(reply.Retval).To(BeEquivalentTo(15), "Incorrect retval value for TapModifyReply") + Expect(reply.SwIfIndex).To(BeEquivalentTo(2), "Incorrect SwIfIndex value for TapModifyReply") +} + +func TestRequestReplyTapDelete(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + ctx.mockVpp.MockReply(&tap.TapDeleteReply{ + Retval: 20, + }) + request := &tap.TapDelete{ + SwIfIndex: 3, + } + reply := &tap.TapDeleteReply{} + + err := ctx.ch.SendRequest(request).ReceiveReply(reply) + Expect(err).ShouldNot(HaveOccurred()) + Expect(reply.Retval).To(BeEquivalentTo(20), "Incorrect retval value for TapDeleteReply") +} + +func TestRequestReplySwInterfaceTapDump(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + byteName := []byte("dev-name-test") + ctx.mockVpp.MockReply(&tap.SwInterfaceTapDetails{ + SwIfIndex: 25, + DevName: byteName, + }) + request := &tap.SwInterfaceTapDump{} + reply := &tap.SwInterfaceTapDetails{} + + err := ctx.ch.SendRequest(request).ReceiveReply(reply) + Expect(err).ShouldNot(HaveOccurred()) + Expect(reply.SwIfIndex).To(BeEquivalentTo(25), "Incorrect SwIfIndex value for SwInterfaceTapDetails") + Expect(reply.DevName).ToNot(BeNil(), "Incorrect DevName value for SwInterfaceTapDetails") +} + +func TestRequestReplyMemifCreate(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + ctx.mockVpp.MockReply(&memif.MemifCreateReply{ + Retval: 22, + SwIfIndex: 4, + }) + request := &memif.MemifCreate{ + Role: 10, + ID: 12, + RingSize: 8000, + BufferSize: 50, + } + reply := &memif.MemifCreateReply{} + + err := ctx.ch.SendRequest(request).ReceiveReply(reply) + Expect(err).ShouldNot(HaveOccurred()) + Expect(reply.Retval).To(BeEquivalentTo(22), "Incorrect Retval value for MemifCreate") + Expect(reply.SwIfIndex).To(BeEquivalentTo(4), "Incorrect SwIfIndex value for MemifCreate") +} + +func TestRequestReplyMemifDelete(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + ctx.mockVpp.MockReply(&memif.MemifDeleteReply{ + Retval: 24, + }) + request := &memif.MemifDelete{ + SwIfIndex: 15, + } + reply := &memif.MemifDeleteReply{} + + err := ctx.ch.SendRequest(request).ReceiveReply(reply) + Expect(err).ShouldNot(HaveOccurred()) + Expect(reply.Retval).To(BeEquivalentTo(24), "Incorrect Retval value for MemifDelete") +} + +func TestRequestReplyMemifDetails(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + ctx.mockVpp.MockReply(&memif.MemifDetails{ + SwIfIndex: 25, + IfName: []byte("memif-name"), + Role: 0, + }) + request := &memif.MemifDump{} + reply := &memif.MemifDetails{} + + err := ctx.ch.SendRequest(request).ReceiveReply(reply) + Expect(err).ShouldNot(HaveOccurred()) + Expect(reply.SwIfIndex).To(BeEquivalentTo(25), "Incorrect SwIfIndex value for MemifDetails") + Expect(reply.IfName).ToNot(BeEmpty(), "MemifDetails IfName is empty byte array") + Expect(reply.Role).To(BeEquivalentTo(0), "Incorrect Role value for MemifDetails") +} + +func TestMultiRequestReplySwInterfaceTapDump(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + // mock reply + var msgs []api.Message + for i := 1; i <= 10; i++ { + msgs = append(msgs, &tap.SwInterfaceTapDetails{ + SwIfIndex: uint32(i), + DevName: []byte("dev-name-test"), + }) + } + ctx.mockVpp.MockReply(msgs...) + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + + reqCtx := ctx.ch.SendMultiRequest(&tap.SwInterfaceTapDump{}) + cnt := 0 + for { + msg := &tap.SwInterfaceTapDetails{} + stop, err := reqCtx.ReceiveReply(msg) + if stop { + break // break out of the loop + } + Expect(err).ShouldNot(HaveOccurred()) + cnt++ + } + Expect(cnt).To(BeEquivalentTo(10)) +} + +func TestMultiRequestReplySwInterfaceMemifDump(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + // mock reply + var msgs []api.Message + for i := 1; i <= 10; i++ { + msgs = append(msgs, &memif.MemifDetails{ + SwIfIndex: uint32(i), + }) + } + ctx.mockVpp.MockReply(msgs...) + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + + reqCtx := ctx.ch.SendMultiRequest(&memif.MemifDump{}) + cnt := 0 + for { + msg := &memif.MemifDetails{} + stop, err := reqCtx.ReceiveReply(msg) + if stop { + break // break out of the loop + } + Expect(err).ShouldNot(HaveOccurred()) + cnt++ + } + Expect(cnt).To(BeEquivalentTo(10)) +} + +func TestNotifications(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + // subscribe for notification + notifChan := make(chan api.Message, 1) + subs, err := ctx.ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceSetFlags) + Expect(err).ShouldNot(HaveOccurred()) + + // mock the notification and force its delivery + ctx.mockVpp.MockReply(&interfaces.SwInterfaceSetFlags{ + SwIfIndex: 3, + AdminUpDown: 1, + }) + ctx.mockVpp.SendMsg(0, []byte("")) + + // receive the notification + var notif *interfaces.SwInterfaceSetFlags + Eventually(func() *interfaces.SwInterfaceSetFlags { + select { + case n := <-notifChan: + notif = n.(*interfaces.SwInterfaceSetFlags) + return notif + default: + return nil + } + }).ShouldNot(BeNil()) + + // verify the received notifications + Expect(notif.SwIfIndex).To(BeEquivalentTo(3), "Incorrect SwIfIndex value for SwInterfaceSetFlags") + Expect(notif.AdminUpDown).To(BeEquivalentTo(1), "Incorrect AdminUpDown value for SwInterfaceSetFlags") + + ctx.ch.UnsubscribeNotification(subs) +} + +func TestNotificationEvent(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + // subscribe for notification + notifChan := make(chan api.Message, 1) + subs, err := ctx.ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceEvent) + Expect(err).ShouldNot(HaveOccurred()) + + // mock the notification and force its delivery + ctx.mockVpp.MockReply(&interfaces.SwInterfaceEvent{ + SwIfIndex: 2, + LinkUpDown: 1, + }) + ctx.mockVpp.SendMsg(0, []byte("")) + + // receive the notification + var notif *interfaces.SwInterfaceEvent + Eventually(func() *interfaces.SwInterfaceEvent { + select { + case n := <-notifChan: + notif = n.(*interfaces.SwInterfaceEvent) + return notif + default: + return nil + } + }).ShouldNot(BeNil()) + + // verify the received notifications + Expect(notif.SwIfIndex).To(BeEquivalentTo(2), "Incorrect SwIfIndex value for SwInterfaceSetFlags") + Expect(notif.LinkUpDown).To(BeEquivalentTo(1), "Incorrect LinkUpDown value for SwInterfaceSetFlags") + + ctx.ch.UnsubscribeNotification(subs) +} + +func TestCheckMessageCompatibility(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + err := ctx.ch.CheckMessageCompatibility(&interfaces.SwInterfaceSetFlags{}) + Expect(err).ShouldNot(HaveOccurred()) +} +func TestSetReplyTimeout(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + ctx.ch.SetReplyTimeout(time.Millisecond) + + // first one request should work + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + err := ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{}) + Expect(err).ShouldNot(HaveOccurred()) + + // no other reply ready - expect timeout + err = ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{}) + Expect(err).Should(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("timeout")) +} + +func TestSetReplyTimeoutMultiRequest(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + ctx.ch.SetReplyTimeout(time.Millisecond) + + var msgs []api.Message + for i := 1; i <= 3; i++ { + msgs = append(msgs, &interfaces.SwInterfaceDetails{ + SwIfIndex: uint32(i), + InterfaceName: []byte("if-name-test"), + }) + } + ctx.mockVpp.MockReply(msgs...) + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + + cnt := 0 + sendMultiRequest := func() error { + reqCtx := ctx.ch.SendMultiRequest(&interfaces.SwInterfaceDump{}) + for { + msg := &interfaces.SwInterfaceDetails{} + stop, err := reqCtx.ReceiveReply(msg) + if stop { + break // break out of the loop + } + if err != nil { + return err + } + cnt++ + } + return nil + } + + // first one request should work + err := sendMultiRequest() + Expect(err).ShouldNot(HaveOccurred()) + + // no other reply ready - expect timeout + err = sendMultiRequest() + Expect(err).Should(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("timeout")) + + Expect(cnt).To(BeEquivalentTo(3)) +} + +func TestReceiveReplyNegative(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + // invalid context 1 + reqCtx1 := &requestCtxData{} + err := reqCtx1.ReceiveReply(&vpe.ControlPingReply{}) + Expect(err).Should(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("invalid request context")) + + // invalid context 2 + reqCtx2 := &multiRequestCtxData{} + _, err = reqCtx2.ReceiveReply(&vpe.ControlPingReply{}) + Expect(err).Should(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("invalid request context")) + + // NU + reqCtx3 := &requestCtxData{} + err = reqCtx3.ReceiveReply(nil) + Expect(err).Should(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("invalid request context")) +} + +func TestMultiRequestDouble(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + // mock reply + var msgs []mock.MsgWithContext + for i := 1; i <= 3; i++ { + msgs = append(msgs, mock.MsgWithContext{ + Msg: &interfaces.SwInterfaceDetails{ + SwIfIndex: uint32(i), + InterfaceName: []byte("if-name-test"), + }, + Multipart: true, + SeqNum: 1, + }) + } + msgs = append(msgs, mock.MsgWithContext{Msg: &vpe.ControlPingReply{}, Multipart: true, SeqNum: 1}) + + for i := 1; i <= 3; i++ { + msgs = append(msgs, + mock.MsgWithContext{ + Msg: &interfaces.SwInterfaceDetails{ + SwIfIndex: uint32(i), + InterfaceName: []byte("if-name-test"), + }, + Multipart: true, + SeqNum: 2, + }) + } + msgs = append(msgs, mock.MsgWithContext{Msg: &vpe.ControlPingReply{}, Multipart: true, SeqNum: 2}) + + ctx.mockVpp.MockReplyWithContext(msgs...) + + cnt := 0 + var sendMultiRequest = func() error { + reqCtx := ctx.ch.SendMultiRequest(&interfaces.SwInterfaceDump{}) + for { + msg := &interfaces.SwInterfaceDetails{} + stop, err := reqCtx.ReceiveReply(msg) + if stop { + break // break out of the loop + } + if err != nil { + return err + } + cnt++ + } + return nil + } + + err := sendMultiRequest() + Expect(err).ShouldNot(HaveOccurred()) + + err = sendMultiRequest() + Expect(err).ShouldNot(HaveOccurred()) + + Expect(cnt).To(BeEquivalentTo(6)) +} + +func TestReceiveReplyAfterTimeout(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + ctx.ch.SetReplyTimeout(time.Millisecond) + + // first one request should work + ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{Msg: &vpe.ControlPingReply{}, SeqNum: 1}) + err := ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{}) + Expect(err).ShouldNot(HaveOccurred()) + + err = ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{}) + Expect(err).Should(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("timeout")) + + ctx.mockVpp.MockReplyWithContext( + // simulating late reply + mock.MsgWithContext{Msg: &vpe.ControlPingReply{}, SeqNum: 2}, + // normal reply for next request + mock.MsgWithContext{Msg: &tap.TapConnectReply{}, SeqNum: 3}) + + req := &tap.TapConnect{ + TapName: []byte("test-tap-name"), + UseRandomMac: 1, + } + reply := &tap.TapConnectReply{} + + // should succeed + err = ctx.ch.SendRequest(req).ReceiveReply(reply) + Expect(err).ShouldNot(HaveOccurred()) +} + +func TestReceiveReplyAfterTimeoutMultiRequest(t *testing.T) { + /* + TODO: fix mock adapter + This test will fail because mock adapter will stop sending replies + when it encounters control_ping_reply from multi request, + thus never sending reply for next request + */ + t.Skip() + + ctx := setupTest(t) + defer ctx.teardownTest() + + ctx.ch.SetReplyTimeout(time.Millisecond * 100) + + // first one request should work + ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{Msg: &vpe.ControlPingReply{}, SeqNum: 1}) + err := ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{}) + Expect(err).ShouldNot(HaveOccurred()) + + cnt := 0 + var sendMultiRequest = func() error { + reqCtx := ctx.ch.SendMultiRequest(&interfaces.SwInterfaceDump{}) + for { + msg := &interfaces.SwInterfaceDetails{} + stop, err := reqCtx.ReceiveReply(msg) + if stop { + break // break out of the loop + } + if err != nil { + return err + } + cnt++ + } + return nil + } + + err = sendMultiRequest() + Expect(err).Should(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("timeout")) + Expect(cnt).To(BeEquivalentTo(0)) + + // simulating late replies + var msgs []mock.MsgWithContext + for i := 1; i <= 3; i++ { + msgs = append(msgs, mock.MsgWithContext{ + Msg: &interfaces.SwInterfaceDetails{ + SwIfIndex: uint32(i), + InterfaceName: []byte("if-name-test"), + }, + Multipart: true, + SeqNum: 2, + }) + } + msgs = append(msgs, mock.MsgWithContext{Msg: &vpe.ControlPingReply{}, Multipart: true, SeqNum: 2}) + ctx.mockVpp.MockReplyWithContext(msgs...) + + // normal reply for next request + ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{Msg: &tap.TapConnectReply{}, SeqNum: 3}) + + req := &tap.TapConnect{ + TapName: []byte("test-tap-name"), + UseRandomMac: 1, + } + reply := &tap.TapConnectReply{} + + // should succeed + err = ctx.ch.SendRequest(req).ReceiveReply(reply) + Expect(err).ShouldNot(HaveOccurred()) +} + +func TestInvalidMessageID(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + // first one request should work + ctx.mockVpp.MockReply(&vpe.ShowVersionReply{}) + err := ctx.ch.SendRequest(&vpe.ShowVersion{}).ReceiveReply(&vpe.ShowVersionReply{}) + Expect(err).ShouldNot(HaveOccurred()) + + // second should fail with error invalid message ID + ctx.mockVpp.MockReply(&vpe.ShowVersionReply{}) + err = ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{}) + Expect(err).Should(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("invalid message ID")) +} diff --git a/core/core.go b/core/connection.go index 052eb0b..a44d0c4 100644 --- a/core/core.go +++ b/core/connection.go @@ -27,6 +27,7 @@ import ( "git.fd.io/govpp.git/adapter" "git.fd.io/govpp.git/api" + "git.fd.io/govpp.git/codec" "git.fd.io/govpp.git/core/bin_api/vpe" ) @@ -71,13 +72,13 @@ type ConnectionEvent struct { type Connection struct { vpp adapter.VppAdapter // VPP adapter connected uint32 // non-zero if the adapter is connected to VPP - codec *MsgCodec // message codec + codec *codec.MsgCodec // message codec msgIDsLock sync.RWMutex // lock for the message IDs map msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC - channelsLock sync.RWMutex // lock for the channels map - channels map[uint16]*api.Channel // map of all API channels indexed by the channel ID + channelsLock sync.RWMutex // lock for the channels map + channels map[uint16]*channel // map of all API channels indexed by the channel ID notifSubscriptionsLock sync.RWMutex // lock for the subscriptions map notifSubscriptions map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID @@ -197,8 +198,8 @@ func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) { conn = &Connection{ vpp: vppAdapter, - codec: &MsgCodec{}, - channels: make(map[uint16]*api.Channel), + codec: &codec.MsgCodec{}, + channels: make(map[uint16]*channel), msgIDs: make(map[string]uint16), notifSubscriptions: make(map[uint16][]*api.NotifSubscription), } @@ -268,7 +269,7 @@ func (c *Connection) connectLoop(connChan chan ConnectionEvent) { // it continues with connectLoop and tries to reconnect. func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { // create a separate API channel for health check probes - ch, err := conn.NewAPIChannelBuffered(1, 1) + ch, err := conn.newAPIChannelBuffered(1, 1) if err != nil { log.Error("Failed to create health check API channel, health check will be disabled:", err) return @@ -290,18 +291,18 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { // try draining probe replies from previous request before sending next one select { - case <-ch.ReplyChan: + case <-ch.replyChan: log.Debug("drained old probe reply from reply channel") default: } // send the control ping request - ch.ReqChan <- &api.VppRequest{Message: msgControlPing} + ch.reqChan <- &api.VppRequest{Message: msgControlPing} for { // expect response within timeout period select { - case vppReply := <-ch.ReplyChan: + case vppReply := <-ch.replyChan: err = vppReply.Error case <-time.After(healthCheckReplyTimeout): @@ -349,32 +350,34 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { c.connectLoop(connChan) } -// NewAPIChannel returns a new API channel for communication with VPP via govpp core. -// It uses default buffer sizes for the request and reply Go channels. -func (c *Connection) NewAPIChannel() (*api.Channel, error) { - if c == nil { - return nil, errors.New("nil connection passed in") - } - return c.NewAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize) +func (c *Connection) NewAPIChannel() (api.Channel, error) { + return c.newAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize) +} + +func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) { + return c.newAPIChannelBuffered(reqChanBufSize, replyChanBufSize) } // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core. // It allows to specify custom buffer sizes for the request and reply Go channels. -func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*api.Channel, error) { +func (c *Connection) newAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*channel, error) { if c == nil { return nil, errors.New("nil connection passed in") } chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff) - ch := api.NewChannelInternal(chID) - ch.MsgDecoder = c.codec - ch.MsgIdentifier = c + ch := &channel{ + id: chID, + replyTimeout: defaultReplyTimeout, + } + ch.msgDecoder = c.codec + ch.msgIdentifier = c // create the communication channels - ch.ReqChan = make(chan *api.VppRequest, reqChanBufSize) - ch.ReplyChan = make(chan *api.VppReply, replyChanBufSize) - ch.NotifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize) - ch.NotifSubsReplyChan = make(chan error, replyChanBufSize) + ch.reqChan = make(chan *api.VppRequest, reqChanBufSize) + ch.replyChan = make(chan *api.VppReply, replyChanBufSize) + ch.notifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize) + ch.notifSubsReplyChan = make(chan error, replyChanBufSize) // store API channel within the client c.channelsLock.Lock() @@ -388,13 +391,13 @@ func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) } // releaseAPIChannel releases API channel that needs to be closed. -func (c *Connection) releaseAPIChannel(ch *api.Channel) { +func (c *Connection) releaseAPIChannel(ch *channel) { log.WithFields(logger.Fields{ - "ID": ch.ID, + "ID": ch.id, }).Debug("API channel closed.") // delete the channel from channels map c.channelsLock.Lock() - delete(c.channels, ch.ID) + delete(c.channels, ch.id) c.channelsLock.Unlock() } diff --git a/core/core_test.go b/core/connection_test.go index e4fbf63..b7c3aa0 100644 --- a/core/core_test.go +++ b/core/connection_test.go @@ -24,13 +24,14 @@ import ( "git.fd.io/govpp.git/examples/bin_api/interfaces" "git.fd.io/govpp.git/examples/bin_api/stats" + "git.fd.io/govpp.git/codec" . "github.com/onsi/gomega" ) type testCtx struct { mockVpp *mock.VppAdapter conn *core.Connection - ch *api.Channel + ch api.Channel } func setupTest(t *testing.T, bufferedChan bool) *testCtx { @@ -68,14 +69,14 @@ func TestSimpleRequest(t *testing.T) { reply := &vpe.ControlPingReply{} // send the request and receive a reply - ctx.ch.ReqChan <- &api.VppRequest{Message: req} - vppReply := <-ctx.ch.ReplyChan + ctx.ch.GetRequestChannel() <- &api.VppRequest{Message: req} + vppReply := <-ctx.ch.GetReplyChannel() Expect(vppReply).ShouldNot(BeNil()) Expect(vppReply.Error).ShouldNot(HaveOccurred()) // decode the message - err := ctx.ch.MsgDecoder.DecodeMsg(vppReply.Data, reply) + err := ctx.ch.GetMessageDecoder().DecodeMsg(vppReply.Data, reply) Expect(err).ShouldNot(HaveOccurred()) Expect(reply.Retval).To(BeEquivalentTo(-5)) @@ -93,12 +94,12 @@ func TestMultiRequest(t *testing.T) { ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) // send multipart request - ctx.ch.ReqChan <- &api.VppRequest{Message: &interfaces.SwInterfaceDump{}, Multipart: true} + ctx.ch.GetRequestChannel() <- &api.VppRequest{Message: &interfaces.SwInterfaceDump{}, Multipart: true} cnt := 0 for { // receive a reply - vppReply := <-ctx.ch.ReplyChan + vppReply := <-ctx.ch.GetReplyChannel() if vppReply.LastReplyReceived { break // break out of the loop } @@ -106,7 +107,7 @@ func TestMultiRequest(t *testing.T) { // decode the message reply := &interfaces.SwInterfaceDetails{} - err := ctx.ch.MsgDecoder.DecodeMsg(vppReply.Data, reply) + err := ctx.ch.GetMessageDecoder().DecodeMsg(vppReply.Data, reply) Expect(err).ShouldNot(HaveOccurred()) cnt++ } @@ -124,11 +125,11 @@ func TestNotifications(t *testing.T) { NotifChan: notifChan, MsgFactory: interfaces.NewSwInterfaceSetFlags, } - ctx.ch.NotifSubsChan <- &api.NotifSubscribeRequest{ + ctx.ch.GetNotificationChannel() <- &api.NotifSubscribeRequest{ Subscription: subscription, Subscribe: true, } - err := <-ctx.ch.NotifSubsReplyChan + err := <-ctx.ch.GetNotificationReplyChannel() Expect(err).ShouldNot(HaveOccurred()) // mock the notification and force its delivery @@ -144,11 +145,11 @@ func TestNotifications(t *testing.T) { Expect(notif.SwIfIndex).To(BeEquivalentTo(3)) // unsubscribe notification - ctx.ch.NotifSubsChan <- &api.NotifSubscribeRequest{ + ctx.ch.GetNotificationChannel() <- &api.NotifSubscribeRequest{ Subscription: subscription, Subscribe: false, } - err = <-ctx.ch.NotifSubsReplyChan + err = <-ctx.ch.GetNotificationReplyChannel() Expect(err).ShouldNot(HaveOccurred()) } @@ -207,15 +208,15 @@ func TestFullBuffer(t *testing.T) { // send multiple requests, only one reply should be read for i := 0; i < 20; i++ { ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) - ctx.ch.ReqChan <- &api.VppRequest{Message: &vpe.ControlPing{}} + ctx.ch.GetRequestChannel() <- &api.VppRequest{Message: &vpe.ControlPing{}} } - vppReply := <-ctx.ch.ReplyChan + vppReply := <-ctx.ch.GetReplyChannel() Expect(vppReply).ShouldNot(BeNil()) var received bool select { - case <-ctx.ch.ReplyChan: + case <-ctx.ch.GetReplyChannel(): received = true // this should not happen default: received = false // no reply to be received @@ -226,35 +227,35 @@ func TestFullBuffer(t *testing.T) { func TestCodec(t *testing.T) { RegisterTestingT(t) - codec := &core.MsgCodec{} + msgCodec := &codec.MsgCodec{} // request - data, err := codec.EncodeMsg(&interfaces.CreateLoopback{MacAddress: []byte{1, 2, 3, 4, 5, 6}}, 11) + data, err := msgCodec.EncodeMsg(&interfaces.CreateLoopback{MacAddress: []byte{1, 2, 3, 4, 5, 6}}, 11) Expect(err).ShouldNot(HaveOccurred()) Expect(data).ShouldNot(BeEmpty()) msg1 := &interfaces.CreateLoopback{} - err = codec.DecodeMsg(data, msg1) + err = msgCodec.DecodeMsg(data, msg1) Expect(err).ShouldNot(HaveOccurred()) Expect(msg1.MacAddress).To(BeEquivalentTo([]byte{1, 2, 3, 4, 5, 6})) // reply - data, err = codec.EncodeMsg(&vpe.ControlPingReply{Retval: 55}, 22) + data, err = msgCodec.EncodeMsg(&vpe.ControlPingReply{Retval: 55}, 22) Expect(err).ShouldNot(HaveOccurred()) Expect(data).ShouldNot(BeEmpty()) msg2 := &vpe.ControlPingReply{} - err = codec.DecodeMsg(data, msg2) + err = msgCodec.DecodeMsg(data, msg2) Expect(err).ShouldNot(HaveOccurred()) Expect(msg2.Retval).To(BeEquivalentTo(55)) // other - data, err = codec.EncodeMsg(&stats.VnetIP4FibCounters{VrfID: 77}, 33) + data, err = msgCodec.EncodeMsg(&stats.VnetIP4FibCounters{VrfID: 77}, 33) Expect(err).ShouldNot(HaveOccurred()) Expect(data).ShouldNot(BeEmpty()) msg3 := &stats.VnetIP4FibCounters{} - err = codec.DecodeMsg(data, msg3) + err = msgCodec.DecodeMsg(data, msg3) Expect(err).ShouldNot(HaveOccurred()) Expect(msg3.VrfID).To(BeEquivalentTo(77)) } @@ -262,21 +263,21 @@ func TestCodec(t *testing.T) { func TestCodecNegative(t *testing.T) { RegisterTestingT(t) - codec := &core.MsgCodec{} + msgCodec := &codec.MsgCodec{} // nil message for encoding - data, err := codec.EncodeMsg(nil, 15) + data, err := msgCodec.EncodeMsg(nil, 15) Expect(err).Should(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("nil message")) Expect(data).Should(BeNil()) // nil message for decoding - err = codec.DecodeMsg(data, nil) + err = msgCodec.DecodeMsg(data, nil) Expect(err).Should(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("nil message")) // nil data for decoding - err = codec.DecodeMsg(nil, &vpe.ControlPingReply{}) + err = msgCodec.DecodeMsg(nil, &vpe.ControlPingReply{}) Expect(err).Should(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("EOF")) } @@ -285,7 +286,7 @@ func TestSimpleRequestsWithSequenceNumbers(t *testing.T) { ctx := setupTest(t, false) defer ctx.teardownTest() - var reqCtx []*api.RequestCtx + var reqCtx []api.RequestCtx for i := 0; i < 10; i++ { ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: int32(i)}) req := &vpe.ControlPing{} @@ -524,7 +525,7 @@ func TestCycleOverSetOfSequenceNumbers(t *testing.T) { defer ctx.teardownTest() numIters := 0xffff + 100 - reqCtx := make(map[int]*api.RequestCtx) + reqCtx := make(map[int]api.RequestCtx) for i := 0; i < numIters+30; /* receiver is 30 reqs behind */ i++ { if i < numIters { diff --git a/core/doc.go b/core/doc.go index a4ecd50..5b0b40e 100644 --- a/core/doc.go +++ b/core/doc.go @@ -17,4 +17,95 @@ // defer ch.Close() // // Note that one application can open only one connection, that can serve multiple API channels. +// +// The API offers two ways of communication with govpp core: using Go channels, or using convenient function +// wrappers over the Go channels. The latter should be sufficient for most of the use cases. +// +// The entry point to the API is the Channel structure, that can be obtained from the existing connection using +// the NewAPIChannel or NewAPIChannelBuffered functions: +// +// conn, err := govpp.Connect() +// if err != nil { +// // handle error! +// } +// defer conn.Disconnect() +// +// ch, err := conn.NewAPIChannel() +// if err != nil { +// // handle error! +// } +// defer ch.Close() +// +// +// Simple Request-Reply API +// +// The simple version of the API is based on blocking SendRequest / ReceiveReply calls, where a single request +// message is sent to VPP and a single reply message is filled in when the reply comes from VPP: +// +// req := &acl.ACLPluginGetVersion{} +// reply := &acl.ACLPluginGetVersionReply{} +// +// err := ch.SendRequest(req).ReceiveReply(reply) +// // process the reply +// +// Note that if the reply message type that comes from VPP does not match with provided one, you'll get an error. +// +// +// Multipart Reply API +// +// If multiple messages are expected as a reply to a request, SendMultiRequest API must be used: +// +// req := &interfaces.SwInterfaceDump{} +// reqCtx := ch.SendMultiRequest(req) +// +// for { +// reply := &interfaces.SwInterfaceDetails{} +// stop, err := reqCtx.ReceiveReply(reply) +// if stop { +// break // break out of the loop +// } +// // process the reply +// } +// +// Note that if the last reply has been already consumed, stop boolean return value is set to true. +// Do not use the message itself if stop is true - it won't be filled with actual data. +// +// +// Go Channels API +// +// The blocking API introduced above may be not sufficient for some management applications that strongly +// rely on usage of Go channels. In this case, the API allows to access the underlying Go channels directly, e.g. +// the following replacement of the SendRequest / ReceiveReply API: +// +// req := &acl.ACLPluginGetVersion{} +// // send the request to the request go channel +// ch.GetRequestChannel <- &api.VppRequest{Message: req} +// +// // receive a reply from the reply go channel +// vppReply := <-ch.GetReplyChannel +// +// // decode the message +// reply := &acl.ACLPluginGetVersionReply{} +// err := ch.MsgDecoder.DecodeMsg(vppReply.Data, reply) +// +// // process the reply +// +// +// Notifications API +// +// to subscribe for receiving of the specified notification messages via provided Go channel, use the +// SubscribeNotification API: +// +// // subscribe for specific notification message +// notifChan := make(chan api.Message, 100) +// subs, _ := ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceSetFlags) +// +// // receive one notification +// notif := (<-notifChan).(*interfaces.SwInterfaceSetFlags) +// +// ch.UnsubscribeNotification(subs) +// +// Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's +// buffer is full, the notifications will not be delivered into it. +// package core diff --git a/core/msg_codec.go b/core/msg_codec.go deleted file mode 100644 index e32916b..0000000 --- a/core/msg_codec.go +++ /dev/null @@ -1,159 +0,0 @@ -// Copyright (c) 2017 Cisco and/or its affiliates. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package core - -import ( - "bytes" - "errors" - "fmt" - "reflect" - - "github.com/lunixbochs/struc" - logger "github.com/sirupsen/logrus" - - "git.fd.io/govpp.git/api" -) - -// MsgCodec provides encoding and decoding functionality of `api.Message` structs into/from -// binary format as accepted by VPP. -type MsgCodec struct{} - -// VppRequestHeader struct contains header fields implemented by all VPP requests. -type VppRequestHeader struct { - VlMsgID uint16 - ClientIndex uint32 - Context uint32 -} - -// VppReplyHeader struct contains header fields implemented by all VPP replies. -type VppReplyHeader struct { - VlMsgID uint16 - Context uint32 -} - -// VppEventHeader struct contains header fields implemented by all VPP events. -type VppEventHeader struct { - VlMsgID uint16 - Context uint32 -} - -// VppOtherHeader struct contains header fields implemented by other VPP messages (not requests nor replies). -type VppOtherHeader struct { - VlMsgID uint16 -} - -const ( - vppRequestHeaderSize = 10 // size of a VPP request header - vppReplyHeaderSize = 6 // size of a VPP reply header - vppEventHeaderSize = 6 // size of a VPP event header - vppOtherHeaderSize = 2 // size of the header of other VPP messages -) - -// EncodeMsg encodes provided `Message` structure into its binary-encoded data representation. -func (*MsgCodec) EncodeMsg(msg api.Message, msgID uint16) ([]byte, error) { - if msg == nil { - return nil, errors.New("nil message passed in") - } - - buf := new(bytes.Buffer) - - // encode message header - var header interface{} - if msg.GetMessageType() == api.RequestMessage { - header = &VppRequestHeader{VlMsgID: msgID} - } else if msg.GetMessageType() == api.ReplyMessage { - header = &VppReplyHeader{VlMsgID: msgID} - } else if msg.GetMessageType() == api.EventMessage { - header = &VppEventHeader{VlMsgID: msgID} - } else { - header = &VppOtherHeader{VlMsgID: msgID} - } - err := struc.Pack(buf, header) - if err != nil { - log.WithFields(logger.Fields{ - "error": err, - "header": header, - }).Error("Unable to encode the message header: ", err) - return nil, fmt.Errorf("unable to encode the message header: %v", err) - } - - // encode message content - if reflect.Indirect(reflect.ValueOf(msg)).NumField() > 0 { - err := struc.Pack(buf, msg) - if err != nil { - log.WithFields(logger.Fields{ - "error": err, - "message": msg, - }).Error("Unable to encode the message: ", err) - return nil, fmt.Errorf("unable to encode the message: %v", err) - } - } - - return buf.Bytes(), nil -} - -// DecodeMsg decodes binary-encoded data of a message into provided `Message` structure. -func (*MsgCodec) DecodeMsg(data []byte, msg api.Message) error { - if msg == nil { - return errors.New("nil message passed in") - } - - buf := bytes.NewReader(data) - - // check which header is expected - var header interface{} - if msg.GetMessageType() == api.RequestMessage { - header = &VppRequestHeader{} - } else if msg.GetMessageType() == api.ReplyMessage { - header = &VppReplyHeader{} - } else if msg.GetMessageType() == api.EventMessage { - header = &VppEventHeader{} - } else { - header = &VppOtherHeader{} - } - - // decode message header - err := struc.Unpack(buf, header) - if err != nil { - log.WithFields(logger.Fields{ - "error": err, - "data": data, - }).Error("Unable to decode header of the message.") - return fmt.Errorf("unable to decode the message header: %v", err) - } - - // get rid of the message header - if msg.GetMessageType() == api.RequestMessage { - buf = bytes.NewReader(data[vppRequestHeaderSize:]) - } else if msg.GetMessageType() == api.ReplyMessage { - buf = bytes.NewReader(data[vppReplyHeaderSize:]) - } else if msg.GetMessageType() == api.EventMessage { - buf = bytes.NewReader(data[vppEventHeaderSize:]) - } else { - buf = bytes.NewReader(data[vppOtherHeaderSize:]) - } - - // decode message content - err = struc.Unpack(buf, msg) - if err != nil { - log.WithFields(logger.Fields{ - "error": err, - "data": buf, - }).Error("Unable to decode the message.") - return fmt.Errorf("unable to decode the message: %v", err) - } - - return nil -} diff --git a/core/notification_handler.go b/core/notification_handler.go index 89c16a4..c0e8687 100644 --- a/core/notification_handler.go +++ b/core/notification_handler.go @@ -18,13 +18,12 @@ import ( "fmt" "reflect" - logger "github.com/sirupsen/logrus" - "git.fd.io/govpp.git/api" + logger "github.com/sirupsen/logrus" ) // processNotifSubscribeRequest processes a notification subscribe request. -func (c *Connection) processNotifSubscribeRequest(ch *api.Channel, req *api.NotifSubscribeRequest) error { +func (c *Connection) processNotifSubscribeRequest(ch *channel, req *api.NotifSubscribeRequest) error { var err error // subscribe / unsubscribe @@ -36,7 +35,7 @@ func (c *Connection) processNotifSubscribeRequest(ch *api.Channel, req *api.Noti // send the reply into the go channel select { - case ch.NotifSubsReplyChan <- err: + case ch.notifSubsReplyChan <- err: // reply sent successfully default: // unable to write into the channel without blocking diff --git a/core/request_handler.go b/core/request_handler.go index 3bec38d..8681963 100644 --- a/core/request_handler.go +++ b/core/request_handler.go @@ -31,10 +31,10 @@ var ( ) // watchRequests watches for requests on the request API channel and forwards them as messages to VPP. -func (c *Connection) watchRequests(ch *api.Channel) { +func (c *Connection) watchRequests(ch *channel) { for { select { - case req, ok := <-ch.ReqChan: + case req, ok := <-ch.reqChan: // new request on the request channel if !ok { // after closing the request channel, release API channel and return @@ -43,7 +43,7 @@ func (c *Connection) watchRequests(ch *api.Channel) { } c.processRequest(ch, req) - case req := <-ch.NotifSubsChan: + case req := <-ch.notifSubsChan: // new request on the notification subscribe channel c.processNotifSubscribeRequest(ch, req) } @@ -51,7 +51,7 @@ func (c *Connection) watchRequests(ch *api.Channel) { } // processRequest processes a single request received on the request channel. -func (c *Connection) processRequest(ch *api.Channel, req *api.VppRequest) error { +func (c *Connection) processRequest(ch *channel, req *api.VppRequest) error { // check whether we are connected to VPP if atomic.LoadUint32(&c.connected) == 0 { err := ErrNotConnected @@ -78,7 +78,7 @@ func (c *Connection) processRequest(ch *api.Channel, req *api.VppRequest) error if err != nil { err = fmt.Errorf("unable to encode the messge: %v", err) log.WithFields(logger.Fields{ - "channel": ch.ID, + "channel": ch.id, "msg_id": msgID, "seq_num": req.SeqNum, }).Error(err) @@ -88,7 +88,7 @@ func (c *Connection) processRequest(ch *api.Channel, req *api.VppRequest) error if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled log.WithFields(logger.Fields{ - "channel": ch.ID, + "channel": ch.id, "msg_id": msgID, "msg_size": len(data), "msg_name": req.Message.GetMessageName(), @@ -97,7 +97,7 @@ func (c *Connection) processRequest(ch *api.Channel, req *api.VppRequest) error } // send the request to VPP - context := packRequestContext(ch.ID, req.Multipart, req.SeqNum) + context := packRequestContext(ch.id, req.Multipart, req.SeqNum) err = c.vpp.SendMsg(context, data) if err != nil { err = fmt.Errorf("unable to send the message: %v", err) @@ -189,9 +189,9 @@ func msgCallback(context uint32, msgID uint16, data []byte) { // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise // it logs the error and do not send the message. -func sendReply(ch *api.Channel, reply *api.VppReply) { +func sendReply(ch *channel, reply *api.VppReply) { select { - case ch.ReplyChan <- reply: + case ch.replyChan <- reply: // reply sent successfully case <-time.After(time.Millisecond * 100): // receiver still not ready |