aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/channel.go276
-rw-r--r--core/channel_test.go587
-rw-r--r--core/connection.go (renamed from core/core.go)57
-rw-r--r--core/connection_test.go (renamed from core/core_test.go)55
-rw-r--r--core/doc.go91
-rw-r--r--core/msg_codec.go159
-rw-r--r--core/notification_handler.go7
-rw-r--r--core/request_handler.go18
8 files changed, 1024 insertions, 226 deletions
diff --git a/core/channel.go b/core/channel.go
new file mode 100644
index 0000000..87b3e29
--- /dev/null
+++ b/core/channel.go
@@ -0,0 +1,276 @@
+// Copyright (c) 2018 Cisco and/or its affiliates.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package core
+
+import (
+ "fmt"
+ "time"
+
+ "errors"
+
+ "git.fd.io/govpp.git/api"
+ "github.com/sirupsen/logrus"
+)
+
+const defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout
+
+// requestCtxData is a context of a ongoing request (simple one - only one response is expected).
+type requestCtxData struct {
+ ch *channel
+ seqNum uint16
+}
+
+// multiRequestCtxData is a context of a ongoing multipart request (multiple responses are expected).
+type multiRequestCtxData struct {
+ ch *channel
+ seqNum uint16
+}
+
+func (req *requestCtxData) ReceiveReply(msg api.Message) error {
+ if req == nil || req.ch == nil {
+ return errors.New("invalid request context")
+ }
+
+ lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum)
+
+ if lastReplyReceived {
+ err = errors.New("multipart reply recieved while a simple reply expected")
+ }
+ return err
+}
+
+func (req *multiRequestCtxData) ReceiveReply(msg api.Message) (lastReplyReceived bool, err error) {
+ if req == nil || req.ch == nil {
+ return false, errors.New("invalid request context")
+ }
+
+ return req.ch.receiveReplyInternal(msg, req.seqNum)
+}
+
+// channel is the main communication interface with govpp core. It contains four Go channels, one for sending the requests
+// to VPP, one for receiving the replies from it and the same set for notifications. The user can access the Go channels
+// via methods provided by Channel interface in this package. Do not use the same channel from multiple goroutines
+// concurrently, otherwise the responses could mix! Use multiple channels instead.
+type channel struct {
+ id uint16 // channel ID
+
+ reqChan chan *api.VppRequest // channel for sending the requests to VPP, closing this channel releases all resources in the ChannelProvider
+ replyChan chan *api.VppReply // channel where VPP replies are delivered to
+
+ notifSubsChan chan *api.NotifSubscribeRequest // channel for sending notification subscribe requests
+ notifSubsReplyChan chan error // channel where replies to notification subscribe requests are delivered to
+
+ msgDecoder api.MessageDecoder // used to decode binary data to generated API messages
+ msgIdentifier api.MessageIdentifier // used to retrieve message ID of a message
+
+ lastSeqNum uint16 // sequence number of the last sent request
+
+ delayedReply *api.VppReply // reply already taken from ReplyChan, buffered for later delivery
+ replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout
+}
+
+func (ch *channel) SendRequest(msg api.Message) api.RequestCtx {
+ ch.lastSeqNum++
+ ch.reqChan <- &api.VppRequest{
+ Message: msg,
+ SeqNum: ch.lastSeqNum,
+ }
+ return &requestCtxData{ch: ch, seqNum: ch.lastSeqNum}
+}
+
+func (ch *channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx {
+ ch.lastSeqNum++
+ ch.reqChan <- &api.VppRequest{
+ Message: msg,
+ Multipart: true,
+ SeqNum: ch.lastSeqNum,
+ }
+ return &multiRequestCtxData{ch: ch, seqNum: ch.lastSeqNum}
+}
+
+func (ch *channel) SubscribeNotification(notifChan chan api.Message, msgFactory func() api.Message) (*api.NotifSubscription, error) {
+ subscription := &api.NotifSubscription{
+ NotifChan: notifChan,
+ MsgFactory: msgFactory,
+ }
+ ch.notifSubsChan <- &api.NotifSubscribeRequest{
+ Subscription: subscription,
+ Subscribe: true,
+ }
+ return subscription, <-ch.notifSubsReplyChan
+}
+
+func (ch *channel) UnsubscribeNotification(subscription *api.NotifSubscription) error {
+ ch.notifSubsChan <- &api.NotifSubscribeRequest{
+ Subscription: subscription,
+ Subscribe: false,
+ }
+ return <-ch.notifSubsReplyChan
+}
+
+func (ch *channel) CheckMessageCompatibility(messages ...api.Message) error {
+ for _, msg := range messages {
+ _, err := ch.msgIdentifier.GetMessageID(msg)
+ if err != nil {
+ return fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to",
+ msg.GetMessageName(), msg.GetCrcString())
+ }
+ }
+ return nil
+}
+
+func (ch *channel) SetReplyTimeout(timeout time.Duration) {
+ ch.replyTimeout = timeout
+}
+
+func (ch *channel) GetRequestChannel() chan<- *api.VppRequest {
+ return ch.reqChan
+}
+
+func (ch *channel) GetReplyChannel() <-chan *api.VppReply {
+ return ch.replyChan
+}
+
+func (ch *channel) GetNotificationChannel() chan<- *api.NotifSubscribeRequest {
+ return ch.notifSubsChan
+}
+
+func (ch *channel) GetNotificationReplyChannel() <-chan error {
+ return ch.notifSubsReplyChan
+}
+
+func (ch *channel) GetMessageDecoder() api.MessageDecoder {
+ return ch.msgDecoder
+}
+
+func (ch *channel) GetID() uint16 {
+ return ch.id
+}
+
+func (ch *channel) Close() {
+ if ch.reqChan != nil {
+ close(ch.reqChan)
+ }
+}
+
+// receiveReplyInternal receives a reply from the reply channel into the provided msg structure.
+func (ch *channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (lastReplyReceived bool, err error) {
+ var ignore bool
+ if msg == nil {
+ return false, errors.New("nil message passed in")
+ }
+
+ if ch.delayedReply != nil {
+ // try the delayed reply
+ vppReply := ch.delayedReply
+ ch.delayedReply = nil
+ ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
+ if !ignore {
+ return lastReplyReceived, err
+ }
+ }
+
+ timer := time.NewTimer(ch.replyTimeout)
+ for {
+ select {
+ // blocks until a reply comes to ReplyChan or until timeout expires
+ case vppReply := <-ch.replyChan:
+ ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
+ if ignore {
+ continue
+ }
+ return lastReplyReceived, err
+
+ case <-timer.C:
+ err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout)
+ return false, err
+ }
+ }
+ return
+}
+
+func (ch *channel) processReply(reply *api.VppReply, expSeqNum uint16, msg api.Message) (ignore bool, lastReplyReceived bool, err error) {
+ // check the sequence number
+ cmpSeqNums := compareSeqNumbers(reply.SeqNum, expSeqNum)
+ if cmpSeqNums == -1 {
+ // reply received too late, ignore the message
+ logrus.WithField("sequence-number", reply.SeqNum).Warn(
+ "Received reply to an already closed binary API request")
+ ignore = true
+ return
+ }
+ if cmpSeqNums == 1 {
+ ch.delayedReply = reply
+ err = fmt.Errorf("missing binary API reply with sequence number: %d", expSeqNum)
+ return
+ }
+
+ if reply.Error != nil {
+ err = reply.Error
+ return
+ }
+ if reply.LastReplyReceived {
+ lastReplyReceived = true
+ return
+ }
+
+ // message checks
+ var expMsgID uint16
+ expMsgID, err = ch.msgIdentifier.GetMessageID(msg)
+ if err != nil {
+ err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to",
+ msg.GetMessageName(), msg.GetCrcString())
+ return
+ }
+
+ if reply.MessageID != expMsgID {
+ var msgNameCrc string
+ if nameCrc, err := ch.msgIdentifier.LookupByID(reply.MessageID); err != nil {
+ msgNameCrc = err.Error()
+ } else {
+ msgNameCrc = nameCrc
+ }
+
+ err = fmt.Errorf("received invalid message ID (seq-num=%d), expected %d (%s), but got %d (%s) "+
+ "(check if multiple goroutines are not sharing single GoVPP channel)",
+ reply.SeqNum, expMsgID, msg.GetMessageName(), reply.MessageID, msgNameCrc)
+ return
+ }
+
+ // decode the message
+ err = ch.msgDecoder.DecodeMsg(reply.Data, msg)
+ return
+}
+
+// compareSeqNumbers returns -1, 0, 1 if sequence number <seqNum1> precedes, equals to,
+// or succeeds seq. number <seqNum2>.
+// Since sequence numbers cycle in the finite set of size 2^16, the function
+// must assume that the distance between compared sequence numbers is less than
+// (2^16)/2 to determine the order.
+func compareSeqNumbers(seqNum1, seqNum2 uint16) int {
+ // calculate distance from seqNum1 to seqNum2
+ var dist uint16
+ if seqNum1 <= seqNum2 {
+ dist = seqNum2 - seqNum1
+ } else {
+ dist = 0xffff - (seqNum1 - seqNum2 - 1)
+ }
+ if dist == 0 {
+ return 0
+ } else if dist <= 0x8000 {
+ return -1
+ }
+ return 1
+}
diff --git a/core/channel_test.go b/core/channel_test.go
new file mode 100644
index 0000000..d573f29
--- /dev/null
+++ b/core/channel_test.go
@@ -0,0 +1,587 @@
+// Copyright (c) 2017 Cisco and/or its affiliates.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package core
+
+import (
+ "testing"
+ "time"
+
+ "git.fd.io/govpp.git/adapter/mock"
+ "git.fd.io/govpp.git/core/bin_api/vpe"
+ "git.fd.io/govpp.git/examples/bin_api/interfaces"
+ "git.fd.io/govpp.git/examples/bin_api/memif"
+ "git.fd.io/govpp.git/examples/bin_api/tap"
+
+ "git.fd.io/govpp.git/api"
+ . "github.com/onsi/gomega"
+)
+
+type testCtx struct {
+ mockVpp *mock.VppAdapter
+ conn *Connection
+ ch api.Channel
+}
+
+func setupTest(t *testing.T) *testCtx {
+ RegisterTestingT(t)
+
+ ctx := &testCtx{
+ mockVpp: &mock.VppAdapter{},
+ }
+
+ var err error
+ ctx.conn, err = Connect(ctx.mockVpp)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ ctx.ch, err = ctx.conn.NewAPIChannel()
+ Expect(err).ShouldNot(HaveOccurred())
+
+ return ctx
+}
+
+func (ctx *testCtx) teardownTest() {
+ ctx.ch.Close()
+ ctx.conn.Disconnect()
+}
+
+func TestRequestReplyTapConnect(t *testing.T) {
+ ctx := setupTest(t)
+ defer ctx.teardownTest()
+
+ ctx.mockVpp.MockReply(&tap.TapConnectReply{
+ Retval: 10,
+ SwIfIndex: 1,
+ })
+ request := &tap.TapConnect{
+ TapName: []byte("test-tap-name"),
+ UseRandomMac: 1,
+ }
+ reply := &tap.TapConnectReply{}
+
+ err := ctx.ch.SendRequest(request).ReceiveReply(reply)
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(reply.Retval).To(BeEquivalentTo(10), "Incorrect retval value for TapConnectReply")
+ Expect(reply.SwIfIndex).To(BeEquivalentTo(1), "Incorrect SwIfIndex value for TapConnectReply")
+}
+
+func TestRequestReplyTapModify(t *testing.T) {
+ ctx := setupTest(t)
+ defer ctx.teardownTest()
+
+ ctx.mockVpp.MockReply(&tap.TapModifyReply{
+ Retval: 15,
+ SwIfIndex: 2,
+ })
+ request := &tap.TapModify{
+ TapName: []byte("test-tap-modify"),
+ UseRandomMac: 1,
+ CustomDevInstance: 1,
+ }
+ reply := &tap.TapModifyReply{}
+
+ err := ctx.ch.SendRequest(request).ReceiveReply(reply)
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(reply.Retval).To(BeEquivalentTo(15), "Incorrect retval value for TapModifyReply")
+ Expect(reply.SwIfIndex).To(BeEquivalentTo(2), "Incorrect SwIfIndex value for TapModifyReply")
+}
+
+func TestRequestReplyTapDelete(t *testing.T) {
+ ctx := setupTest(t)
+ defer ctx.teardownTest()
+
+ ctx.mockVpp.MockReply(&tap.TapDeleteReply{
+ Retval: 20,
+ })
+ request := &tap.TapDelete{
+ SwIfIndex: 3,
+ }
+ reply := &tap.TapDeleteReply{}
+
+ err := ctx.ch.SendRequest(request).ReceiveReply(reply)
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(reply.Retval).To(BeEquivalentTo(20), "Incorrect retval value for TapDeleteReply")
+}
+
+func TestRequestReplySwInterfaceTapDump(t *testing.T) {
+ ctx := setupTest(t)
+ defer ctx.teardownTest()
+
+ byteName := []byte("dev-name-test")
+ ctx.mockVpp.MockReply(&tap.SwInterfaceTapDetails{
+ SwIfIndex: 25,
+ DevName: byteName,
+ })
+ request := &tap.SwInterfaceTapDump{}
+ reply := &tap.SwInterfaceTapDetails{}
+
+ err := ctx.ch.SendRequest(request).ReceiveReply(reply)
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(reply.SwIfIndex).To(BeEquivalentTo(25), "Incorrect SwIfIndex value for SwInterfaceTapDetails")
+ Expect(reply.DevName).ToNot(BeNil(), "Incorrect DevName value for SwInterfaceTapDetails")
+}
+
+func TestRequestReplyMemifCreate(t *testing.T) {
+ ctx := setupTest(t)
+ defer ctx.teardownTest()
+
+ ctx.mockVpp.MockReply(&memif.MemifCreateReply{
+ Retval: 22,
+ SwIfIndex: 4,
+ })
+ request := &memif.MemifCreate{
+ Role: 10,
+ ID: 12,
+ RingSize: 8000,
+ BufferSize: 50,
+ }
+ reply := &memif.MemifCreateReply{}
+
+ err := ctx.ch.SendRequest(request).ReceiveReply(reply)
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(reply.Retval).To(BeEquivalentTo(22), "Incorrect Retval value for MemifCreate")
+ Expect(reply.SwIfIndex).To(BeEquivalentTo(4), "Incorrect SwIfIndex value for MemifCreate")
+}
+
+func TestRequestReplyMemifDelete(t *testing.T) {
+ ctx := setupTest(t)
+ defer ctx.teardownTest()
+
+ ctx.mockVpp.MockReply(&memif.MemifDeleteReply{
+ Retval: 24,
+ })
+ request := &memif.MemifDelete{
+ SwIfIndex: 15,
+ }
+ reply := &memif.MemifDeleteReply{}
+
+ err := ctx.ch.SendRequest(request).ReceiveReply(reply)
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(reply.Retval).To(BeEquivalentTo(24), "Incorrect Retval value for MemifDelete")
+}
+
+func TestRequestReplyMemifDetails(t *testing.T) {
+ ctx := setupTest(t)
+ defer ctx.teardownTest()
+
+ ctx.mockVpp.MockReply(&memif.MemifDetails{
+ SwIfIndex: 25,
+ IfName: []byte("memif-name"),
+ Role: 0,
+ })
+ request := &memif.MemifDump{}
+ reply := &memif.MemifDetails{}
+
+ err := ctx.ch.SendRequest(request).ReceiveReply(reply)
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(reply.SwIfIndex).To(BeEquivalentTo(25), "Incorrect SwIfIndex value for MemifDetails")
+ Expect(reply.IfName).ToNot(BeEmpty(), "MemifDetails IfName is empty byte array")
+ Expect(reply.Role).To(BeEquivalentTo(0), "Incorrect Role value for MemifDetails")
+}
+
+func TestMultiRequestReplySwInterfaceTapDump(t *testing.T) {
+ ctx := setupTest(t)
+ defer ctx.teardownTest()
+
+ // mock reply
+ var msgs []api.Message
+ for i := 1; i <= 10; i++ {
+ msgs = append(msgs, &tap.SwInterfaceTapDetails{
+ SwIfIndex: uint32(i),
+ DevName: []byte("dev-name-test"),
+ })
+ }
+ ctx.mockVpp.MockReply(msgs...)
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+
+ reqCtx := ctx.ch.SendMultiRequest(&tap.SwInterfaceTapDump{})
+ cnt := 0
+ for {
+ msg := &tap.SwInterfaceTapDetails{}
+ stop, err := reqCtx.ReceiveReply(msg)
+ if stop {
+ break // break out of the loop
+ }
+ Expect(err).ShouldNot(HaveOccurred())
+ cnt++
+ }
+ Expect(cnt).To(BeEquivalentTo(10))
+}
+
+func TestMultiRequestReplySwInterfaceMemifDump(t *testing.T) {
+ ctx := setupTest(t)
+ defer ctx.teardownTest()
+
+ // mock reply
+ var msgs []api.Message
+ for i := 1; i <= 10; i++ {
+ msgs = append(msgs, &memif.MemifDetails{
+ SwIfIndex: uint32(i),
+ })
+ }
+ ctx.mockVpp.MockReply(msgs...)
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+
+ reqCtx := ctx.ch.SendMultiRequest(&memif.MemifDump{})
+ cnt := 0
+ for {
+ msg := &memif.MemifDetails{}
+ stop, err := reqCtx.ReceiveReply(msg)
+ if stop {
+ break // break out of the loop
+ }
+ Expect(err).ShouldNot(HaveOccurred())
+ cnt++
+ }
+ Expect(cnt).To(BeEquivalentTo(10))
+}
+
+func TestNotifications(t *testing.T) {
+ ctx := setupTest(t)
+ defer ctx.teardownTest()
+
+ // subscribe for notification
+ notifChan := make(chan api.Message, 1)
+ subs, err := ctx.ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceSetFlags)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ // mock the notification and force its delivery
+ ctx.mockVpp.MockReply(&interfaces.SwInterfaceSetFlags{
+ SwIfIndex: 3,
+ AdminUpDown: 1,
+ })
+ ctx.mockVpp.SendMsg(0, []byte(""))
+
+ // receive the notification
+ var notif *interfaces.SwInterfaceSetFlags
+ Eventually(func() *interfaces.SwInterfaceSetFlags {
+ select {
+ case n := <-notifChan:
+ notif = n.(*interfaces.SwInterfaceSetFlags)
+ return notif
+ default:
+ return nil
+ }
+ }).ShouldNot(BeNil())
+
+ // verify the received notifications
+ Expect(notif.SwIfIndex).To(BeEquivalentTo(3), "Incorrect SwIfIndex value for SwInterfaceSetFlags")
+ Expect(notif.AdminUpDown).To(BeEquivalentTo(1), "Incorrect AdminUpDown value for SwInterfaceSetFlags")
+
+ ctx.ch.UnsubscribeNotification(subs)
+}
+
+func TestNotificationEvent(t *testing.T) {
+ ctx := setupTest(t)
+ defer ctx.teardownTest()
+
+ // subscribe for notification
+ notifChan := make(chan api.Message, 1)
+ subs, err := ctx.ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceEvent)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ // mock the notification and force its delivery
+ ctx.mockVpp.MockReply(&interfaces.SwInterfaceEvent{
+ SwIfIndex: 2,
+ LinkUpDown: 1,
+ })
+ ctx.mockVpp.SendMsg(0, []byte(""))
+
+ // receive the notification
+ var notif *interfaces.SwInterfaceEvent
+ Eventually(func() *interfaces.SwInterfaceEvent {
+ select {
+ case n := <-notifChan:
+ notif = n.(*interfaces.SwInterfaceEvent)
+ return notif
+ default:
+ return nil
+ }
+ }).ShouldNot(BeNil())
+
+ // verify the received notifications
+ Expect(notif.SwIfIndex).To(BeEquivalentTo(2), "Incorrect SwIfIndex value for SwInterfaceSetFlags")
+ Expect(notif.LinkUpDown).To(BeEquivalentTo(1), "Incorrect LinkUpDown value for SwInterfaceSetFlags")
+
+ ctx.ch.UnsubscribeNotification(subs)
+}
+
+func TestCheckMessageCompatibility(t *testing.T) {
+ ctx := setupTest(t)
+ defer ctx.teardownTest()
+
+ err := ctx.ch.CheckMessageCompatibility(&interfaces.SwInterfaceSetFlags{})
+ Expect(err).ShouldNot(HaveOccurred())
+}
+func TestSetReplyTimeout(t *testing.T) {
+ ctx := setupTest(t)
+ defer ctx.teardownTest()
+
+ ctx.ch.SetReplyTimeout(time.Millisecond)
+
+ // first one request should work
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+ err := ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{})
+ Expect(err).ShouldNot(HaveOccurred())
+
+ // no other reply ready - expect timeout
+ err = ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{})
+ Expect(err).Should(HaveOccurred())
+ Expect(err.Error()).To(ContainSubstring("timeout"))
+}
+
+func TestSetReplyTimeoutMultiRequest(t *testing.T) {
+ ctx := setupTest(t)
+ defer ctx.teardownTest()
+
+ ctx.ch.SetReplyTimeout(time.Millisecond)
+
+ var msgs []api.Message
+ for i := 1; i <= 3; i++ {
+ msgs = append(msgs, &interfaces.SwInterfaceDetails{
+ SwIfIndex: uint32(i),
+ InterfaceName: []byte("if-name-test"),
+ })
+ }
+ ctx.mockVpp.MockReply(msgs...)
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+
+ cnt := 0
+ sendMultiRequest := func() error {
+ reqCtx := ctx.ch.SendMultiRequest(&interfaces.SwInterfaceDump{})
+ for {
+ msg := &interfaces.SwInterfaceDetails{}
+ stop, err := reqCtx.ReceiveReply(msg)
+ if stop {
+ break // break out of the loop
+ }
+ if err != nil {
+ return err
+ }
+ cnt++
+ }
+ return nil
+ }
+
+ // first one request should work
+ err := sendMultiRequest()
+ Expect(err).ShouldNot(HaveOccurred())
+
+ // no other reply ready - expect timeout
+ err = sendMultiRequest()
+ Expect(err).Should(HaveOccurred())
+ Expect(err.Error()).To(ContainSubstring("timeout"))
+
+ Expect(cnt).To(BeEquivalentTo(3))
+}
+
+func TestReceiveReplyNegative(t *testing.T) {
+ ctx := setupTest(t)
+ defer ctx.teardownTest()
+
+ // invalid context 1
+ reqCtx1 := &requestCtxData{}
+ err := reqCtx1.ReceiveReply(&vpe.ControlPingReply{})
+ Expect(err).Should(HaveOccurred())
+ Expect(err.Error()).To(ContainSubstring("invalid request context"))
+
+ // invalid context 2
+ reqCtx2 := &multiRequestCtxData{}
+ _, err = reqCtx2.ReceiveReply(&vpe.ControlPingReply{})
+ Expect(err).Should(HaveOccurred())
+ Expect(err.Error()).To(ContainSubstring("invalid request context"))
+
+ // NU
+ reqCtx3 := &requestCtxData{}
+ err = reqCtx3.ReceiveReply(nil)
+ Expect(err).Should(HaveOccurred())
+ Expect(err.Error()).To(ContainSubstring("invalid request context"))
+}
+
+func TestMultiRequestDouble(t *testing.T) {
+ ctx := setupTest(t)
+ defer ctx.teardownTest()
+
+ // mock reply
+ var msgs []mock.MsgWithContext
+ for i := 1; i <= 3; i++ {
+ msgs = append(msgs, mock.MsgWithContext{
+ Msg: &interfaces.SwInterfaceDetails{
+ SwIfIndex: uint32(i),
+ InterfaceName: []byte("if-name-test"),
+ },
+ Multipart: true,
+ SeqNum: 1,
+ })
+ }
+ msgs = append(msgs, mock.MsgWithContext{Msg: &vpe.ControlPingReply{}, Multipart: true, SeqNum: 1})
+
+ for i := 1; i <= 3; i++ {
+ msgs = append(msgs,
+ mock.MsgWithContext{
+ Msg: &interfaces.SwInterfaceDetails{
+ SwIfIndex: uint32(i),
+ InterfaceName: []byte("if-name-test"),
+ },
+ Multipart: true,
+ SeqNum: 2,
+ })
+ }
+ msgs = append(msgs, mock.MsgWithContext{Msg: &vpe.ControlPingReply{}, Multipart: true, SeqNum: 2})
+
+ ctx.mockVpp.MockReplyWithContext(msgs...)
+
+ cnt := 0
+ var sendMultiRequest = func() error {
+ reqCtx := ctx.ch.SendMultiRequest(&interfaces.SwInterfaceDump{})
+ for {
+ msg := &interfaces.SwInterfaceDetails{}
+ stop, err := reqCtx.ReceiveReply(msg)
+ if stop {
+ break // break out of the loop
+ }
+ if err != nil {
+ return err
+ }
+ cnt++
+ }
+ return nil
+ }
+
+ err := sendMultiRequest()
+ Expect(err).ShouldNot(HaveOccurred())
+
+ err = sendMultiRequest()
+ Expect(err).ShouldNot(HaveOccurred())
+
+ Expect(cnt).To(BeEquivalentTo(6))
+}
+
+func TestReceiveReplyAfterTimeout(t *testing.T) {
+ ctx := setupTest(t)
+ defer ctx.teardownTest()
+
+ ctx.ch.SetReplyTimeout(time.Millisecond)
+
+ // first one request should work
+ ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{Msg: &vpe.ControlPingReply{}, SeqNum: 1})
+ err := ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{})
+ Expect(err).ShouldNot(HaveOccurred())
+
+ err = ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{})
+ Expect(err).Should(HaveOccurred())
+ Expect(err.Error()).To(ContainSubstring("timeout"))
+
+ ctx.mockVpp.MockReplyWithContext(
+ // simulating late reply
+ mock.MsgWithContext{Msg: &vpe.ControlPingReply{}, SeqNum: 2},
+ // normal reply for next request
+ mock.MsgWithContext{Msg: &tap.TapConnectReply{}, SeqNum: 3})
+
+ req := &tap.TapConnect{
+ TapName: []byte("test-tap-name"),
+ UseRandomMac: 1,
+ }
+ reply := &tap.TapConnectReply{}
+
+ // should succeed
+ err = ctx.ch.SendRequest(req).ReceiveReply(reply)
+ Expect(err).ShouldNot(HaveOccurred())
+}
+
+func TestReceiveReplyAfterTimeoutMultiRequest(t *testing.T) {
+ /*
+ TODO: fix mock adapter
+ This test will fail because mock adapter will stop sending replies
+ when it encounters control_ping_reply from multi request,
+ thus never sending reply for next request
+ */
+ t.Skip()
+
+ ctx := setupTest(t)
+ defer ctx.teardownTest()
+
+ ctx.ch.SetReplyTimeout(time.Millisecond * 100)
+
+ // first one request should work
+ ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{Msg: &vpe.ControlPingReply{}, SeqNum: 1})
+ err := ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{})
+ Expect(err).ShouldNot(HaveOccurred())
+
+ cnt := 0
+ var sendMultiRequest = func() error {
+ reqCtx := ctx.ch.SendMultiRequest(&interfaces.SwInterfaceDump{})
+ for {
+ msg := &interfaces.SwInterfaceDetails{}
+ stop, err := reqCtx.ReceiveReply(msg)
+ if stop {
+ break // break out of the loop
+ }
+ if err != nil {
+ return err
+ }
+ cnt++
+ }
+ return nil
+ }
+
+ err = sendMultiRequest()
+ Expect(err).Should(HaveOccurred())
+ Expect(err.Error()).To(ContainSubstring("timeout"))
+ Expect(cnt).To(BeEquivalentTo(0))
+
+ // simulating late replies
+ var msgs []mock.MsgWithContext
+ for i := 1; i <= 3; i++ {
+ msgs = append(msgs, mock.MsgWithContext{
+ Msg: &interfaces.SwInterfaceDetails{
+ SwIfIndex: uint32(i),
+ InterfaceName: []byte("if-name-test"),
+ },
+ Multipart: true,
+ SeqNum: 2,
+ })
+ }
+ msgs = append(msgs, mock.MsgWithContext{Msg: &vpe.ControlPingReply{}, Multipart: true, SeqNum: 2})
+ ctx.mockVpp.MockReplyWithContext(msgs...)
+
+ // normal reply for next request
+ ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{Msg: &tap.TapConnectReply{}, SeqNum: 3})
+
+ req := &tap.TapConnect{
+ TapName: []byte("test-tap-name"),
+ UseRandomMac: 1,
+ }
+ reply := &tap.TapConnectReply{}
+
+ // should succeed
+ err = ctx.ch.SendRequest(req).ReceiveReply(reply)
+ Expect(err).ShouldNot(HaveOccurred())
+}
+
+func TestInvalidMessageID(t *testing.T) {
+ ctx := setupTest(t)
+ defer ctx.teardownTest()
+
+ // first one request should work
+ ctx.mockVpp.MockReply(&vpe.ShowVersionReply{})
+ err := ctx.ch.SendRequest(&vpe.ShowVersion{}).ReceiveReply(&vpe.ShowVersionReply{})
+ Expect(err).ShouldNot(HaveOccurred())
+
+ // second should fail with error invalid message ID
+ ctx.mockVpp.MockReply(&vpe.ShowVersionReply{})
+ err = ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{})
+ Expect(err).Should(HaveOccurred())
+ Expect(err.Error()).To(ContainSubstring("invalid message ID"))
+}
diff --git a/core/core.go b/core/connection.go
index 052eb0b..a44d0c4 100644
--- a/core/core.go
+++ b/core/connection.go
@@ -27,6 +27,7 @@ import (
"git.fd.io/govpp.git/adapter"
"git.fd.io/govpp.git/api"
+ "git.fd.io/govpp.git/codec"
"git.fd.io/govpp.git/core/bin_api/vpe"
)
@@ -71,13 +72,13 @@ type ConnectionEvent struct {
type Connection struct {
vpp adapter.VppAdapter // VPP adapter
connected uint32 // non-zero if the adapter is connected to VPP
- codec *MsgCodec // message codec
+ codec *codec.MsgCodec // message codec
msgIDsLock sync.RWMutex // lock for the message IDs map
msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC
- channelsLock sync.RWMutex // lock for the channels map
- channels map[uint16]*api.Channel // map of all API channels indexed by the channel ID
+ channelsLock sync.RWMutex // lock for the channels map
+ channels map[uint16]*channel // map of all API channels indexed by the channel ID
notifSubscriptionsLock sync.RWMutex // lock for the subscriptions map
notifSubscriptions map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
@@ -197,8 +198,8 @@ func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
conn = &Connection{
vpp: vppAdapter,
- codec: &MsgCodec{},
- channels: make(map[uint16]*api.Channel),
+ codec: &codec.MsgCodec{},
+ channels: make(map[uint16]*channel),
msgIDs: make(map[string]uint16),
notifSubscriptions: make(map[uint16][]*api.NotifSubscription),
}
@@ -268,7 +269,7 @@ func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
// it continues with connectLoop and tries to reconnect.
func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
// create a separate API channel for health check probes
- ch, err := conn.NewAPIChannelBuffered(1, 1)
+ ch, err := conn.newAPIChannelBuffered(1, 1)
if err != nil {
log.Error("Failed to create health check API channel, health check will be disabled:", err)
return
@@ -290,18 +291,18 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
// try draining probe replies from previous request before sending next one
select {
- case <-ch.ReplyChan:
+ case <-ch.replyChan:
log.Debug("drained old probe reply from reply channel")
default:
}
// send the control ping request
- ch.ReqChan <- &api.VppRequest{Message: msgControlPing}
+ ch.reqChan <- &api.VppRequest{Message: msgControlPing}
for {
// expect response within timeout period
select {
- case vppReply := <-ch.ReplyChan:
+ case vppReply := <-ch.replyChan:
err = vppReply.Error
case <-time.After(healthCheckReplyTimeout):
@@ -349,32 +350,34 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
c.connectLoop(connChan)
}
-// NewAPIChannel returns a new API channel for communication with VPP via govpp core.
-// It uses default buffer sizes for the request and reply Go channels.
-func (c *Connection) NewAPIChannel() (*api.Channel, error) {
- if c == nil {
- return nil, errors.New("nil connection passed in")
- }
- return c.NewAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize)
+func (c *Connection) NewAPIChannel() (api.Channel, error) {
+ return c.newAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize)
+}
+
+func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
+ return c.newAPIChannelBuffered(reqChanBufSize, replyChanBufSize)
}
// NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
// It allows to specify custom buffer sizes for the request and reply Go channels.
-func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*api.Channel, error) {
+func (c *Connection) newAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*channel, error) {
if c == nil {
return nil, errors.New("nil connection passed in")
}
chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
- ch := api.NewChannelInternal(chID)
- ch.MsgDecoder = c.codec
- ch.MsgIdentifier = c
+ ch := &channel{
+ id: chID,
+ replyTimeout: defaultReplyTimeout,
+ }
+ ch.msgDecoder = c.codec
+ ch.msgIdentifier = c
// create the communication channels
- ch.ReqChan = make(chan *api.VppRequest, reqChanBufSize)
- ch.ReplyChan = make(chan *api.VppReply, replyChanBufSize)
- ch.NotifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize)
- ch.NotifSubsReplyChan = make(chan error, replyChanBufSize)
+ ch.reqChan = make(chan *api.VppRequest, reqChanBufSize)
+ ch.replyChan = make(chan *api.VppReply, replyChanBufSize)
+ ch.notifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize)
+ ch.notifSubsReplyChan = make(chan error, replyChanBufSize)
// store API channel within the client
c.channelsLock.Lock()
@@ -388,13 +391,13 @@ func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int)
}
// releaseAPIChannel releases API channel that needs to be closed.
-func (c *Connection) releaseAPIChannel(ch *api.Channel) {
+func (c *Connection) releaseAPIChannel(ch *channel) {
log.WithFields(logger.Fields{
- "ID": ch.ID,
+ "ID": ch.id,
}).Debug("API channel closed.")
// delete the channel from channels map
c.channelsLock.Lock()
- delete(c.channels, ch.ID)
+ delete(c.channels, ch.id)
c.channelsLock.Unlock()
}
diff --git a/core/core_test.go b/core/connection_test.go
index e4fbf63..b7c3aa0 100644
--- a/core/core_test.go
+++ b/core/connection_test.go
@@ -24,13 +24,14 @@ import (
"git.fd.io/govpp.git/examples/bin_api/interfaces"
"git.fd.io/govpp.git/examples/bin_api/stats"
+ "git.fd.io/govpp.git/codec"
. "github.com/onsi/gomega"
)
type testCtx struct {
mockVpp *mock.VppAdapter
conn *core.Connection
- ch *api.Channel
+ ch api.Channel
}
func setupTest(t *testing.T, bufferedChan bool) *testCtx {
@@ -68,14 +69,14 @@ func TestSimpleRequest(t *testing.T) {
reply := &vpe.ControlPingReply{}
// send the request and receive a reply
- ctx.ch.ReqChan <- &api.VppRequest{Message: req}
- vppReply := <-ctx.ch.ReplyChan
+ ctx.ch.GetRequestChannel() <- &api.VppRequest{Message: req}
+ vppReply := <-ctx.ch.GetReplyChannel()
Expect(vppReply).ShouldNot(BeNil())
Expect(vppReply.Error).ShouldNot(HaveOccurred())
// decode the message
- err := ctx.ch.MsgDecoder.DecodeMsg(vppReply.Data, reply)
+ err := ctx.ch.GetMessageDecoder().DecodeMsg(vppReply.Data, reply)
Expect(err).ShouldNot(HaveOccurred())
Expect(reply.Retval).To(BeEquivalentTo(-5))
@@ -93,12 +94,12 @@ func TestMultiRequest(t *testing.T) {
ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
// send multipart request
- ctx.ch.ReqChan <- &api.VppRequest{Message: &interfaces.SwInterfaceDump{}, Multipart: true}
+ ctx.ch.GetRequestChannel() <- &api.VppRequest{Message: &interfaces.SwInterfaceDump{}, Multipart: true}
cnt := 0
for {
// receive a reply
- vppReply := <-ctx.ch.ReplyChan
+ vppReply := <-ctx.ch.GetReplyChannel()
if vppReply.LastReplyReceived {
break // break out of the loop
}
@@ -106,7 +107,7 @@ func TestMultiRequest(t *testing.T) {
// decode the message
reply := &interfaces.SwInterfaceDetails{}
- err := ctx.ch.MsgDecoder.DecodeMsg(vppReply.Data, reply)
+ err := ctx.ch.GetMessageDecoder().DecodeMsg(vppReply.Data, reply)
Expect(err).ShouldNot(HaveOccurred())
cnt++
}
@@ -124,11 +125,11 @@ func TestNotifications(t *testing.T) {
NotifChan: notifChan,
MsgFactory: interfaces.NewSwInterfaceSetFlags,
}
- ctx.ch.NotifSubsChan <- &api.NotifSubscribeRequest{
+ ctx.ch.GetNotificationChannel() <- &api.NotifSubscribeRequest{
Subscription: subscription,
Subscribe: true,
}
- err := <-ctx.ch.NotifSubsReplyChan
+ err := <-ctx.ch.GetNotificationReplyChannel()
Expect(err).ShouldNot(HaveOccurred())
// mock the notification and force its delivery
@@ -144,11 +145,11 @@ func TestNotifications(t *testing.T) {
Expect(notif.SwIfIndex).To(BeEquivalentTo(3))
// unsubscribe notification
- ctx.ch.NotifSubsChan <- &api.NotifSubscribeRequest{
+ ctx.ch.GetNotificationChannel() <- &api.NotifSubscribeRequest{
Subscription: subscription,
Subscribe: false,
}
- err = <-ctx.ch.NotifSubsReplyChan
+ err = <-ctx.ch.GetNotificationReplyChannel()
Expect(err).ShouldNot(HaveOccurred())
}
@@ -207,15 +208,15 @@ func TestFullBuffer(t *testing.T) {
// send multiple requests, only one reply should be read
for i := 0; i < 20; i++ {
ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
- ctx.ch.ReqChan <- &api.VppRequest{Message: &vpe.ControlPing{}}
+ ctx.ch.GetRequestChannel() <- &api.VppRequest{Message: &vpe.ControlPing{}}
}
- vppReply := <-ctx.ch.ReplyChan
+ vppReply := <-ctx.ch.GetReplyChannel()
Expect(vppReply).ShouldNot(BeNil())
var received bool
select {
- case <-ctx.ch.ReplyChan:
+ case <-ctx.ch.GetReplyChannel():
received = true // this should not happen
default:
received = false // no reply to be received
@@ -226,35 +227,35 @@ func TestFullBuffer(t *testing.T) {
func TestCodec(t *testing.T) {
RegisterTestingT(t)
- codec := &core.MsgCodec{}
+ msgCodec := &codec.MsgCodec{}
// request
- data, err := codec.EncodeMsg(&interfaces.CreateLoopback{MacAddress: []byte{1, 2, 3, 4, 5, 6}}, 11)
+ data, err := msgCodec.EncodeMsg(&interfaces.CreateLoopback{MacAddress: []byte{1, 2, 3, 4, 5, 6}}, 11)
Expect(err).ShouldNot(HaveOccurred())
Expect(data).ShouldNot(BeEmpty())
msg1 := &interfaces.CreateLoopback{}
- err = codec.DecodeMsg(data, msg1)
+ err = msgCodec.DecodeMsg(data, msg1)
Expect(err).ShouldNot(HaveOccurred())
Expect(msg1.MacAddress).To(BeEquivalentTo([]byte{1, 2, 3, 4, 5, 6}))
// reply
- data, err = codec.EncodeMsg(&vpe.ControlPingReply{Retval: 55}, 22)
+ data, err = msgCodec.EncodeMsg(&vpe.ControlPingReply{Retval: 55}, 22)
Expect(err).ShouldNot(HaveOccurred())
Expect(data).ShouldNot(BeEmpty())
msg2 := &vpe.ControlPingReply{}
- err = codec.DecodeMsg(data, msg2)
+ err = msgCodec.DecodeMsg(data, msg2)
Expect(err).ShouldNot(HaveOccurred())
Expect(msg2.Retval).To(BeEquivalentTo(55))
// other
- data, err = codec.EncodeMsg(&stats.VnetIP4FibCounters{VrfID: 77}, 33)
+ data, err = msgCodec.EncodeMsg(&stats.VnetIP4FibCounters{VrfID: 77}, 33)
Expect(err).ShouldNot(HaveOccurred())
Expect(data).ShouldNot(BeEmpty())
msg3 := &stats.VnetIP4FibCounters{}
- err = codec.DecodeMsg(data, msg3)
+ err = msgCodec.DecodeMsg(data, msg3)
Expect(err).ShouldNot(HaveOccurred())
Expect(msg3.VrfID).To(BeEquivalentTo(77))
}
@@ -262,21 +263,21 @@ func TestCodec(t *testing.T) {
func TestCodecNegative(t *testing.T) {
RegisterTestingT(t)
- codec := &core.MsgCodec{}
+ msgCodec := &codec.MsgCodec{}
// nil message for encoding
- data, err := codec.EncodeMsg(nil, 15)
+ data, err := msgCodec.EncodeMsg(nil, 15)
Expect(err).Should(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("nil message"))
Expect(data).Should(BeNil())
// nil message for decoding
- err = codec.DecodeMsg(data, nil)
+ err = msgCodec.DecodeMsg(data, nil)
Expect(err).Should(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("nil message"))
// nil data for decoding
- err = codec.DecodeMsg(nil, &vpe.ControlPingReply{})
+ err = msgCodec.DecodeMsg(nil, &vpe.ControlPingReply{})
Expect(err).Should(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("EOF"))
}
@@ -285,7 +286,7 @@ func TestSimpleRequestsWithSequenceNumbers(t *testing.T) {
ctx := setupTest(t, false)
defer ctx.teardownTest()
- var reqCtx []*api.RequestCtx
+ var reqCtx []api.RequestCtx
for i := 0; i < 10; i++ {
ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: int32(i)})
req := &vpe.ControlPing{}
@@ -524,7 +525,7 @@ func TestCycleOverSetOfSequenceNumbers(t *testing.T) {
defer ctx.teardownTest()
numIters := 0xffff + 100
- reqCtx := make(map[int]*api.RequestCtx)
+ reqCtx := make(map[int]api.RequestCtx)
for i := 0; i < numIters+30; /* receiver is 30 reqs behind */ i++ {
if i < numIters {
diff --git a/core/doc.go b/core/doc.go
index a4ecd50..5b0b40e 100644
--- a/core/doc.go
+++ b/core/doc.go
@@ -17,4 +17,95 @@
// defer ch.Close()
//
// Note that one application can open only one connection, that can serve multiple API channels.
+//
+// The API offers two ways of communication with govpp core: using Go channels, or using convenient function
+// wrappers over the Go channels. The latter should be sufficient for most of the use cases.
+//
+// The entry point to the API is the Channel structure, that can be obtained from the existing connection using
+// the NewAPIChannel or NewAPIChannelBuffered functions:
+//
+// conn, err := govpp.Connect()
+// if err != nil {
+// // handle error!
+// }
+// defer conn.Disconnect()
+//
+// ch, err := conn.NewAPIChannel()
+// if err != nil {
+// // handle error!
+// }
+// defer ch.Close()
+//
+//
+// Simple Request-Reply API
+//
+// The simple version of the API is based on blocking SendRequest / ReceiveReply calls, where a single request
+// message is sent to VPP and a single reply message is filled in when the reply comes from VPP:
+//
+// req := &acl.ACLPluginGetVersion{}
+// reply := &acl.ACLPluginGetVersionReply{}
+//
+// err := ch.SendRequest(req).ReceiveReply(reply)
+// // process the reply
+//
+// Note that if the reply message type that comes from VPP does not match with provided one, you'll get an error.
+//
+//
+// Multipart Reply API
+//
+// If multiple messages are expected as a reply to a request, SendMultiRequest API must be used:
+//
+// req := &interfaces.SwInterfaceDump{}
+// reqCtx := ch.SendMultiRequest(req)
+//
+// for {
+// reply := &interfaces.SwInterfaceDetails{}
+// stop, err := reqCtx.ReceiveReply(reply)
+// if stop {
+// break // break out of the loop
+// }
+// // process the reply
+// }
+//
+// Note that if the last reply has been already consumed, stop boolean return value is set to true.
+// Do not use the message itself if stop is true - it won't be filled with actual data.
+//
+//
+// Go Channels API
+//
+// The blocking API introduced above may be not sufficient for some management applications that strongly
+// rely on usage of Go channels. In this case, the API allows to access the underlying Go channels directly, e.g.
+// the following replacement of the SendRequest / ReceiveReply API:
+//
+// req := &acl.ACLPluginGetVersion{}
+// // send the request to the request go channel
+// ch.GetRequestChannel <- &api.VppRequest{Message: req}
+//
+// // receive a reply from the reply go channel
+// vppReply := <-ch.GetReplyChannel
+//
+// // decode the message
+// reply := &acl.ACLPluginGetVersionReply{}
+// err := ch.MsgDecoder.DecodeMsg(vppReply.Data, reply)
+//
+// // process the reply
+//
+//
+// Notifications API
+//
+// to subscribe for receiving of the specified notification messages via provided Go channel, use the
+// SubscribeNotification API:
+//
+// // subscribe for specific notification message
+// notifChan := make(chan api.Message, 100)
+// subs, _ := ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceSetFlags)
+//
+// // receive one notification
+// notif := (<-notifChan).(*interfaces.SwInterfaceSetFlags)
+//
+// ch.UnsubscribeNotification(subs)
+//
+// Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's
+// buffer is full, the notifications will not be delivered into it.
+//
package core
diff --git a/core/msg_codec.go b/core/msg_codec.go
deleted file mode 100644
index e32916b..0000000
--- a/core/msg_codec.go
+++ /dev/null
@@ -1,159 +0,0 @@
-// Copyright (c) 2017 Cisco and/or its affiliates.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at:
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package core
-
-import (
- "bytes"
- "errors"
- "fmt"
- "reflect"
-
- "github.com/lunixbochs/struc"
- logger "github.com/sirupsen/logrus"
-
- "git.fd.io/govpp.git/api"
-)
-
-// MsgCodec provides encoding and decoding functionality of `api.Message` structs into/from
-// binary format as accepted by VPP.
-type MsgCodec struct{}
-
-// VppRequestHeader struct contains header fields implemented by all VPP requests.
-type VppRequestHeader struct {
- VlMsgID uint16
- ClientIndex uint32
- Context uint32
-}
-
-// VppReplyHeader struct contains header fields implemented by all VPP replies.
-type VppReplyHeader struct {
- VlMsgID uint16
- Context uint32
-}
-
-// VppEventHeader struct contains header fields implemented by all VPP events.
-type VppEventHeader struct {
- VlMsgID uint16
- Context uint32
-}
-
-// VppOtherHeader struct contains header fields implemented by other VPP messages (not requests nor replies).
-type VppOtherHeader struct {
- VlMsgID uint16
-}
-
-const (
- vppRequestHeaderSize = 10 // size of a VPP request header
- vppReplyHeaderSize = 6 // size of a VPP reply header
- vppEventHeaderSize = 6 // size of a VPP event header
- vppOtherHeaderSize = 2 // size of the header of other VPP messages
-)
-
-// EncodeMsg encodes provided `Message` structure into its binary-encoded data representation.
-func (*MsgCodec) EncodeMsg(msg api.Message, msgID uint16) ([]byte, error) {
- if msg == nil {
- return nil, errors.New("nil message passed in")
- }
-
- buf := new(bytes.Buffer)
-
- // encode message header
- var header interface{}
- if msg.GetMessageType() == api.RequestMessage {
- header = &VppRequestHeader{VlMsgID: msgID}
- } else if msg.GetMessageType() == api.ReplyMessage {
- header = &VppReplyHeader{VlMsgID: msgID}
- } else if msg.GetMessageType() == api.EventMessage {
- header = &VppEventHeader{VlMsgID: msgID}
- } else {
- header = &VppOtherHeader{VlMsgID: msgID}
- }
- err := struc.Pack(buf, header)
- if err != nil {
- log.WithFields(logger.Fields{
- "error": err,
- "header": header,
- }).Error("Unable to encode the message header: ", err)
- return nil, fmt.Errorf("unable to encode the message header: %v", err)
- }
-
- // encode message content
- if reflect.Indirect(reflect.ValueOf(msg)).NumField() > 0 {
- err := struc.Pack(buf, msg)
- if err != nil {
- log.WithFields(logger.Fields{
- "error": err,
- "message": msg,
- }).Error("Unable to encode the message: ", err)
- return nil, fmt.Errorf("unable to encode the message: %v", err)
- }
- }
-
- return buf.Bytes(), nil
-}
-
-// DecodeMsg decodes binary-encoded data of a message into provided `Message` structure.
-func (*MsgCodec) DecodeMsg(data []byte, msg api.Message) error {
- if msg == nil {
- return errors.New("nil message passed in")
- }
-
- buf := bytes.NewReader(data)
-
- // check which header is expected
- var header interface{}
- if msg.GetMessageType() == api.RequestMessage {
- header = &VppRequestHeader{}
- } else if msg.GetMessageType() == api.ReplyMessage {
- header = &VppReplyHeader{}
- } else if msg.GetMessageType() == api.EventMessage {
- header = &VppEventHeader{}
- } else {
- header = &VppOtherHeader{}
- }
-
- // decode message header
- err := struc.Unpack(buf, header)
- if err != nil {
- log.WithFields(logger.Fields{
- "error": err,
- "data": data,
- }).Error("Unable to decode header of the message.")
- return fmt.Errorf("unable to decode the message header: %v", err)
- }
-
- // get rid of the message header
- if msg.GetMessageType() == api.RequestMessage {
- buf = bytes.NewReader(data[vppRequestHeaderSize:])
- } else if msg.GetMessageType() == api.ReplyMessage {
- buf = bytes.NewReader(data[vppReplyHeaderSize:])
- } else if msg.GetMessageType() == api.EventMessage {
- buf = bytes.NewReader(data[vppEventHeaderSize:])
- } else {
- buf = bytes.NewReader(data[vppOtherHeaderSize:])
- }
-
- // decode message content
- err = struc.Unpack(buf, msg)
- if err != nil {
- log.WithFields(logger.Fields{
- "error": err,
- "data": buf,
- }).Error("Unable to decode the message.")
- return fmt.Errorf("unable to decode the message: %v", err)
- }
-
- return nil
-}
diff --git a/core/notification_handler.go b/core/notification_handler.go
index 89c16a4..c0e8687 100644
--- a/core/notification_handler.go
+++ b/core/notification_handler.go
@@ -18,13 +18,12 @@ import (
"fmt"
"reflect"
- logger "github.com/sirupsen/logrus"
-
"git.fd.io/govpp.git/api"
+ logger "github.com/sirupsen/logrus"
)
// processNotifSubscribeRequest processes a notification subscribe request.
-func (c *Connection) processNotifSubscribeRequest(ch *api.Channel, req *api.NotifSubscribeRequest) error {
+func (c *Connection) processNotifSubscribeRequest(ch *channel, req *api.NotifSubscribeRequest) error {
var err error
// subscribe / unsubscribe
@@ -36,7 +35,7 @@ func (c *Connection) processNotifSubscribeRequest(ch *api.Channel, req *api.Noti
// send the reply into the go channel
select {
- case ch.NotifSubsReplyChan <- err:
+ case ch.notifSubsReplyChan <- err:
// reply sent successfully
default:
// unable to write into the channel without blocking
diff --git a/core/request_handler.go b/core/request_handler.go
index 3bec38d..8681963 100644
--- a/core/request_handler.go
+++ b/core/request_handler.go
@@ -31,10 +31,10 @@ var (
)
// watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
-func (c *Connection) watchRequests(ch *api.Channel) {
+func (c *Connection) watchRequests(ch *channel) {
for {
select {
- case req, ok := <-ch.ReqChan:
+ case req, ok := <-ch.reqChan:
// new request on the request channel
if !ok {
// after closing the request channel, release API channel and return
@@ -43,7 +43,7 @@ func (c *Connection) watchRequests(ch *api.Channel) {
}
c.processRequest(ch, req)
- case req := <-ch.NotifSubsChan:
+ case req := <-ch.notifSubsChan:
// new request on the notification subscribe channel
c.processNotifSubscribeRequest(ch, req)
}
@@ -51,7 +51,7 @@ func (c *Connection) watchRequests(ch *api.Channel) {
}
// processRequest processes a single request received on the request channel.
-func (c *Connection) processRequest(ch *api.Channel, req *api.VppRequest) error {
+func (c *Connection) processRequest(ch *channel, req *api.VppRequest) error {
// check whether we are connected to VPP
if atomic.LoadUint32(&c.connected) == 0 {
err := ErrNotConnected
@@ -78,7 +78,7 @@ func (c *Connection) processRequest(ch *api.Channel, req *api.VppRequest) error
if err != nil {
err = fmt.Errorf("unable to encode the messge: %v", err)
log.WithFields(logger.Fields{
- "channel": ch.ID,
+ "channel": ch.id,
"msg_id": msgID,
"seq_num": req.SeqNum,
}).Error(err)
@@ -88,7 +88,7 @@ func (c *Connection) processRequest(ch *api.Channel, req *api.VppRequest) error
if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
log.WithFields(logger.Fields{
- "channel": ch.ID,
+ "channel": ch.id,
"msg_id": msgID,
"msg_size": len(data),
"msg_name": req.Message.GetMessageName(),
@@ -97,7 +97,7 @@ func (c *Connection) processRequest(ch *api.Channel, req *api.VppRequest) error
}
// send the request to VPP
- context := packRequestContext(ch.ID, req.Multipart, req.SeqNum)
+ context := packRequestContext(ch.id, req.Multipart, req.SeqNum)
err = c.vpp.SendMsg(context, data)
if err != nil {
err = fmt.Errorf("unable to send the message: %v", err)
@@ -189,9 +189,9 @@ func msgCallback(context uint32, msgID uint16, data []byte) {
// sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
// it logs the error and do not send the message.
-func sendReply(ch *api.Channel, reply *api.VppReply) {
+func sendReply(ch *channel, reply *api.VppReply) {
select {
- case ch.ReplyChan <- reply:
+ case ch.replyChan <- reply:
// reply sent successfully
case <-time.After(time.Millisecond * 100):
// receiver still not ready