diff options
author | Ondrej Fabry <ofabry@cisco.com> | 2018-08-23 22:51:56 +0200 |
---|---|---|
committer | Ondrej Fabry <ofabry@cisco.com> | 2018-08-24 12:43:05 +0200 |
commit | 6b350c65fe0ec845cecf58bfb41ffc63dc9c04f7 (patch) | |
tree | 6255495854f43ec2f2d11f88990369aadb48db3f /core | |
parent | 892683bef86cacc2ccda2b4df2b079171bd92164 (diff) |
Simplify subscribing to events and fix events
- there is no need for sending subscription requests through channels,
since all the messages are registered and no communication with VPP
is needed
Change-Id: Ibc29957be02a32e26309f66c369a071559b822a9
Signed-off-by: Ondrej Fabry <ofabry@cisco.com>
Diffstat (limited to 'core')
-rw-r--r-- | core/channel.go | 218 | ||||
-rw-r--r-- | core/channel_test.go | 173 | ||||
-rw-r--r-- | core/connection.go | 235 | ||||
-rw-r--r-- | core/connection_test.go | 212 | ||||
-rw-r--r-- | core/notification_handler.go | 170 | ||||
-rw-r--r-- | core/request_handler.go | 81 |
6 files changed, 431 insertions, 658 deletions
diff --git a/core/channel.go b/core/channel.go index 718f89c..a7d95fe 100644 --- a/core/channel.go +++ b/core/channel.go @@ -29,40 +29,20 @@ var ( ErrInvalidRequestCtx = errors.New("invalid request context") ) -// requestCtx is a context for request with single reply -type requestCtx struct { - ch *channel - seqNum uint16 -} - -// multiRequestCtx is a context for request with multiple responses -type multiRequestCtx struct { - ch *channel - seqNum uint16 -} - -func (req *requestCtx) ReceiveReply(msg api.Message) error { - if req == nil || req.ch == nil { - return ErrInvalidRequestCtx - } - - lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum) - if err != nil { - return err - } - if lastReplyReceived { - return errors.New("multipart reply recieved while a single reply expected") - } - - return nil +// MessageCodec provides functionality for decoding binary data to generated API messages. +type MessageCodec interface { + //EncodeMsg encodes message into binary data. + EncodeMsg(msg api.Message, msgID uint16) ([]byte, error) + // DecodeMsg decodes binary-encoded data of a message into provided Message structure. + DecodeMsg(data []byte, msg api.Message) error } -func (req *multiRequestCtx) ReceiveReply(msg api.Message) (lastReplyReceived bool, err error) { - if req == nil || req.ch == nil { - return false, ErrInvalidRequestCtx - } - - return req.ch.receiveReplyInternal(msg, req.seqNum) +// MessageIdentifier provides identification of generated API messages. +type MessageIdentifier interface { + // GetMessageID returns message identifier of given API message. + GetMessageID(msg api.Message) (uint16, error) + // LookupByID looks up message name and crc by ID + LookupByID(msgID uint16) (api.Message, error) } // vppRequest is a request that will be sent to VPP. @@ -81,27 +61,40 @@ type vppReply struct { err error // in case of error, data is nil and this member contains error } -// NotifSubscribeRequest is a request to subscribe for delivery of specific notification messages. -type subscriptionRequest struct { - sub *api.NotifSubscription // subscription details - subscribe bool // true if this is a request to subscribe +// requestCtx is a context for request with single reply +type requestCtx struct { + ch *Channel + seqNum uint16 +} + +// multiRequestCtx is a context for request with multiple responses +type multiRequestCtx struct { + ch *Channel + seqNum uint16 +} + +// subscriptionCtx is a context of subscription for delivery of specific notification messages. +type subscriptionCtx struct { + ch *Channel + notifChan chan api.Message // channel where notification messages will be delivered to + msgID uint16 // message ID for the subscribed event message + event api.Message // event message that this subscription is for + msgFactory func() api.Message // function that returns a new instance of the specific message that is expected as a notification } // channel is the main communication interface with govpp core. It contains four Go channels, one for sending the requests // to VPP, one for receiving the replies from it and the same set for notifications. The user can access the Go channels // via methods provided by Channel interface in this package. Do not use the same channel from multiple goroutines // concurrently, otherwise the responses could mix! Use multiple channels instead. -type channel struct { - id uint16 // channel ID +type Channel struct { + id uint16 + conn *Connection reqChan chan *vppRequest // channel for sending the requests to VPP replyChan chan *vppReply // channel where VPP replies are delivered to - notifSubsChan chan *subscriptionRequest // channel for sending notification subscribe requests - notifSubsReplyChan chan error // channel where replies to notification subscribe requests are delivered to - - msgDecoder api.MessageDecoder // used to decode binary data to generated API messages - msgIdentifier api.MessageIdentifier // used to retrieve message ID of a message + msgCodec MessageCodec // used to decode binary data to generated API messages + msgIdentifier MessageIdentifier // used to retrieve message ID of a message lastSeqNum uint16 // sequence number of the last sent request @@ -109,73 +102,142 @@ type channel struct { replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout } -func (ch *channel) GetID() uint16 { +func newChannel(id uint16, conn *Connection, codec MessageCodec, identifier MessageIdentifier, reqSize, replySize int) *Channel { + return &Channel{ + id: id, + conn: conn, + msgCodec: codec, + msgIdentifier: identifier, + reqChan: make(chan *vppRequest, reqSize), + replyChan: make(chan *vppReply, replySize), + replyTimeout: DefaultReplyTimeout, + } +} + +func (ch *Channel) GetID() uint16 { return ch.id } -func (ch *channel) nextSeqNum() uint16 { +func (ch *Channel) nextSeqNum() uint16 { ch.lastSeqNum++ return ch.lastSeqNum } -func (ch *channel) SendRequest(msg api.Message) api.RequestCtx { - req := &vppRequest{ +func (ch *Channel) SendRequest(msg api.Message) api.RequestCtx { + seqNum := ch.nextSeqNum() + ch.reqChan <- &vppRequest{ msg: msg, - seqNum: ch.nextSeqNum(), + seqNum: seqNum, } - ch.reqChan <- req - return &requestCtx{ch: ch, seqNum: req.seqNum} + return &requestCtx{ch: ch, seqNum: seqNum} } -func (ch *channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx { - req := &vppRequest{ +func (ch *Channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx { + seqNum := ch.nextSeqNum() + ch.reqChan <- &vppRequest{ msg: msg, - seqNum: ch.nextSeqNum(), + seqNum: seqNum, multi: true, } - ch.reqChan <- req - return &multiRequestCtx{ch: ch, seqNum: req.seqNum} + return &multiRequestCtx{ch: ch, seqNum: seqNum} } -func (ch *channel) SubscribeNotification(notifChan chan api.Message, msgFactory func() api.Message) (*api.NotifSubscription, error) { - sub := &api.NotifSubscription{ - NotifChan: notifChan, - MsgFactory: msgFactory, +func getMsgFactory(msg api.Message) func() api.Message { + return func() api.Message { + return reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) } - // TODO: get rid of notifSubsChan and notfSubsReplyChan, - // it's no longer need because we know all message IDs and can store subscription right here - ch.notifSubsChan <- &subscriptionRequest{ - sub: sub, - subscribe: true, - } - return sub, <-ch.notifSubsReplyChan } -func (ch *channel) UnsubscribeNotification(subscription *api.NotifSubscription) error { - ch.notifSubsChan <- &subscriptionRequest{ - sub: subscription, - subscribe: false, +func (ch *Channel) SubscribeNotification(notifChan chan api.Message, event api.Message) (api.SubscriptionCtx, error) { + msgID, err := ch.msgIdentifier.GetMessageID(event) + if err != nil { + log.WithFields(logrus.Fields{ + "msg_name": event.GetMessageName(), + "msg_crc": event.GetCrcString(), + }).Errorf("unable to retrieve message ID: %v", err) + return nil, fmt.Errorf("unable to retrieve event message ID: %v", err) + } + + sub := &subscriptionCtx{ + ch: ch, + notifChan: notifChan, + msgID: msgID, + event: event, + msgFactory: getMsgFactory(event), } - return <-ch.notifSubsReplyChan + + // add the subscription into map + ch.conn.subscriptionsLock.Lock() + defer ch.conn.subscriptionsLock.Unlock() + + ch.conn.subscriptions[msgID] = append(ch.conn.subscriptions[msgID], sub) + + return sub, nil } -func (ch *channel) SetReplyTimeout(timeout time.Duration) { +func (ch *Channel) SetReplyTimeout(timeout time.Duration) { ch.replyTimeout = timeout } -func (ch *channel) Close() { +func (ch *Channel) Close() { if ch.reqChan != nil { close(ch.reqChan) + ch.reqChan = nil + } +} + +func (req *requestCtx) ReceiveReply(msg api.Message) error { + if req == nil || req.ch == nil { + return ErrInvalidRequestCtx + } + + lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum) + if err != nil { + return err + } else if lastReplyReceived { + return errors.New("multipart reply recieved while a single reply expected") + } + + return nil +} + +func (req *multiRequestCtx) ReceiveReply(msg api.Message) (lastReplyReceived bool, err error) { + if req == nil || req.ch == nil { + return false, ErrInvalidRequestCtx } + + return req.ch.receiveReplyInternal(msg, req.seqNum) +} + +func (sub *subscriptionCtx) Unsubscribe() error { + log.WithFields(logrus.Fields{ + "msg_name": sub.event.GetMessageName(), + "msg_id": sub.msgID, + }).Debug("Removing notification subscription.") + + // remove the subscription from the map + sub.ch.conn.subscriptionsLock.Lock() + defer sub.ch.conn.subscriptionsLock.Unlock() + + for i, item := range sub.ch.conn.subscriptions[sub.msgID] { + if item == sub { + // remove i-th item in the slice + sub.ch.conn.subscriptions[sub.msgID] = append(sub.ch.conn.subscriptions[sub.msgID][:i], sub.ch.conn.subscriptions[sub.msgID][i+1:]...) + return nil + } + } + + return fmt.Errorf("subscription for %q not found", sub.event.GetMessageName()) } // receiveReplyInternal receives a reply from the reply channel into the provided msg structure. -func (ch *channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (lastReplyReceived bool, err error) { - var ignore bool +func (ch *Channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (lastReplyReceived bool, err error) { if msg == nil { return false, errors.New("nil message passed in") } + var ignore bool + if vppReply := ch.delayedReply; vppReply != nil { // try the delayed reply ch.delayedReply = nil @@ -204,12 +266,12 @@ func (ch *channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (last return } -func (ch *channel) processReply(reply *vppReply, expSeqNum uint16, msg api.Message) (ignore bool, lastReplyReceived bool, err error) { +func (ch *Channel) processReply(reply *vppReply, expSeqNum uint16, msg api.Message) (ignore bool, lastReplyReceived bool, err error) { // check the sequence number cmpSeqNums := compareSeqNumbers(reply.seqNum, expSeqNum) if cmpSeqNums == -1 { // reply received too late, ignore the message - logrus.WithField("sequence-number", reply.seqNum).Warn( + logrus.WithField("seqNum", reply.seqNum).Warn( "Received reply to an already closed binary API request") ignore = true return @@ -253,7 +315,7 @@ func (ch *channel) processReply(reply *vppReply, expSeqNum uint16, msg api.Messa } // decode the message - if err = ch.msgDecoder.DecodeMsg(reply.data, msg); err != nil { + if err = ch.msgCodec.DecodeMsg(reply.data, msg); err != nil { return } diff --git a/core/channel_test.go b/core/channel_test.go index 4a9ab2b..0eafa32 100644 --- a/core/channel_test.go +++ b/core/channel_test.go @@ -22,6 +22,7 @@ import ( "git.fd.io/govpp.git/examples/bin_api/interfaces" "git.fd.io/govpp.git/examples/bin_api/memif" "git.fd.io/govpp.git/examples/bin_api/tap" + "git.fd.io/govpp.git/examples/binapi/vpe" "git.fd.io/govpp.git/api" . "github.com/onsi/gomega" @@ -59,10 +60,11 @@ func TestRequestReplyTapConnect(t *testing.T) { ctx := setupTest(t) defer ctx.teardownTest() + // mock reply ctx.mockVpp.MockReply(&tap.TapConnectReply{ - Retval: 0, SwIfIndex: 1, }) + request := &tap.TapConnect{ TapName: []byte("test-tap-name"), UseRandomMac: 1, @@ -71,17 +73,21 @@ func TestRequestReplyTapConnect(t *testing.T) { err := ctx.ch.SendRequest(request).ReceiveReply(reply) Expect(err).ShouldNot(HaveOccurred()) - Expect(reply.Retval).To(BeEquivalentTo(0), "Incorrect retval value for TapConnectReply") - Expect(reply.SwIfIndex).To(BeEquivalentTo(1), "Incorrect SwIfIndex value for TapConnectReply") + Expect(reply.Retval).To(BeEquivalentTo(0), + "Incorrect Retval value for TapConnectReply") + Expect(reply.SwIfIndex).To(BeEquivalentTo(1), + "Incorrect SwIfIndex value for TapConnectReply") } func TestRequestReplyTapModify(t *testing.T) { ctx := setupTest(t) defer ctx.teardownTest() + // mock reply ctx.mockVpp.MockReply(&tap.TapModifyReply{ SwIfIndex: 2, }) + request := &tap.TapModify{ TapName: []byte("test-tap-modify"), UseRandomMac: 1, @@ -91,15 +97,19 @@ func TestRequestReplyTapModify(t *testing.T) { err := ctx.ch.SendRequest(request).ReceiveReply(reply) Expect(err).ShouldNot(HaveOccurred()) - Expect(reply.Retval).To(BeEquivalentTo(0), "Incorrect retval value for TapModifyReply") - Expect(reply.SwIfIndex).To(BeEquivalentTo(2), "Incorrect SwIfIndex value for TapModifyReply") + Expect(reply.Retval).To(BeEquivalentTo(0), + "Incorrect Retval value for TapModifyReply") + Expect(reply.SwIfIndex).To(BeEquivalentTo(2), + "Incorrect SwIfIndex value for TapModifyReply") } func TestRequestReplyTapDelete(t *testing.T) { ctx := setupTest(t) defer ctx.teardownTest() + // mock reply ctx.mockVpp.MockReply(&tap.TapDeleteReply{}) + request := &tap.TapDelete{ SwIfIndex: 3, } @@ -107,34 +117,41 @@ func TestRequestReplyTapDelete(t *testing.T) { err := ctx.ch.SendRequest(request).ReceiveReply(reply) Expect(err).ShouldNot(HaveOccurred()) - Expect(reply.Retval).To(BeEquivalentTo(0), "Incorrect retval value for TapDeleteReply") + Expect(reply.Retval).To(BeEquivalentTo(0), + "Incorrect Retval value for TapDeleteReply") } func TestRequestReplySwInterfaceTapDump(t *testing.T) { ctx := setupTest(t) defer ctx.teardownTest() + // mock reply byteName := []byte("dev-name-test") ctx.mockVpp.MockReply(&tap.SwInterfaceTapDetails{ SwIfIndex: 25, DevName: byteName, }) + request := &tap.SwInterfaceTapDump{} reply := &tap.SwInterfaceTapDetails{} err := ctx.ch.SendRequest(request).ReceiveReply(reply) Expect(err).ShouldNot(HaveOccurred()) - Expect(reply.SwIfIndex).To(BeEquivalentTo(25), "Incorrect SwIfIndex value for SwInterfaceTapDetails") - Expect(reply.DevName).ToNot(BeNil(), "Incorrect DevName value for SwInterfaceTapDetails") + Expect(reply.SwIfIndex).To(BeEquivalentTo(25), + "Incorrect SwIfIndex value for SwInterfaceTapDetails") + Expect(reply.DevName).ToNot(BeEquivalentTo(byteName), + "Incorrect DevName value for SwInterfaceTapDetails") } func TestRequestReplyMemifCreate(t *testing.T) { ctx := setupTest(t) defer ctx.teardownTest() + // mock reply ctx.mockVpp.MockReply(&memif.MemifCreateReply{ SwIfIndex: 4, }) + request := &memif.MemifCreate{ Role: 10, ID: 12, @@ -145,15 +162,19 @@ func TestRequestReplyMemifCreate(t *testing.T) { err := ctx.ch.SendRequest(request).ReceiveReply(reply) Expect(err).ShouldNot(HaveOccurred()) - Expect(reply.Retval).To(BeEquivalentTo(0), "Incorrect Retval value for MemifCreate") - Expect(reply.SwIfIndex).To(BeEquivalentTo(4), "Incorrect SwIfIndex value for MemifCreate") + Expect(reply.Retval).To(BeEquivalentTo(0), + "Incorrect Retval value for MemifCreate") + Expect(reply.SwIfIndex).To(BeEquivalentTo(4), + "Incorrect SwIfIndex value for MemifCreate") } func TestRequestReplyMemifDelete(t *testing.T) { ctx := setupTest(t) defer ctx.teardownTest() + // mock reply ctx.mockVpp.MockReply(&memif.MemifDeleteReply{}) + request := &memif.MemifDelete{ SwIfIndex: 15, } @@ -161,26 +182,30 @@ func TestRequestReplyMemifDelete(t *testing.T) { err := ctx.ch.SendRequest(request).ReceiveReply(reply) Expect(err).ShouldNot(HaveOccurred()) - Expect(reply.Retval).To(BeEquivalentTo(0), "Incorrect Retval value for MemifDelete") } func TestRequestReplyMemifDetails(t *testing.T) { ctx := setupTest(t) defer ctx.teardownTest() + // mock reply ctx.mockVpp.MockReply(&memif.MemifDetails{ SwIfIndex: 25, IfName: []byte("memif-name"), Role: 0, }) + request := &memif.MemifDump{} reply := &memif.MemifDetails{} err := ctx.ch.SendRequest(request).ReceiveReply(reply) Expect(err).ShouldNot(HaveOccurred()) - Expect(reply.SwIfIndex).To(BeEquivalentTo(25), "Incorrect SwIfIndex value for MemifDetails") - Expect(reply.IfName).ToNot(BeEmpty(), "MemifDetails IfName is empty byte array") - Expect(reply.Role).To(BeEquivalentTo(0), "Incorrect Role value for MemifDetails") + Expect(reply.SwIfIndex).To(BeEquivalentTo(25), + "Incorrect SwIfIndex value for MemifDetails") + Expect(reply.IfName).ToNot(BeEmpty(), + "MemifDetails IfName is empty byte array") + Expect(reply.Role).To(BeEquivalentTo(0), + "Incorrect Role value for MemifDetails") } func TestMultiRequestReplySwInterfaceTapDump(t *testing.T) { @@ -204,7 +229,7 @@ func TestMultiRequestReplySwInterfaceTapDump(t *testing.T) { msg := &tap.SwInterfaceTapDetails{} stop, err := reqCtx.ReceiveReply(msg) if stop { - break // break out of the loop + break } Expect(err).ShouldNot(HaveOccurred()) cnt++ @@ -232,7 +257,7 @@ func TestMultiRequestReplySwInterfaceMemifDump(t *testing.T) { msg := &memif.MemifDetails{} stop, err := reqCtx.ReceiveReply(msg) if stop { - break // break out of the loop + break } Expect(err).ShouldNot(HaveOccurred()) cnt++ @@ -240,51 +265,16 @@ func TestMultiRequestReplySwInterfaceMemifDump(t *testing.T) { Expect(cnt).To(BeEquivalentTo(10)) } -func TestNotifications(t *testing.T) { - ctx := setupTest(t) - defer ctx.teardownTest() - - // subscribe for notification - notifChan := make(chan api.Message, 1) - subs, err := ctx.ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceSetFlags) - Expect(err).ShouldNot(HaveOccurred()) - - // mock the notification and force its delivery - ctx.mockVpp.MockReply(&interfaces.SwInterfaceSetFlags{ - SwIfIndex: 3, - AdminUpDown: 1, - }) - ctx.mockVpp.SendMsg(0, []byte("")) - - // receive the notification - var notif *interfaces.SwInterfaceSetFlags - Eventually(func() *interfaces.SwInterfaceSetFlags { - select { - case n := <-notifChan: - notif = n.(*interfaces.SwInterfaceSetFlags) - return notif - default: - return nil - } - }).ShouldNot(BeNil()) - - // verify the received notifications - Expect(notif.SwIfIndex).To(BeEquivalentTo(3), "Incorrect SwIfIndex value for SwInterfaceSetFlags") - Expect(notif.AdminUpDown).To(BeEquivalentTo(1), "Incorrect AdminUpDown value for SwInterfaceSetFlags") - - ctx.ch.UnsubscribeNotification(subs) -} - func TestNotificationEvent(t *testing.T) { ctx := setupTest(t) defer ctx.teardownTest() // subscribe for notification notifChan := make(chan api.Message, 1) - subs, err := ctx.ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceEvent) + sub, err := ctx.ch.SubscribeNotification(notifChan, &interfaces.SwInterfaceEvent{}) Expect(err).ShouldNot(HaveOccurred()) - // mock the notification and force its delivery + // mock event and force its delivery ctx.mockVpp.MockReply(&interfaces.SwInterfaceEvent{ SwIfIndex: 2, LinkUpDown: 1, @@ -307,16 +297,9 @@ func TestNotificationEvent(t *testing.T) { Expect(notif.SwIfIndex).To(BeEquivalentTo(2), "Incorrect SwIfIndex value for SwInterfaceSetFlags") Expect(notif.LinkUpDown).To(BeEquivalentTo(1), "Incorrect LinkUpDown value for SwInterfaceSetFlags") - ctx.ch.UnsubscribeNotification(subs) -} - -/*func TestCheckMessageCompatibility(t *testing.T) { - ctx := setupTest(t) - defer ctx.teardownTest() - - err := ctx.ch.CheckMessageCompatibility(&interfaces.SwInterfaceSetFlags{}) + err = sub.Unsubscribe() Expect(err).ShouldNot(HaveOccurred()) -}*/ +} func TestSetReplyTimeout(t *testing.T) { ctx := setupTest(t) @@ -324,8 +307,10 @@ func TestSetReplyTimeout(t *testing.T) { ctx.ch.SetReplyTimeout(time.Millisecond) - // first one request should work + // mock reply ctx.mockVpp.MockReply(&ControlPingReply{}) + + // first one request should work err := ctx.ch.SendRequest(&ControlPing{}).ReceiveReply(&ControlPingReply{}) Expect(err).ShouldNot(HaveOccurred()) @@ -339,16 +324,23 @@ func TestSetReplyTimeoutMultiRequest(t *testing.T) { ctx := setupTest(t) defer ctx.teardownTest() - ctx.ch.SetReplyTimeout(time.Millisecond) + ctx.ch.SetReplyTimeout(time.Millisecond * 100) - var msgs []api.Message - for i := 1; i <= 3; i++ { - msgs = append(msgs, &interfaces.SwInterfaceDetails{ - SwIfIndex: uint32(i), + // mock reply + ctx.mockVpp.MockReply( + &interfaces.SwInterfaceDetails{ + SwIfIndex: 1, InterfaceName: []byte("if-name-test"), - }) - } - ctx.mockVpp.MockReply(msgs...) + }, + &interfaces.SwInterfaceDetails{ + SwIfIndex: 2, + InterfaceName: []byte("if-name-test"), + }, + &interfaces.SwInterfaceDetails{ + SwIfIndex: 3, + InterfaceName: []byte("if-name-test"), + }, + ) ctx.mockVpp.MockReply(&ControlPingReply{}) cnt := 0 @@ -357,12 +349,12 @@ func TestSetReplyTimeoutMultiRequest(t *testing.T) { for { msg := &interfaces.SwInterfaceDetails{} stop, err := reqCtx.ReceiveReply(msg) - if stop { - break // break out of the loop - } if err != nil { return err } + if stop { + break + } cnt++ } return nil @@ -443,7 +435,7 @@ func TestMultiRequestDouble(t *testing.T) { msg := &interfaces.SwInterfaceDetails{} stop, err := reqCtx.ReceiveReply(msg) if stop { - break // break out of the loop + break } if err != nil { return err @@ -468,8 +460,10 @@ func TestReceiveReplyAfterTimeout(t *testing.T) { ctx.ch.SetReplyTimeout(time.Millisecond) - // first one request should work + // mock reply ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{Msg: &ControlPingReply{}, SeqNum: 1}) + // first one request should work + err := ctx.ch.SendRequest(&ControlPing{}).ReceiveReply(&ControlPingReply{}) Expect(err).ShouldNot(HaveOccurred()) @@ -479,9 +473,16 @@ func TestReceiveReplyAfterTimeout(t *testing.T) { ctx.mockVpp.MockReplyWithContext( // simulating late reply - mock.MsgWithContext{Msg: &ControlPingReply{}, SeqNum: 2}, + mock.MsgWithContext{ + Msg: &ControlPingReply{}, + SeqNum: 2, + }, // normal reply for next request - mock.MsgWithContext{Msg: &tap.TapConnectReply{}, SeqNum: 3}) + mock.MsgWithContext{ + Msg: &tap.TapConnectReply{}, + SeqNum: 3, + }, + ) req := &tap.TapConnect{ TapName: []byte("test-tap-name"), @@ -508,8 +509,10 @@ func TestReceiveReplyAfterTimeoutMultiRequest(t *testing.T) { ctx.ch.SetReplyTimeout(time.Millisecond * 100) - // first one request should work + // mock reply ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{Msg: &ControlPingReply{}, SeqNum: 1}) + + // first one request should work err := ctx.ch.SendRequest(&ControlPing{}).ReceiveReply(&ControlPingReply{}) Expect(err).ShouldNot(HaveOccurred()) @@ -520,7 +523,7 @@ func TestReceiveReplyAfterTimeoutMultiRequest(t *testing.T) { msg := &interfaces.SwInterfaceDetails{} stop, err := reqCtx.ReceiveReply(msg) if stop { - break // break out of the loop + break } if err != nil { return err @@ -564,18 +567,20 @@ func TestReceiveReplyAfterTimeoutMultiRequest(t *testing.T) { Expect(err).ShouldNot(HaveOccurred()) } -/*func TestInvalidMessageID(t *testing.T) { +func TestInvalidMessageID(t *testing.T) { ctx := setupTest(t) defer ctx.teardownTest() - // first one request should work + // mock reply + ctx.mockVpp.MockReply(&vpe.ShowVersionReply{}) ctx.mockVpp.MockReply(&vpe.ShowVersionReply{}) + + // first one request should work err := ctx.ch.SendRequest(&vpe.ShowVersion{}).ReceiveReply(&vpe.ShowVersionReply{}) Expect(err).ShouldNot(HaveOccurred()) // second should fail with error invalid message ID - ctx.mockVpp.MockReply(&vpe.ShowVersionReply{}) err = ctx.ch.SendRequest(&ControlPing{}).ReceiveReply(&ControlPingReply{}) Expect(err).Should(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("invalid message ID")) -}*/ +} diff --git a/core/connection.go b/core/connection.go index c77358f..7d014ce 100644 --- a/core/connection.go +++ b/core/connection.go @@ -29,42 +29,19 @@ import ( "git.fd.io/govpp.git/codec" ) -const ( - requestChannelBufSize = 100 // default size of the request channel buffer - replyChannelBufSize = 100 // default size of the reply channel buffer - notificationChannelBufSize = 100 // default size of the notification channel buffer - - defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout +var ( + RequestChanBufSize = 100 // default size of the request channel buffer + ReplyChanBufSize = 100 // default size of the reply channel buffer + NotificationChanBufSize = 100 // default size of the notification channel buffer ) var ( - healthCheckInterval = time.Second * 1 // default health check interval - healthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check - healthCheckThreshold = 1 // number of failed health checks until the error is reported + HealthCheckProbeInterval = time.Second * 1 // default health check probe interval + HealthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe + HealthCheckThreshold = 1 // number of failed health checks until the error is reported + DefaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP ) -// SetHealthCheckProbeInterval sets health check probe interval. -// Beware: Function is not thread-safe. It is recommended to setup this parameter -// before connecting to vpp. -func SetHealthCheckProbeInterval(interval time.Duration) { - healthCheckInterval = interval -} - -// SetHealthCheckReplyTimeout sets timeout for reply to a health check probe. -// If reply arrives after the timeout, check is considered as failed. -// Beware: Function is not thread-safe. It is recommended to setup this parameter -// before connecting to vpp. -func SetHealthCheckReplyTimeout(timeout time.Duration) { - healthCheckReplyTimeout = timeout -} - -// SetHealthCheckThreshold sets the number of failed healthProbe checks until the error is reported. -// Beware: Function is not thread-safe. It is recommended to setup this parameter -// before connecting to vpp. -func SetHealthCheckThreshold(threshold int) { - healthCheckThreshold = threshold -} - // ConnectionState represents the current state of the connection to VPP. type ConnectionState int @@ -104,10 +81,10 @@ type Connection struct { maxChannelID uint32 // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations) channelsLock sync.RWMutex // lock for the channels map - channels map[uint16]*channel // map of all API channels indexed by the channel ID + channels map[uint16]*Channel // map of all API channels indexed by the channel ID - notifSubscriptionsLock sync.RWMutex // lock for the subscriptions map - notifSubscriptions map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID + subscriptionsLock sync.RWMutex // lock for the subscriptions map + subscriptions map[uint16][]*subscriptionCtx // map od all notification subscriptions indexed by message ID pingReqID uint16 // ID if the ControlPing message pingReplyID uint16 // ID of the ControlPingReply message @@ -116,18 +93,30 @@ type Connection struct { lastReply time.Time // time of the last received reply from VPP } +func newConnection(vpp adapter.VppAdapter) *Connection { + c := &Connection{ + vpp: vpp, + codec: &codec.MsgCodec{}, + msgIDs: make(map[string]uint16), + msgMap: make(map[uint16]api.Message), + channels: make(map[uint16]*Channel), + subscriptions: make(map[uint16][]*subscriptionCtx), + } + vpp.SetMsgCallback(c.msgCallback) + return c +} + // Connect connects to VPP using specified VPP adapter and returns the connection handle. // This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed. func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) { // create new connection handle - c, err := newConnection(vppAdapter) + c, err := createConnection(vppAdapter) if err != nil { return nil, err } // blocking attempt to connect to VPP - err = c.connectVPP() - if err != nil { + if err := c.connectVPP(); err != nil { return nil, err } @@ -140,13 +129,13 @@ func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) { // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect. func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) { // create new connection handle - c, err := newConnection(vppAdapter) + c, err := createConnection(vppAdapter) if err != nil { return nil, nil, err } // asynchronously attempt to connect to VPP - connChan := make(chan ConnectionEvent, notificationChannelBufSize) + connChan := make(chan ConnectionEvent, NotificationChanBufSize) go c.connectLoop(connChan) return c, connChan, nil @@ -168,7 +157,7 @@ func (c *Connection) Disconnect() { } // newConnection returns new connection handle. -func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) { +func createConnection(vppAdapter adapter.VppAdapter) (*Connection, error) { connLock.Lock() defer connLock.Unlock() @@ -176,15 +165,7 @@ func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) { return nil, errors.New("only one connection per process is supported") } - conn = &Connection{ - vpp: vppAdapter, - codec: &codec.MsgCodec{}, - channels: make(map[uint16]*channel), - msgIDs: make(map[string]uint16), - msgMap: make(map[uint16]api.Message), - notifSubscriptions: make(map[uint16][]*api.NotifSubscription), - } - conn.vpp.SetMsgCallback(conn.msgCallback) + conn = newConnection(vppAdapter) return conn, nil } @@ -211,8 +192,72 @@ func (c *Connection) connectVPP() error { return nil } -func getMsgNameWithCrc(x api.Message) string { - return x.GetMessageName() + "_" + x.GetCrcString() +func (c *Connection) NewAPIChannel() (api.Channel, error) { + return c.newAPIChannel(RequestChanBufSize, ReplyChanBufSize) +} + +func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) { + return c.newAPIChannel(reqChanBufSize, replyChanBufSize) +} + +// NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core. +// It allows to specify custom buffer sizes for the request and reply Go channels. +func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*Channel, error) { + if c == nil { + return nil, errors.New("nil connection passed in") + } + + // create new channel + chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff) + channel := newChannel(chID, c, c.codec, c, reqChanBufSize, replyChanBufSize) + + // store API channel within the client + c.channelsLock.Lock() + c.channels[chID] = channel + c.channelsLock.Unlock() + + // start watching on the request channel + go c.watchRequests(channel) + + return channel, nil +} + +// releaseAPIChannel releases API channel that needs to be closed. +func (c *Connection) releaseAPIChannel(ch *Channel) { + log.WithFields(logger.Fields{ + "channel": ch.id, + }).Debug("API channel released") + + // delete the channel from channels map + c.channelsLock.Lock() + delete(c.channels, ch.id) + c.channelsLock.Unlock() +} + +// GetMessageID returns message identifier of given API message. +func (c *Connection) GetMessageID(msg api.Message) (uint16, error) { + if c == nil { + return 0, errors.New("nil connection passed in") + } + + if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok { + return msgID, nil + } + + return 0, fmt.Errorf("unknown message: %s (%s)", msg.GetMessageName(), msg.GetCrcString()) +} + +// LookupByID looks up message name and crc by ID. +func (c *Connection) LookupByID(msgID uint16) (api.Message, error) { + if c == nil { + return nil, errors.New("nil connection passed in") + } + + if msg, ok := c.msgMap[msgID]; ok { + return msg, nil + } + + return nil, fmt.Errorf("unknown message ID: %d", msgID) } // retrieveMessageIDs retrieves IDs for all registered messages and stores them in map @@ -268,32 +313,6 @@ func (c *Connection) retrieveMessageIDs() (err error) { return nil } -// GetMessageID returns message identifier of given API message. -func (c *Connection) GetMessageID(msg api.Message) (uint16, error) { - if c == nil { - return 0, errors.New("nil connection passed in") - } - - if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok { - return msgID, nil - } - - return 0, fmt.Errorf("unknown message: %s (%s)", msg.GetMessageName(), msg.GetCrcString()) -} - -// LookupByID looks up message name and crc by ID. -func (c *Connection) LookupByID(msgID uint16) (api.Message, error) { - if c == nil { - return nil, errors.New("nil connection passed in") - } - - if msg, ok := c.msgMap[msgID]; ok { - return msg, nil - } - - return nil, fmt.Errorf("unknown message ID: %d", msgID) -} - // disconnectVPP disconnects from VPP in case it is connected. func (c *Connection) disconnectVPP() { if atomic.CompareAndSwapUint32(&c.connected, 1, 0) { @@ -341,7 +360,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { // send health check probes until an error or timeout occurs for { // sleep until next health check probe period - time.Sleep(healthCheckInterval) + time.Sleep(HealthCheckProbeInterval) if atomic.LoadUint32(&c.connected) == 0 { // Disconnect has been called in the meantime, return the healthcheck - reconnect loop @@ -365,7 +384,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { case vppReply := <-ch.replyChan: err = vppReply.err - case <-time.After(healthCheckReplyTimeout): + case <-time.After(HealthCheckReplyTimeout): err = ErrProbeTimeout // check if time since last reply from any other @@ -374,7 +393,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { sinceLastReply = time.Since(c.lastReply) c.lastReplyLock.Unlock() - if sinceLastReply < healthCheckReplyTimeout { + if sinceLastReply < HealthCheckReplyTimeout { log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply) continue } @@ -384,10 +403,10 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { if err == ErrProbeTimeout { failedChecks++ - log.Warnf("VPP health check probe timed out after %v (%d. timeout)", healthCheckReplyTimeout, failedChecks) - if failedChecks > healthCheckThreshold { + log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks) + if failedChecks > HealthCheckThreshold { // in case of exceeded failed check treshold, assume VPP disconnected - log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", healthCheckThreshold) + log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", HealthCheckThreshold) connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected} break } @@ -411,52 +430,6 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { c.connectLoop(connChan) } -func (c *Connection) NewAPIChannel() (api.Channel, error) { - return c.newAPIChannel(requestChannelBufSize, replyChannelBufSize) -} - -func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) { - return c.newAPIChannel(reqChanBufSize, replyChanBufSize) -} - -// NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core. -// It allows to specify custom buffer sizes for the request and reply Go channels. -func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*channel, error) { - if c == nil { - return nil, errors.New("nil connection passed in") - } - - chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff) - ch := &channel{ - id: chID, - replyTimeout: defaultReplyTimeout, - msgDecoder: c.codec, - msgIdentifier: c, - reqChan: make(chan *vppRequest, reqChanBufSize), - replyChan: make(chan *vppReply, replyChanBufSize), - notifSubsChan: make(chan *subscriptionRequest, reqChanBufSize), - notifSubsReplyChan: make(chan error, replyChanBufSize), - } - - // store API channel within the client - c.channelsLock.Lock() - c.channels[chID] = ch - c.channelsLock.Unlock() - - // start watching on the request channel - go c.watchRequests(ch) - - return ch, nil -} - -// releaseAPIChannel releases API channel that needs to be closed. -func (c *Connection) releaseAPIChannel(ch *channel) { - log.WithFields(logger.Fields{ - "channel": ch.id, - }).Debug("API channel released") - - // delete the channel from channels map - c.channelsLock.Lock() - delete(c.channels, ch.id) - c.channelsLock.Unlock() +func getMsgNameWithCrc(x api.Message) string { + return x.GetMessageName() + "_" + x.GetCrcString() } diff --git a/core/connection_test.go b/core/connection_test.go index 5c8c309..cea7d2d 100644 --- a/core/connection_test.go +++ b/core/connection_test.go @@ -14,18 +14,16 @@ package core_test -/* import ( "testing" "git.fd.io/govpp.git/adapter/mock" "git.fd.io/govpp.git/api" + "git.fd.io/govpp.git/codec" "git.fd.io/govpp.git/core" - "git.fd.io/govpp.git/core/bin_api/vpe" "git.fd.io/govpp.git/examples/bin_api/interfaces" - "git.fd.io/govpp.git/examples/bin_api/stats" - - "git.fd.io/govpp.git/codec" + "git.fd.io/govpp.git/examples/binapi/stats" + "git.fd.io/govpp.git/examples/binapi/vpe" . "github.com/onsi/gomega" ) @@ -38,8 +36,9 @@ type testCtx struct { func setupTest(t *testing.T, bufferedChan bool) *testCtx { RegisterTestingT(t) - ctx := &testCtx{} - ctx.mockVpp = &mock.VppAdapter{} + ctx := &testCtx{ + mockVpp: mock.NewVppAdapter(), + } var err error ctx.conn, err = core.Connect(ctx.mockVpp) @@ -60,100 +59,6 @@ func (ctx *testCtx) teardownTest() { ctx.conn.Disconnect() } -func TestSimpleRequest(t *testing.T) { - ctx := setupTest(t, false) - defer ctx.teardownTest() - - ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: -5}) - - req := &vpe.ControlPing{} - reply := &vpe.ControlPingReply{} - - // send the request and receive a reply - ctx.ch.GetRequestChannel() <- &api.VppRequest{Message: req} - vppReply := <-ctx.ch.GetReplyChannel() - - Expect(vppReply).ShouldNot(BeNil()) - Expect(vppReply.Error).ShouldNot(HaveOccurred()) - - // decode the message - err := ctx.ch.GetMessageDecoder().DecodeMsg(vppReply.Data, reply) - Expect(err).ShouldNot(HaveOccurred()) - - Expect(reply.Retval).To(BeEquivalentTo(-5)) -} - -func TestMultiRequest(t *testing.T) { - ctx := setupTest(t, false) - defer ctx.teardownTest() - - msgs := []api.Message{} - for m := 0; m < 10; m++ { - msgs = append(msgs, &interfaces.SwInterfaceDetails{}) - } - ctx.mockVpp.MockReply(msgs...) - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) - - // send multipart request - ctx.ch.GetRequestChannel() <- &api.VppRequest{Message: &interfaces.SwInterfaceDump{}, Multipart: true} - - cnt := 0 - for { - // receive a reply - vppReply := <-ctx.ch.GetReplyChannel() - if vppReply.LastReplyReceived { - break // break out of the loop - } - Expect(vppReply.Error).ShouldNot(HaveOccurred()) - - // decode the message - reply := &interfaces.SwInterfaceDetails{} - err := ctx.ch.GetMessageDecoder().DecodeMsg(vppReply.Data, reply) - Expect(err).ShouldNot(HaveOccurred()) - cnt++ - } - - Expect(cnt).To(BeEquivalentTo(10)) -} - -func TestNotifications(t *testing.T) { - ctx := setupTest(t, false) - defer ctx.teardownTest() - - // subscribe for notification - notifChan := make(chan api.Message, 1) - subscription := &api.NotifSubscription{ - NotifChan: notifChan, - MsgFactory: interfaces.NewSwInterfaceSetFlags, - } - ctx.ch.GetNotificationChannel() <- &api.NotifSubscribeRequest{ - Subscription: subscription, - Subscribe: true, - } - err := <-ctx.ch.GetNotificationReplyChannel() - Expect(err).ShouldNot(HaveOccurred()) - - // mock the notification and force its delivery - ctx.mockVpp.MockReply(&interfaces.SwInterfaceSetFlags{ - SwIfIndex: 3, - AdminUpDown: 1, - }) - ctx.mockVpp.SendMsg(0, []byte{0}) - - // receive the notification - notif := (<-notifChan).(*interfaces.SwInterfaceSetFlags) - - Expect(notif.SwIfIndex).To(BeEquivalentTo(3)) - - // unsubscribe notification - ctx.ch.GetNotificationChannel() <- &api.NotifSubscribeRequest{ - Subscription: subscription, - Subscribe: false, - } - err = <-ctx.ch.GetNotificationReplyChannel() - Expect(err).ShouldNot(HaveOccurred()) -} - func TestNilConnection(t *testing.T) { RegisterTestingT(t) var conn *core.Connection @@ -184,47 +89,16 @@ func TestAsyncConnection(t *testing.T) { defer ctx.teardownTest() ctx.conn.Disconnect() - conn, ch, err := core.AsyncConnect(ctx.mockVpp) + conn, statusChan, err := core.AsyncConnect(ctx.mockVpp) ctx.conn = conn Expect(err).ShouldNot(HaveOccurred()) Expect(conn).ShouldNot(BeNil()) - ev := <-ch + ev := <-statusChan Expect(ev.State).Should(BeEquivalentTo(core.Connected)) } -func TestFullBuffer(t *testing.T) { - ctx := setupTest(t, false) - defer ctx.teardownTest() - - // close the default API channel - ctx.ch.Close() - - // create a new channel with limited buffer sizes - var err error - ctx.ch, err = ctx.conn.NewAPIChannelBuffered(10, 1) - Expect(err).ShouldNot(HaveOccurred()) - - // send multiple requests, only one reply should be read - for i := 0; i < 20; i++ { - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) - ctx.ch.GetRequestChannel() <- &api.VppRequest{Message: &vpe.ControlPing{}} - } - - vppReply := <-ctx.ch.GetReplyChannel() - Expect(vppReply).ShouldNot(BeNil()) - - var received bool - select { - case <-ctx.ch.GetReplyChannel(): - received = true // this should not happen - default: - received = false // no reply to be received - } - Expect(received).Should(BeFalse(), "A reply has been recieved, should had been ignored.") -} - func TestCodec(t *testing.T) { RegisterTestingT(t) @@ -289,7 +163,7 @@ func TestSimpleRequestsWithSequenceNumbers(t *testing.T) { var reqCtx []api.RequestCtx for i := 0; i < 10; i++ { - ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: int32(i)}) + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) req := &vpe.ControlPing{} reqCtx = append(reqCtx, ctx.ch.SendRequest(req)) } @@ -298,7 +172,6 @@ func TestSimpleRequestsWithSequenceNumbers(t *testing.T) { reply := &vpe.ControlPingReply{} err := reqCtx[i].ReceiveReply(reply) Expect(err).ShouldNot(HaveOccurred()) - Expect(reply.Retval).To(BeEquivalentTo(i)) } } @@ -306,7 +179,7 @@ func TestMultiRequestsWithSequenceNumbers(t *testing.T) { ctx := setupTest(t, false) defer ctx.teardownTest() - msgs := []api.Message{} + var msgs []api.Message for i := 0; i < 10; i++ { msgs = append(msgs, &interfaces.SwInterfaceDetails{SwIfIndex: uint32(i)}) } @@ -325,7 +198,7 @@ func TestMultiRequestsWithSequenceNumbers(t *testing.T) { lastReplyReceived, err := reqCtx.ReceiveReply(reply) if lastReplyReceived { - break // break out of the loop + break } Expect(err).ShouldNot(HaveOccurred()) @@ -343,7 +216,7 @@ func TestSimpleRequestWithTimeout(t *testing.T) { // reply for a previous timeouted requests to be ignored ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{ - Msg: &vpe.ControlPingReply{Retval: 1}, + Msg: &vpe.ControlPingReply{}, SeqNum: 0, }) @@ -359,12 +232,12 @@ func TestSimpleRequestWithTimeout(t *testing.T) { ctx.mockVpp.MockReplyWithContext( // reply for the previous request mock.MsgWithContext{ - Msg: &vpe.ControlPingReply{Retval: 1}, + Msg: &vpe.ControlPingReply{}, SeqNum: 1, }, // reply for the next request mock.MsgWithContext{ - Msg: &vpe.ControlPingReply{Retval: 2}, + Msg: &vpe.ControlPingReply{}, SeqNum: 2, }) @@ -376,7 +249,6 @@ func TestSimpleRequestWithTimeout(t *testing.T) { reply = &vpe.ControlPingReply{} err = reqCtx2.ReceiveReply(reply) Expect(err).To(BeNil()) - Expect(reply.Retval).To(BeEquivalentTo(2)) } func TestSimpleRequestsWithMissingReply(t *testing.T) { @@ -393,7 +265,7 @@ func TestSimpleRequestsWithMissingReply(t *testing.T) { // third request with reply ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{ - Msg: &vpe.ControlPingReply{Retval: 3}, + Msg: &vpe.ControlPingReply{}, SeqNum: 3, }) req3 := &vpe.ControlPing{} @@ -414,7 +286,6 @@ func TestSimpleRequestsWithMissingReply(t *testing.T) { reply = &vpe.ControlPingReply{} err = reqCtx3.ReceiveReply(reply) Expect(err).To(BeNil()) - Expect(reply.Retval).To(BeEquivalentTo(3)) } func TestMultiRequestsWithErrors(t *testing.T) { @@ -422,38 +293,25 @@ func TestMultiRequestsWithErrors(t *testing.T) { defer ctx.teardownTest() // replies for a previous timeouted requests to be ignored - msgs := []mock.MsgWithContext{} - msgs = append(msgs, - mock.MsgWithContext{ - Msg: &vpe.ControlPingReply{Retval: 1}, - SeqNum: 0xffff - 1, - }, - mock.MsgWithContext{ - Msg: &vpe.ControlPingReply{Retval: 1}, - SeqNum: 0xffff, - }, - mock.MsgWithContext{ - Msg: &vpe.ControlPingReply{Retval: 1}, - SeqNum: 0, - }) - + msgs := []mock.MsgWithContext{ + {Msg: &vpe.ControlPingReply{}, SeqNum: 0xffff - 1}, + {Msg: &vpe.ControlPingReply{}, SeqNum: 0xffff}, + {Msg: &vpe.ControlPingReply{}, SeqNum: 0}, + } for i := 0; i < 10; i++ { - msgs = append(msgs, - mock.MsgWithContext{ - Msg: &interfaces.SwInterfaceDetails{SwIfIndex: uint32(i)}, - SeqNum: 1, - Multipart: true, - }) + msgs = append(msgs, mock.MsgWithContext{ + Msg: &interfaces.SwInterfaceDetails{SwIfIndex: uint32(i)}, + SeqNum: 1, + Multipart: true, + }) } // missing finalizing control ping // reply for a next request - msgs = append(msgs, - mock.MsgWithContext{ - Msg: &vpe.ControlPingReply{Retval: 2}, - SeqNum: 2, - Multipart: false, - }) + msgs = append(msgs, mock.MsgWithContext{ + Msg: &vpe.ControlPingReply{}, + SeqNum: 2, + }) // queue replies ctx.mockVpp.MockReplyWithContext(msgs...) @@ -487,7 +345,6 @@ func TestMultiRequestsWithErrors(t *testing.T) { reply2 := &vpe.ControlPingReply{} err = reqCtx2.ReceiveReply(reply2) Expect(err).To(BeNil()) - Expect(reply2.Retval).To(BeEquivalentTo(2)) } func TestRequestsOrdering(t *testing.T) { @@ -498,12 +355,12 @@ func TestRequestsOrdering(t *testing.T) { // some replies will get thrown away // first request - ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: 1}) + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) req1 := &vpe.ControlPing{} reqCtx1 := ctx.ch.SendRequest(req1) // second request - ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: 2}) + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) req2 := &vpe.ControlPing{} reqCtx2 := ctx.ch.SendRequest(req2) @@ -512,7 +369,6 @@ func TestRequestsOrdering(t *testing.T) { reply2 := &vpe.ControlPingReply{} err := reqCtx2.ReceiveReply(reply2) Expect(err).To(BeNil()) - Expect(reply2.Retval).To(BeEquivalentTo(2)) // first request has already been considered closed reply1 := &vpe.ControlPingReply{} @@ -522,7 +378,7 @@ func TestRequestsOrdering(t *testing.T) { } func TestCycleOverSetOfSequenceNumbers(t *testing.T) { - ctx := setupTest(t, true) + ctx := setupTest(t, false) defer ctx.teardownTest() numIters := 0xffff + 100 @@ -530,7 +386,7 @@ func TestCycleOverSetOfSequenceNumbers(t *testing.T) { for i := 0; i < numIters+30; i++ { if i < numIters { - ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: int32(i)}) + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) req := &vpe.ControlPing{} reqCtx[i] = ctx.ch.SendRequest(req) } @@ -538,8 +394,6 @@ func TestCycleOverSetOfSequenceNumbers(t *testing.T) { reply := &vpe.ControlPingReply{} err := reqCtx[i-30].ReceiveReply(reply) Expect(err).ShouldNot(HaveOccurred()) - Expect(reply.Retval).To(BeEquivalentTo(i - 30)) } } } -*/ diff --git a/core/notification_handler.go b/core/notification_handler.go deleted file mode 100644 index 7b889e3..0000000 --- a/core/notification_handler.go +++ /dev/null @@ -1,170 +0,0 @@ -// Copyright (c) 2017 Cisco and/or its affiliates. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package core - -import ( - "fmt" - - "git.fd.io/govpp.git/api" - logger "github.com/sirupsen/logrus" -) - -// processSubscriptionRequest processes a notification subscribe request. -func (c *Connection) processSubscriptionRequest(ch *channel, req *subscriptionRequest) error { - var err error - - // subscribe / unsubscribe - if req.subscribe { - err = c.addNotifSubscription(req.sub) - } else { - err = c.removeNotifSubscription(req.sub) - } - - // send the reply into the go channel - select { - case ch.notifSubsReplyChan <- err: - // reply sent successfully - default: - // unable to write into the channel without blocking - log.WithFields(logger.Fields{ - "channel": ch.id, - }).Warn("Unable to deliver the subscribe reply, reciever end not ready.") - } - - return err -} - -// addNotifSubscription adds the notification subscription into the subscriptions map of the connection. -func (c *Connection) addNotifSubscription(subs *api.NotifSubscription) error { - // get message ID of the notification message - msgID, msgName, err := c.getSubscriptionMessageID(subs) - if err != nil { - return err - } - - log.WithFields(logger.Fields{ - "msg_name": msgName, - "msg_id": msgID, - }).Debug("Adding new notification subscription.") - - // add the subscription into map - c.notifSubscriptionsLock.Lock() - defer c.notifSubscriptionsLock.Unlock() - - c.notifSubscriptions[msgID] = append(c.notifSubscriptions[msgID], subs) - - return nil -} - -// removeNotifSubscription removes the notification subscription from the subscriptions map of the connection. -func (c *Connection) removeNotifSubscription(subs *api.NotifSubscription) error { - // get message ID of the notification message - msgID, msgName, err := c.getSubscriptionMessageID(subs) - if err != nil { - return err - } - - log.WithFields(logger.Fields{ - "msg_name": msgName, - "msg_id": msgID, - }).Debug("Removing notification subscription.") - - // remove the subscription from the map - c.notifSubscriptionsLock.Lock() - defer c.notifSubscriptionsLock.Unlock() - - for i, item := range c.notifSubscriptions[msgID] { - if item == subs { - // remove i-th item in the slice - c.notifSubscriptions[msgID] = append(c.notifSubscriptions[msgID][:i], c.notifSubscriptions[msgID][i+1:]...) - break - } - } - - return nil -} - -// isNotificationMessage returns true if someone has subscribed to provided message ID. -func (c *Connection) isNotificationMessage(msgID uint16) bool { - c.notifSubscriptionsLock.RLock() - defer c.notifSubscriptionsLock.RUnlock() - - _, exists := c.notifSubscriptions[msgID] - return exists -} - -// sendNotifications send a notification message to all subscribers subscribed for that message. -func (c *Connection) sendNotifications(msgID uint16, data []byte) { - c.notifSubscriptionsLock.RLock() - defer c.notifSubscriptionsLock.RUnlock() - - matched := false - - // send to notification to each subscriber - for _, subs := range c.notifSubscriptions[msgID] { - msg := subs.MsgFactory() - log.WithFields(logger.Fields{ - "msg_name": msg.GetMessageName(), - "msg_id": msgID, - "msg_size": len(data), - }).Debug("Sending a notification to the subscription channel.") - - if err := c.codec.DecodeMsg(data, msg); err != nil { - log.WithFields(logger.Fields{ - "msg_name": msg.GetMessageName(), - "msg_id": msgID, - "msg_size": len(data), - }).Errorf("Unable to decode the notification message: %v", err) - continue - } - - // send the message into the go channel of the subscription - select { - case subs.NotifChan <- msg: - // message sent successfully - default: - // unable to write into the channel without blocking - log.WithFields(logger.Fields{ - "msg_name": msg.GetMessageName(), - "msg_id": msgID, - "msg_size": len(data), - }).Warn("Unable to deliver the notification, reciever end not ready.") - } - - matched = true - } - - if !matched { - log.WithFields(logger.Fields{ - "msg_id": msgID, - "msg_size": len(data), - }).Info("No subscription found for the notification message.") - } -} - -// getSubscriptionMessageID returns ID of the message the subscription is tied to. -func (c *Connection) getSubscriptionMessageID(subs *api.NotifSubscription) (uint16, string, error) { - msg := subs.MsgFactory() - msgID, err := c.GetMessageID(msg) - if err != nil { - log.WithFields(logger.Fields{ - "msg_name": msg.GetMessageName(), - "msg_crc": msg.GetCrcString(), - }).Errorf("unable to retrieve message ID: %v", err) - return 0, "", fmt.Errorf("unable to retrieve message ID: %v", err) - } - - return msgID, msg.GetMessageName(), nil -} diff --git a/core/request_handler.go b/core/request_handler.go index fd6d100..14c095d 100644 --- a/core/request_handler.go +++ b/core/request_handler.go @@ -29,7 +29,7 @@ var ( ) // watchRequests watches for requests on the request API channel and forwards them as messages to VPP. -func (c *Connection) watchRequests(ch *channel) { +func (c *Connection) watchRequests(ch *Channel) { for { select { case req, ok := <-ch.reqChan: @@ -40,16 +40,12 @@ func (c *Connection) watchRequests(ch *channel) { return } c.processRequest(ch, req) - - case req := <-ch.notifSubsChan: - // new request on the notification subscribe channel - c.processSubscriptionRequest(ch, req) } } } // processRequest processes a single request received on the request channel. -func (c *Connection) processRequest(ch *channel, req *vppRequest) error { +func (c *Connection) processRequest(ch *Channel, req *vppRequest) error { // check whether we are connected to VPP if atomic.LoadUint32(&c.connected) == 0 { err := ErrNotConnected @@ -137,7 +133,7 @@ func (c *Connection) processRequest(ch *channel, req *vppRequest) error { } // msgCallback is called whenever any binary API message comes from VPP. -func (c *Connection) msgCallback(msgID uint16, context uint32, data []byte) { +func (c *Connection) msgCallback(msgID uint16, data []byte) { connLock.RLock() defer connLock.RUnlock() @@ -157,13 +153,8 @@ func (c *Connection) msgCallback(msgID uint16, context uint32, data []byte) { // - replies that don't have context as first field (comes as zero) // - events that don't have context at all (comes as non zero) // - msgContext, err := c.codec.DecodeMsgContext(data, msg) - if err == nil { - if context != msgContext { - log.Debugf("different context was decoded from message (%d -> %d)", context, msgContext) - context = msgContext - } - } else { + context, err := c.codec.DecodeMsgContext(data, msg) + if err != nil { log.Errorf("decoding context failed: %v", err) } @@ -218,7 +209,7 @@ func (c *Connection) msgCallback(msgID uint16, context uint32, data []byte) { // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise // it logs the error and do not send the message. -func sendReply(ch *channel, reply *vppReply) { +func sendReply(ch *Channel, reply *vppReply) { select { case ch.replyChan <- reply: // reply sent successfully @@ -232,10 +223,68 @@ func sendReply(ch *channel, reply *vppReply) { } } -func sendReplyError(ch *channel, req *vppRequest, err error) { +func sendReplyError(ch *Channel, req *vppRequest, err error) { sendReply(ch, &vppReply{seqNum: req.seqNum, err: err}) } +// isNotificationMessage returns true if someone has subscribed to provided message ID. +func (c *Connection) isNotificationMessage(msgID uint16) bool { + c.subscriptionsLock.RLock() + defer c.subscriptionsLock.RUnlock() + + _, exists := c.subscriptions[msgID] + return exists +} + +// sendNotifications send a notification message to all subscribers subscribed for that message. +func (c *Connection) sendNotifications(msgID uint16, data []byte) { + c.subscriptionsLock.RLock() + defer c.subscriptionsLock.RUnlock() + + matched := false + + // send to notification to each subscriber + for _, sub := range c.subscriptions[msgID] { + log.WithFields(logger.Fields{ + "msg_name": sub.event.GetMessageName(), + "msg_id": msgID, + "msg_size": len(data), + }).Debug("Sending a notification to the subscription channel.") + + event := sub.msgFactory() + if err := c.codec.DecodeMsg(data, event); err != nil { + log.WithFields(logger.Fields{ + "msg_name": sub.event.GetMessageName(), + "msg_id": msgID, + "msg_size": len(data), + }).Errorf("Unable to decode the notification message: %v", err) + continue + } + + // send the message into the go channel of the subscription + select { + case sub.notifChan <- event: + // message sent successfully + default: + // unable to write into the channel without blocking + log.WithFields(logger.Fields{ + "msg_name": sub.event.GetMessageName(), + "msg_id": msgID, + "msg_size": len(data), + }).Warn("Unable to deliver the notification, reciever end not ready.") + } + + matched = true + } + + if !matched { + log.WithFields(logger.Fields{ + "msg_id": msgID, + "msg_size": len(data), + }).Info("No subscription found for the notification message.") + } +} + // +------------------+-------------------+-----------------------+ // | 15b = channel ID | 1b = is multipart | 16b = sequence number | // +------------------+-------------------+-----------------------+ |