diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/core.go | 25 | ||||
-rw-r--r-- | core/core_test.go | 252 | ||||
-rw-r--r-- | core/request_handler.go | 87 |
3 files changed, 304 insertions, 60 deletions
diff --git a/core/core.go b/core/core.go index 4782ba1..052eb0b 100644 --- a/core/core.go +++ b/core/core.go @@ -77,12 +77,12 @@ type Connection struct { msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC channelsLock sync.RWMutex // lock for the channels map - channels map[uint32]*api.Channel // map of all API channels indexed by the channel ID + channels map[uint16]*api.Channel // map of all API channels indexed by the channel ID notifSubscriptionsLock sync.RWMutex // lock for the subscriptions map notifSubscriptions map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID - maxChannelID uint32 // maximum used client ID + maxChannelID uint32 // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations) pingReqID uint16 // ID if the ControlPing message pingReplyID uint16 // ID of the ControlPingReply message @@ -90,12 +90,6 @@ type Connection struct { lastReply time.Time // time of the last received reply from VPP } -// channelMetadata contains core-local metadata of an API channel. -type channelMetadata struct { - id uint32 // channel ID - multipart uint32 // 1 if multipart request is being processed, 0 otherwise -} - var ( log *logger.Logger // global logger conn *Connection // global handle to the Connection (used in the message receive callback) @@ -204,7 +198,7 @@ func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) { conn = &Connection{ vpp: vppAdapter, codec: &MsgCodec{}, - channels: make(map[uint32]*api.Channel), + channels: make(map[uint16]*api.Channel), msgIDs: make(map[string]uint16), notifSubscriptions: make(map[uint16][]*api.NotifSubscription), } @@ -370,10 +364,9 @@ func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) if c == nil { return nil, errors.New("nil connection passed in") } - chID := atomic.AddUint32(&c.maxChannelID, 1) - chMeta := &channelMetadata{id: chID} - ch := api.NewChannelInternal(chMeta) + chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff) + ch := api.NewChannelInternal(chID) ch.MsgDecoder = c.codec ch.MsgIdentifier = c @@ -389,19 +382,19 @@ func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) c.channelsLock.Unlock() // start watching on the request channel - go c.watchRequests(ch, chMeta) + go c.watchRequests(ch) return ch, nil } // releaseAPIChannel releases API channel that needs to be closed. -func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) { +func (c *Connection) releaseAPIChannel(ch *api.Channel) { log.WithFields(logger.Fields{ - "context": chMeta.id, + "ID": ch.ID, }).Debug("API channel closed.") // delete the channel from channels map c.channelsLock.Lock() - delete(c.channels, chMeta.id) + delete(c.channels, ch.ID) c.channelsLock.Unlock() } diff --git a/core/core_test.go b/core/core_test.go index e80f97c..981ff19 100644 --- a/core/core_test.go +++ b/core/core_test.go @@ -33,7 +33,7 @@ type testCtx struct { ch *api.Channel } -func setupTest(t *testing.T) *testCtx { +func setupTest(t *testing.T, bufferedChan bool) *testCtx { RegisterTestingT(t) ctx := &testCtx{} @@ -43,7 +43,11 @@ func setupTest(t *testing.T) *testCtx { ctx.conn, err = core.Connect(ctx.mockVpp) Expect(err).ShouldNot(HaveOccurred()) - ctx.ch, err = ctx.conn.NewAPIChannel() + if bufferedChan { + ctx.ch, err = ctx.conn.NewAPIChannelBuffered(100, 100) + } else { + ctx.ch, err = ctx.conn.NewAPIChannel() + } Expect(err).ShouldNot(HaveOccurred()) return ctx @@ -55,10 +59,10 @@ func (ctx *testCtx) teardownTest() { } func TestSimpleRequest(t *testing.T) { - ctx := setupTest(t) + ctx := setupTest(t, false) defer ctx.teardownTest() - ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: -5}) + ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: -5}, false) req := &vpe.ControlPing{} reply := &vpe.ControlPingReply{} @@ -78,13 +82,13 @@ func TestSimpleRequest(t *testing.T) { } func TestMultiRequest(t *testing.T) { - ctx := setupTest(t) + ctx := setupTest(t, false) defer ctx.teardownTest() for m := 0; m < 10; m++ { - ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{}) + ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{}, true) } - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, true) // send multipart request ctx.ch.ReqChan <- &api.VppRequest{Message: &interfaces.SwInterfaceDump{}, Multipart: true} @@ -109,7 +113,7 @@ func TestMultiRequest(t *testing.T) { } func TestNotifications(t *testing.T) { - ctx := setupTest(t) + ctx := setupTest(t, false) defer ctx.teardownTest() // subscribe for notification @@ -129,7 +133,7 @@ func TestNotifications(t *testing.T) { ctx.mockVpp.MockReply(&interfaces.SwInterfaceSetFlags{ SwIfIndex: 3, AdminUpDown: 1, - }) + }, false) ctx.mockVpp.SendMsg(0, []byte{0}) // receive the notification @@ -162,7 +166,7 @@ func TestNilConnection(t *testing.T) { } func TestDoubleConnection(t *testing.T) { - ctx := setupTest(t) + ctx := setupTest(t, false) defer ctx.teardownTest() conn, err := core.Connect(ctx.mockVpp) @@ -172,7 +176,7 @@ func TestDoubleConnection(t *testing.T) { } func TestAsyncConnection(t *testing.T) { - ctx := setupTest(t) + ctx := setupTest(t, false) defer ctx.teardownTest() ctx.conn.Disconnect() @@ -187,7 +191,7 @@ func TestAsyncConnection(t *testing.T) { } func TestFullBuffer(t *testing.T) { - ctx := setupTest(t) + ctx := setupTest(t, false) defer ctx.teardownTest() // close the default API channel @@ -200,7 +204,7 @@ func TestFullBuffer(t *testing.T) { // send multiple requests, only one reply should be read for i := 0; i < 20; i++ { - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, false) ctx.ch.ReqChan <- &api.VppRequest{Message: &vpe.ControlPing{}} } @@ -274,3 +278,225 @@ func TestCodecNegative(t *testing.T) { Expect(err).Should(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("EOF")) } + +func TestSimpleRequestsWithSequenceNumbers(t *testing.T) { + ctx := setupTest(t, false) + defer ctx.teardownTest() + + var reqCtx []*api.RequestCtx + for i := 0; i < 10; i++ { + ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: int32(i)}, false) + req := &vpe.ControlPing{} + reqCtx = append(reqCtx, ctx.ch.SendRequest(req)) + } + + for i := 0; i < 10; i++ { + reply := &vpe.ControlPingReply{} + err := reqCtx[i].ReceiveReply(reply) + Expect(err).ShouldNot(HaveOccurred()) + Expect(reply.Retval).To(BeEquivalentTo(i)) + } +} + +func TestMultiRequestsWithSequenceNumbers(t *testing.T) { + ctx := setupTest(t, false) + defer ctx.teardownTest() + + for i := 0; i < 10; i++ { + ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{SwIfIndex: uint32(i)}, true) + } + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, true) + + // send multipart request + reqCtx := ctx.ch.SendMultiRequest(&interfaces.SwInterfaceDump{}) + + cnt := 0 + for { + Expect(cnt < 11).To(BeTrue()) + + // receive a reply + reply := &interfaces.SwInterfaceDetails{} + lastReplyReceived, err := reqCtx.ReceiveReply(reply) + + if lastReplyReceived { + break // break out of the loop + } + + Expect(err).ShouldNot(HaveOccurred()) + Expect(reply.SwIfIndex).To(BeEquivalentTo(cnt)) + + cnt++ + } + + Expect(cnt).To(BeEquivalentTo(10)) +} + +func TestSimpleRequestWithTimeout(t *testing.T) { + ctx := setupTest(t, true) + defer ctx.teardownTest() + + // reply for a previous timeouted requests to be ignored + ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 1}, false,0) + + // send reply later + req1 := &vpe.ControlPing{} + reqCtx1 := ctx.ch.SendRequest(req1) + + reply := &vpe.ControlPingReply{} + err := reqCtx1.ReceiveReply(reply) + Expect(err).ToNot(BeNil()) + Expect(err.Error()).To(HavePrefix("no reply received within the timeout period")) + + // reply for the previous request + ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 1}, false,1) + + // next request + ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: 2}, false) + req2 := &vpe.ControlPing{} + reqCtx2 := ctx.ch.SendRequest(req2) + + // second request should ignore the first reply and return the second one + reply = &vpe.ControlPingReply{} + err = reqCtx2.ReceiveReply(reply) + Expect(err).To(BeNil()) + Expect(reply.Retval).To(BeEquivalentTo(2)) +} + +func TestSimpleRequestsWithMissingReply(t *testing.T) { + ctx := setupTest(t, false) + defer ctx.teardownTest() + + // request without reply + req1 := &vpe.ControlPing{} + reqCtx1 := ctx.ch.SendRequest(req1) + + // another request without reply + req2 := &vpe.ControlPing{} + reqCtx2 := ctx.ch.SendRequest(req2) + + // third request with reply + ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 3}, false, 3) + req3 := &vpe.ControlPing{} + reqCtx3 := ctx.ch.SendRequest(req3) + + // the first two should fail, but not consume reply for the 3rd + reply := &vpe.ControlPingReply{} + err := reqCtx1.ReceiveReply(reply) + Expect(err).ToNot(BeNil()) + Expect(err.Error()).To(Equal("missing binary API reply with sequence number: 1")) + + reply = &vpe.ControlPingReply{} + err = reqCtx2.ReceiveReply(reply) + Expect(err).ToNot(BeNil()) + Expect(err.Error()).To(Equal("missing binary API reply with sequence number: 2")) + + // the second request should succeed + reply = &vpe.ControlPingReply{} + err = reqCtx3.ReceiveReply(reply) + Expect(err).To(BeNil()) + Expect(reply.Retval).To(BeEquivalentTo(3)) +} + +func TestMultiRequestsWithErrors(t *testing.T) { + ctx := setupTest(t, false) + defer ctx.teardownTest() + + // reply for a previous timeouted requests to be ignored + ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 1}, false,0xffff - 1) + ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 1}, false,0xffff) + ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 1}, false,0) + + for i := 0; i < 10; i++ { + ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{SwIfIndex: uint32(i)}, true) + } + // missing finalizing control ping + + // reply for a next request + ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 2}, false,2) + + // send multipart request + reqCtx := ctx.ch.SendMultiRequest(&interfaces.SwInterfaceDump{}) + + for i := 0; i < 10; i++ { + // receive multi-part replies + reply := &interfaces.SwInterfaceDetails{} + lastReplyReceived, err := reqCtx.ReceiveReply(reply) + + Expect(lastReplyReceived).To(BeFalse()) + Expect(err).ShouldNot(HaveOccurred()) + Expect(reply.SwIfIndex).To(BeEquivalentTo(i)) + } + + // missing closing control ping + reply := &interfaces.SwInterfaceDetails{} + _, err := reqCtx.ReceiveReply(reply) + Expect(err).ToNot(BeNil()) + Expect(err.Error()).To(Equal("missing binary API reply with sequence number: 1")) + + // try again - still fails and nothing consumed + _, err = reqCtx.ReceiveReply(reply) + Expect(err).ToNot(BeNil()) + Expect(err.Error()).To(Equal("missing binary API reply with sequence number: 1")) + + // reply for the second request has not been consumed + reqCtx2 := ctx.ch.SendRequest(&vpe.ControlPing{}) + reply2 := &vpe.ControlPingReply{} + err = reqCtx2.ReceiveReply(reply2) + Expect(err).To(BeNil()) + Expect(reply2.Retval).To(BeEquivalentTo(2)) +} + +func TestRequestsOrdering(t *testing.T) { + ctx := setupTest(t, false) + defer ctx.teardownTest() + + // the orderings of SendRequest and ReceiveReply calls should match, otherwise + // some replies will get thrown away + + // first request + ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: 1}, false) + req1 := &vpe.ControlPing{} + reqCtx1 := ctx.ch.SendRequest(req1) + + // second request + ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: 2}, false) + req2 := &vpe.ControlPing{} + reqCtx2 := ctx.ch.SendRequest(req2) + + // if reply for the second request is read first, the reply for the first + // request gets thrown away. + reply2 := &vpe.ControlPingReply{} + err := reqCtx2.ReceiveReply(reply2) + Expect(err).To(BeNil()) + Expect(reply2.Retval).To(BeEquivalentTo(2)) + + // first request has already been considered closed + reply1 := &vpe.ControlPingReply{} + err = reqCtx1.ReceiveReply(reply1) + Expect(err).ToNot(BeNil()) + Expect(err.Error()).To(HavePrefix("no reply received within the timeout period")) +} + +func TestCycleOverSetOfSequenceNumbers(t *testing.T) { + ctx := setupTest(t, true) + defer ctx.teardownTest() + + numIters := 0xffff + 100 + reqCtx := make(map[int]*api.RequestCtx) + + var seqNum uint16 = 0 + for i := 0; i < numIters + 30 /* receiver is 30 reqs behind */; i++ { + seqNum++ + if i < numIters { + ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: int32(i)}, false, seqNum) + req := &vpe.ControlPing{} + reqCtx[i] = ctx.ch.SendRequest(req) + } + if i > 30 { + reply := &vpe.ControlPingReply{} + err := reqCtx[i-30].ReceiveReply(reply) + Expect(err).ShouldNot(HaveOccurred()) + Expect(reply.Retval).To(BeEquivalentTo(i-30)) + } + } +}
\ No newline at end of file diff --git a/core/request_handler.go b/core/request_handler.go index 8f793f5..3bec38d 100644 --- a/core/request_handler.go +++ b/core/request_handler.go @@ -31,17 +31,17 @@ var ( ) // watchRequests watches for requests on the request API channel and forwards them as messages to VPP. -func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) { +func (c *Connection) watchRequests(ch *api.Channel) { for { select { case req, ok := <-ch.ReqChan: // new request on the request channel if !ok { // after closing the request channel, release API channel and return - c.releaseAPIChannel(ch, chMeta) + c.releaseAPIChannel(ch) return } - c.processRequest(ch, chMeta, req) + c.processRequest(ch, req) case req := <-ch.NotifSubsChan: // new request on the notification subscribe channel @@ -51,12 +51,12 @@ func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) { } // processRequest processes a single request received on the request channel. -func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error { +func (c *Connection) processRequest(ch *api.Channel, req *api.VppRequest) error { // check whether we are connected to VPP if atomic.LoadUint32(&c.connected) == 0 { err := ErrNotConnected log.Error(err) - sendReply(ch, &api.VppReply{Error: err}) + sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err}) return err } @@ -67,8 +67,9 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re log.WithFields(logger.Fields{ "msg_name": req.Message.GetMessageName(), "msg_crc": req.Message.GetCrcString(), + "seq_num": req.SeqNum, }).Error(err) - sendReply(ch, &api.VppReply{Error: err}) + sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err}) return err } @@ -77,37 +78,35 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re if err != nil { err = fmt.Errorf("unable to encode the messge: %v", err) log.WithFields(logger.Fields{ - "context": chMeta.id, + "channel": ch.ID, "msg_id": msgID, + "seq_num": req.SeqNum, }).Error(err) - sendReply(ch, &api.VppReply{Error: err}) + sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err}) return err } if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled log.WithFields(logger.Fields{ - "context": chMeta.id, + "channel": ch.ID, "msg_id": msgID, "msg_size": len(data), "msg_name": req.Message.GetMessageName(), + "seq_num": req.SeqNum, }).Debug("Sending a message to VPP.") } - // send the message - if req.Multipart { - // expect multipart response - atomic.StoreUint32(&chMeta.multipart, 1) - } - // send the request to VPP - err = c.vpp.SendMsg(chMeta.id, data) + context := packRequestContext(ch.ID, req.Multipart, req.SeqNum) + err = c.vpp.SendMsg(context, data) if err != nil { - err = fmt.Errorf("unable to send the messge: %v", err) + err = fmt.Errorf("unable to send the message: %v", err) log.WithFields(logger.Fields{ - "context": chMeta.id, + "context": context, "msg_id": msgID, + "seq_num": req.SeqNum, }).Error(err) - sendReply(ch, &api.VppReply{Error: err}) + sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err}) return err } @@ -116,12 +115,13 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re pingData, _ := c.codec.EncodeMsg(msgControlPing, c.pingReqID) log.WithFields(logger.Fields{ - "context": chMeta.id, + "context": context, "msg_id": c.pingReqID, "msg_size": len(pingData), + "seq_num": req.SeqNum, }).Debug("Sending a control ping to VPP.") - c.vpp.SendMsg(chMeta.id, pingData) + c.vpp.SendMsg(context, pingData) } return nil @@ -137,11 +137,14 @@ func msgCallback(context uint32, msgID uint16, data []byte) { return } + chanID, isMultipart, seqNum := unpackRequestContext(context) if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled log.WithFields(logger.Fields{ - "context": context, - "msg_id": msgID, - "msg_size": len(data), + "msg_id": msgID, + "msg_size": len(data), + "channel_id": chanID, + "is_multipart": isMultipart, + "seq_num": seqNum, }).Debug("Received a message from VPP.") } @@ -153,27 +156,27 @@ func msgCallback(context uint32, msgID uint16, data []byte) { // match ch according to the context conn.channelsLock.RLock() - ch, ok := conn.channels[context] + ch, ok := conn.channels[chanID] conn.channelsLock.RUnlock() if !ok { log.WithFields(logger.Fields{ - "context": context, - "msg_id": msgID, - }).Error("Context ID not known, ignoring the message.") + "channel_id": chanID, + "msg_id": msgID, + }).Error("Channel ID not known, ignoring the message.") return } - chMeta := ch.Metadata().(*channelMetadata) lastReplyReceived := false - // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply - if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) { + // if this is a control ping reply to a multipart request, treat this as a last part of the reply + if msgID == conn.pingReplyID && isMultipart { lastReplyReceived = true } // send the data to the channel sendReply(ch, &api.VppReply{ MessageID: msgID, + SeqNum: seqNum, Data: data, LastReplyReceived: lastReplyReceived, }) @@ -195,6 +198,7 @@ func sendReply(ch *api.Channel, reply *api.VppReply) { log.WithFields(logger.Fields{ "channel": ch, "msg_id": reply.MessageID, + "seq_num": reply.SeqNum, }).Warn("Unable to send the reply, reciever end not ready.") } } @@ -254,3 +258,24 @@ func (c *Connection) LookupByID(ID uint16) (string, error) { return "", fmt.Errorf("unknown message ID: %d", ID) } + +// +------------------+-------------------+-----------------------+ +// | 15b = channel ID | 1b = is multipart | 16b = sequence number | +// +------------------+-------------------+-----------------------+ +func packRequestContext(chanID uint16, isMultipart bool, seqNum uint16) uint32 { + context := uint32(chanID) << 17 + if isMultipart { + context |= 1 << 16 + } + context |= uint32(seqNum) + return context +} + +func unpackRequestContext(context uint32) (chanID uint16, isMulipart bool, seqNum uint16) { + chanID = uint16(context >> 17) + if ((context >> 16) & 0x1) != 0 { + isMulipart = true + } + seqNum = uint16(context & 0xffff) + return +} |