author    | Milan Lenco <milan.lenco@pantheon.tech> | 2018-06-25 20:31:11 +0200
committer | Milan Lenco <milan.lenco@pantheon.tech> | 2018-06-26 15:07:55 +0200
commit    | 8adb6cdcb496f05169263d32a857791faf8baee1 (patch)
tree      | 12bb4fbce0af84326c1a6d80b76a71ad097fdf7d /core
parent    | e44d8c3905e22f940a100e6331a45412cba9d47e (diff)
Pair requests with replies using sequence numbers
Requests are given sequence numbers (cycling over a finite set of 2^16
integers) that are stored in the lower 16 bits of the context. One bit
is allocated for the isMultipart boolean flag and the remaining 15 bits
store the channel ID. The sequence numbers make it possible to reliably
pair replies with requests, even in scenarios with timed-out requests
or ignored (unread) replies.

Sequencing is not used for asynchronous messaging, as that is implemented
by methods of the Channel structure, i.e. on top of the ReqChan and
ReplyChan channels.
Change-Id: I7ca0e8489c7ffcc388c3cfef6d05c02f9500931c
Signed-off-by: Milan Lenco <milan.lenco@pantheon.tech>
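
The context layout described above is implemented by the pack/unpack helpers this change adds to core/request_handler.go. The snippet below reproduces those two functions from the diff; the main() round-trip check is only illustrative and is not part of the commit:

```go
package main

import "fmt"

// packRequestContext and unpackRequestContext are copied from
// core/request_handler.go as introduced by this change.
//
// +------------------+-------------------+-----------------------+
// | 15b = channel ID | 1b = is multipart | 16b = sequence number |
// +------------------+-------------------+-----------------------+
func packRequestContext(chanID uint16, isMultipart bool, seqNum uint16) uint32 {
	context := uint32(chanID) << 17
	if isMultipart {
		context |= 1 << 16
	}
	context |= uint32(seqNum)
	return context
}

func unpackRequestContext(context uint32) (chanID uint16, isMultipart bool, seqNum uint16) {
	chanID = uint16(context >> 17)
	if ((context >> 16) & 0x1) != 0 {
		isMultipart = true
	}
	seqNum = uint16(context & 0xffff)
	return
}

func main() {
	// Round-trip: channel 42, multipart request, sequence number 7.
	ctxVal := packRequestContext(42, true, 7)
	chanID, isMultipart, seqNum := unpackRequestContext(ctxVal)
	fmt.Printf("0x%08x -> chanID=%d isMultipart=%v seqNum=%d\n",
		ctxVal, chanID, isMultipart, seqNum)
	// Output: 0x00550007 -> chanID=42 isMultipart=true seqNum=7
}
```

Because the whole context is a single uint32 echoed back by VPP in each reply, msgCallback can recover the channel, the multipart flag, and the sequence number without any per-channel mutable state (the old atomic multipart flag in channelMetadata is removed by this change).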
Diffstat (limited to 'core')
-rw-r--r-- | core/core.go            |  25
-rw-r--r-- | core/core_test.go       | 252
-rw-r--r-- | core/request_handler.go |  87

3 files changed, 304 insertions(+), 60 deletions(-)
diff --git a/core/core.go b/core/core.go
index 4782ba1..052eb0b 100644
--- a/core/core.go
+++ b/core/core.go
@@ -77,12 +77,12 @@ type Connection struct {
 	msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC
 
 	channelsLock sync.RWMutex            // lock for the channels map
-	channels     map[uint32]*api.Channel // map of all API channels indexed by the channel ID
+	channels     map[uint16]*api.Channel // map of all API channels indexed by the channel ID
 
 	notifSubscriptionsLock sync.RWMutex                        // lock for the subscriptions map
 	notifSubscriptions     map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
 
-	maxChannelID uint32 // maximum used client ID
+	maxChannelID uint32 // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations)
 	pingReqID    uint16 // ID if the ControlPing message
 	pingReplyID  uint16 // ID of the ControlPingReply message
 
@@ -90,12 +90,6 @@ type Connection struct {
 	lastReply time.Time // time of the last received reply from VPP
 }
 
-// channelMetadata contains core-local metadata of an API channel.
-type channelMetadata struct {
-	id        uint32 // channel ID
-	multipart uint32 // 1 if multipart request is being processed, 0 otherwise
-}
-
 var (
 	log  *logger.Logger // global logger
 	conn *Connection    // global handle to the Connection (used in the message receive callback)
@@ -204,7 +198,7 @@ func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
 	conn = &Connection{
 		vpp:                vppAdapter,
 		codec:              &MsgCodec{},
-		channels:           make(map[uint32]*api.Channel),
+		channels:           make(map[uint16]*api.Channel),
 		msgIDs:             make(map[string]uint16),
 		notifSubscriptions: make(map[uint16][]*api.NotifSubscription),
 	}
@@ -370,10 +364,9 @@ func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int)
 	if c == nil {
 		return nil, errors.New("nil connection passed in")
 	}
-	chID := atomic.AddUint32(&c.maxChannelID, 1)
-	chMeta := &channelMetadata{id: chID}
 
-	ch := api.NewChannelInternal(chMeta)
+	chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
+	ch := api.NewChannelInternal(chID)
 	ch.MsgDecoder = c.codec
 	ch.MsgIdentifier = c
@@ -389,19 +382,19 @@ func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int)
 	c.channelsLock.Unlock()
 
 	// start watching on the request channel
-	go c.watchRequests(ch, chMeta)
+	go c.watchRequests(ch)
 
 	return ch, nil
 }
 
 // releaseAPIChannel releases API channel that needs to be closed.
-func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) {
+func (c *Connection) releaseAPIChannel(ch *api.Channel) {
 	log.WithFields(logger.Fields{
-		"context": chMeta.id,
+		"ID": ch.ID,
 	}).Debug("API channel closed.")
 
 	// delete the channel from channels map
 	c.channelsLock.Lock()
-	delete(c.channels, chMeta.id)
+	delete(c.channels, ch.ID)
 	c.channelsLock.Unlock()
 }
diff --git a/core/core_test.go b/core/core_test.go
index e80f97c..981ff19 100644
--- a/core/core_test.go
+++ b/core/core_test.go
@@ -33,7 +33,7 @@ type testCtx struct {
 	ch      *api.Channel
 }
 
-func setupTest(t *testing.T) *testCtx {
+func setupTest(t *testing.T, bufferedChan bool) *testCtx {
 	RegisterTestingT(t)
 
 	ctx := &testCtx{}
@@ -43,7 +43,11 @@ func setupTest(t *testing.T) *testCtx {
 	ctx.conn, err = core.Connect(ctx.mockVpp)
 	Expect(err).ShouldNot(HaveOccurred())
 
-	ctx.ch, err = ctx.conn.NewAPIChannel()
+	if bufferedChan {
+		ctx.ch, err = ctx.conn.NewAPIChannelBuffered(100, 100)
+	} else {
+		ctx.ch, err = ctx.conn.NewAPIChannel()
+	}
 	Expect(err).ShouldNot(HaveOccurred())
 
 	return ctx
@@ -55,10 +59,10 @@ func (ctx *testCtx) teardownTest() {
 }
 
 func TestSimpleRequest(t *testing.T) {
-	ctx := setupTest(t)
+	ctx := setupTest(t, false)
 	defer ctx.teardownTest()
 
-	ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: -5})
+	ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: -5}, false)
 
 	req := &vpe.ControlPing{}
 	reply := &vpe.ControlPingReply{}
@@ -78,13 +82,13 @@ func TestSimpleRequest(t *testing.T) {
 }
 
 func TestMultiRequest(t *testing.T) {
-	ctx := setupTest(t)
+	ctx := setupTest(t, false)
 	defer ctx.teardownTest()
 
 	for m := 0; m < 10; m++ {
-		ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{})
+		ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{}, true)
 	}
-	ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+	ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, true)
 
 	// send multipart request
 	ctx.ch.ReqChan <- &api.VppRequest{Message: &interfaces.SwInterfaceDump{}, Multipart: true}
@@ -109,7 +113,7 @@ func TestMultiRequest(t *testing.T) {
 }
 
 func TestNotifications(t *testing.T) {
-	ctx := setupTest(t)
+	ctx := setupTest(t, false)
 	defer ctx.teardownTest()
 
 	// subscribe for notification
@@ -129,7 +133,7 @@ func TestNotifications(t *testing.T) {
 	ctx.mockVpp.MockReply(&interfaces.SwInterfaceSetFlags{
 		SwIfIndex:   3,
 		AdminUpDown: 1,
-	})
+	}, false)
 	ctx.mockVpp.SendMsg(0, []byte{0})
 
 	// receive the notification
@@ -162,7 +166,7 @@ func TestNilConnection(t *testing.T) {
 }
 
 func TestDoubleConnection(t *testing.T) {
-	ctx := setupTest(t)
+	ctx := setupTest(t, false)
 	defer ctx.teardownTest()
 
 	conn, err := core.Connect(ctx.mockVpp)
@@ -172,7 +176,7 @@ func TestDoubleConnection(t *testing.T) {
 }
 
 func TestAsyncConnection(t *testing.T) {
-	ctx := setupTest(t)
+	ctx := setupTest(t, false)
 	defer ctx.teardownTest()
 
 	ctx.conn.Disconnect()
@@ -187,7 +191,7 @@ func TestAsyncConnection(t *testing.T) {
 }
 
 func TestFullBuffer(t *testing.T) {
-	ctx := setupTest(t)
+	ctx := setupTest(t, false)
 	defer ctx.teardownTest()
 
 	// close the default API channel
@@ -200,7 +204,7 @@ func TestFullBuffer(t *testing.T) {
 
 	// send multiple requests, only one reply should be read
 	for i := 0; i < 20; i++ {
-		ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
+		ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, false)
 		ctx.ch.ReqChan <- &api.VppRequest{Message: &vpe.ControlPing{}}
 	}
@@ -274,3 +278,225 @@ func TestCodecNegative(t *testing.T) {
 	Expect(err).Should(HaveOccurred())
 	Expect(err.Error()).To(ContainSubstring("EOF"))
 }
+
+func TestSimpleRequestsWithSequenceNumbers(t *testing.T) {
+	ctx := setupTest(t, false)
+	defer ctx.teardownTest()
+
+	var reqCtx []*api.RequestCtx
+	for i := 0; i < 10; i++ {
+		ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: int32(i)}, false)
+		req := &vpe.ControlPing{}
+		reqCtx = append(reqCtx, ctx.ch.SendRequest(req))
+	}
+
+	for i := 0; i < 10; i++ {
+		reply := &vpe.ControlPingReply{}
+		err := reqCtx[i].ReceiveReply(reply)
+		Expect(err).ShouldNot(HaveOccurred())
+		Expect(reply.Retval).To(BeEquivalentTo(i))
+	}
+}
+
+func TestMultiRequestsWithSequenceNumbers(t *testing.T) {
+	ctx := setupTest(t, false)
+	defer ctx.teardownTest()
+
+	for i := 0; i < 10; i++ {
+		ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{SwIfIndex: uint32(i)}, true)
+	}
+	ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, true)
+
+	// send multipart request
+	reqCtx := ctx.ch.SendMultiRequest(&interfaces.SwInterfaceDump{})
+
+	cnt := 0
+	for {
+		Expect(cnt < 11).To(BeTrue())
+
+		// receive a reply
+		reply := &interfaces.SwInterfaceDetails{}
+		lastReplyReceived, err := reqCtx.ReceiveReply(reply)
+
+		if lastReplyReceived {
+			break // break out of the loop
+		}
+
+		Expect(err).ShouldNot(HaveOccurred())
+		Expect(reply.SwIfIndex).To(BeEquivalentTo(cnt))
+
+		cnt++
+	}
+
+	Expect(cnt).To(BeEquivalentTo(10))
+}
+
+func TestSimpleRequestWithTimeout(t *testing.T) {
+	ctx := setupTest(t, true)
+	defer ctx.teardownTest()
+
+	// reply for a previous timeouted requests to be ignored
+	ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 1}, false, 0)
+
+	// send reply later
+	req1 := &vpe.ControlPing{}
+	reqCtx1 := ctx.ch.SendRequest(req1)
+
+	reply := &vpe.ControlPingReply{}
+	err := reqCtx1.ReceiveReply(reply)
+	Expect(err).ToNot(BeNil())
+	Expect(err.Error()).To(HavePrefix("no reply received within the timeout period"))
+
+	// reply for the previous request
+	ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 1}, false, 1)
+
+	// next request
+	ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: 2}, false)
+	req2 := &vpe.ControlPing{}
+	reqCtx2 := ctx.ch.SendRequest(req2)
+
+	// second request should ignore the first reply and return the second one
+	reply = &vpe.ControlPingReply{}
+	err = reqCtx2.ReceiveReply(reply)
+	Expect(err).To(BeNil())
+	Expect(reply.Retval).To(BeEquivalentTo(2))
+}
+
+func TestSimpleRequestsWithMissingReply(t *testing.T) {
+	ctx := setupTest(t, false)
+	defer ctx.teardownTest()
+
+	// request without reply
+	req1 := &vpe.ControlPing{}
+	reqCtx1 := ctx.ch.SendRequest(req1)
+
+	// another request without reply
+	req2 := &vpe.ControlPing{}
+	reqCtx2 := ctx.ch.SendRequest(req2)
+
+	// third request with reply
+	ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 3}, false, 3)
+	req3 := &vpe.ControlPing{}
+	reqCtx3 := ctx.ch.SendRequest(req3)
+
+	// the first two should fail, but not consume reply for the 3rd
+	reply := &vpe.ControlPingReply{}
+	err := reqCtx1.ReceiveReply(reply)
+	Expect(err).ToNot(BeNil())
+	Expect(err.Error()).To(Equal("missing binary API reply with sequence number: 1"))
+
+	reply = &vpe.ControlPingReply{}
+	err = reqCtx2.ReceiveReply(reply)
+	Expect(err).ToNot(BeNil())
+	Expect(err.Error()).To(Equal("missing binary API reply with sequence number: 2"))
+
+	// the second request should succeed
+	reply = &vpe.ControlPingReply{}
+	err = reqCtx3.ReceiveReply(reply)
+	Expect(err).To(BeNil())
+	Expect(reply.Retval).To(BeEquivalentTo(3))
+}
+
+func TestMultiRequestsWithErrors(t *testing.T) {
+	ctx := setupTest(t, false)
+	defer ctx.teardownTest()
+
+	// reply for a previous timeouted requests to be ignored
+	ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 1}, false, 0xffff-1)
+	ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 1}, false, 0xffff)
+	ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 1}, false, 0)
+
+	for i := 0; i < 10; i++ {
+		ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{SwIfIndex: uint32(i)}, true)
+	}
+	// missing finalizing control ping
+
+	// reply for a next request
+	ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 2}, false, 2)
+
+	// send multipart request
+	reqCtx := ctx.ch.SendMultiRequest(&interfaces.SwInterfaceDump{})
+
+	for i := 0; i < 10; i++ {
+		// receive multi-part replies
+		reply := &interfaces.SwInterfaceDetails{}
+		lastReplyReceived, err := reqCtx.ReceiveReply(reply)
+
+		Expect(lastReplyReceived).To(BeFalse())
+		Expect(err).ShouldNot(HaveOccurred())
+		Expect(reply.SwIfIndex).To(BeEquivalentTo(i))
+	}
+
+	// missing closing control ping
+	reply := &interfaces.SwInterfaceDetails{}
+	_, err := reqCtx.ReceiveReply(reply)
+	Expect(err).ToNot(BeNil())
+	Expect(err.Error()).To(Equal("missing binary API reply with sequence number: 1"))
+
+	// try again - still fails and nothing consumed
+	_, err = reqCtx.ReceiveReply(reply)
+	Expect(err).ToNot(BeNil())
+	Expect(err.Error()).To(Equal("missing binary API reply with sequence number: 1"))
+
+	// reply for the second request has not been consumed
+	reqCtx2 := ctx.ch.SendRequest(&vpe.ControlPing{})
+	reply2 := &vpe.ControlPingReply{}
+	err = reqCtx2.ReceiveReply(reply2)
+	Expect(err).To(BeNil())
+	Expect(reply2.Retval).To(BeEquivalentTo(2))
+}
+
+func TestRequestsOrdering(t *testing.T) {
+	ctx := setupTest(t, false)
+	defer ctx.teardownTest()
+
+	// the orderings of SendRequest and ReceiveReply calls should match, otherwise
+	// some replies will get thrown away
+
+	// first request
+	ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: 1}, false)
+	req1 := &vpe.ControlPing{}
+	reqCtx1 := ctx.ch.SendRequest(req1)
+
+	// second request
+	ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: 2}, false)
+	req2 := &vpe.ControlPing{}
+	reqCtx2 := ctx.ch.SendRequest(req2)
+
+	// if reply for the second request is read first, the reply for the first
+	// request gets thrown away.
+	reply2 := &vpe.ControlPingReply{}
+	err := reqCtx2.ReceiveReply(reply2)
+	Expect(err).To(BeNil())
+	Expect(reply2.Retval).To(BeEquivalentTo(2))
+
+	// first request has already been considered closed
+	reply1 := &vpe.ControlPingReply{}
+	err = reqCtx1.ReceiveReply(reply1)
+	Expect(err).ToNot(BeNil())
+	Expect(err.Error()).To(HavePrefix("no reply received within the timeout period"))
+}
+
+func TestCycleOverSetOfSequenceNumbers(t *testing.T) {
+	ctx := setupTest(t, true)
+	defer ctx.teardownTest()
+
+	numIters := 0xffff + 100
+	reqCtx := make(map[int]*api.RequestCtx)
+
+	var seqNum uint16 = 0
+	for i := 0; i < numIters+30 /* receiver is 30 reqs behind */; i++ {
+		seqNum++
+		if i < numIters {
+			ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: int32(i)}, false, seqNum)
+			req := &vpe.ControlPing{}
+			reqCtx[i] = ctx.ch.SendRequest(req)
+		}
+		if i > 30 {
+			reply := &vpe.ControlPingReply{}
+			err := reqCtx[i-30].ReceiveReply(reply)
+			Expect(err).ShouldNot(HaveOccurred())
+			Expect(reply.Retval).To(BeEquivalentTo(i - 30))
+		}
+	}
+}
\ No newline at end of file
diff --git a/core/request_handler.go b/core/request_handler.go
index 8f793f5..3bec38d 100644
--- a/core/request_handler.go
+++ b/core/request_handler.go
@@ -31,17 +31,17 @@ var (
 )
 
 // watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
-func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
+func (c *Connection) watchRequests(ch *api.Channel) {
 	for {
 		select {
 		case req, ok := <-ch.ReqChan:
 			// new request on the request channel
 			if !ok {
 				// after closing the request channel, release API channel and return
-				c.releaseAPIChannel(ch, chMeta)
+				c.releaseAPIChannel(ch)
 				return
 			}
-			c.processRequest(ch, chMeta, req)
+			c.processRequest(ch, req)
 
 		case req := <-ch.NotifSubsChan:
 			// new request on the notification subscribe channel
@@ -51,12 +51,12 @@ func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
 }
 
 // processRequest processes a single request received on the request channel.
-func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error {
+func (c *Connection) processRequest(ch *api.Channel, req *api.VppRequest) error {
 	// check whether we are connected to VPP
 	if atomic.LoadUint32(&c.connected) == 0 {
 		err := ErrNotConnected
 		log.Error(err)
-		sendReply(ch, &api.VppReply{Error: err})
+		sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
 		return err
 	}
 
@@ -67,8 +67,9 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re
 		log.WithFields(logger.Fields{
 			"msg_name": req.Message.GetMessageName(),
 			"msg_crc":  req.Message.GetCrcString(),
+			"seq_num":  req.SeqNum,
 		}).Error(err)
-		sendReply(ch, &api.VppReply{Error: err})
+		sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
 		return err
 	}
 
@@ -77,37 +78,35 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re
 	if err != nil {
 		err = fmt.Errorf("unable to encode the messge: %v", err)
 		log.WithFields(logger.Fields{
-			"context": chMeta.id,
+			"channel": ch.ID,
 			"msg_id":  msgID,
+			"seq_num": req.SeqNum,
 		}).Error(err)
-		sendReply(ch, &api.VppReply{Error: err})
+		sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
 		return err
 	}
 
 	if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
 		log.WithFields(logger.Fields{
-			"context":  chMeta.id,
+			"channel":  ch.ID,
 			"msg_id":   msgID,
 			"msg_size": len(data),
 			"msg_name": req.Message.GetMessageName(),
+			"seq_num":  req.SeqNum,
 		}).Debug("Sending a message to VPP.")
 	}
 
-	// send the message
-	if req.Multipart {
-		// expect multipart response
-		atomic.StoreUint32(&chMeta.multipart, 1)
-	}
-
 	// send the request to VPP
-	err = c.vpp.SendMsg(chMeta.id, data)
+	context := packRequestContext(ch.ID, req.Multipart, req.SeqNum)
+	err = c.vpp.SendMsg(context, data)
 	if err != nil {
-		err = fmt.Errorf("unable to send the messge: %v", err)
+		err = fmt.Errorf("unable to send the message: %v", err)
 		log.WithFields(logger.Fields{
-			"context": chMeta.id,
+			"context": context,
 			"msg_id":  msgID,
+			"seq_num": req.SeqNum,
 		}).Error(err)
-		sendReply(ch, &api.VppReply{Error: err})
+		sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
 		return err
 	}
 
@@ -116,12 +115,13 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re
 		pingData, _ := c.codec.EncodeMsg(msgControlPing, c.pingReqID)
 		log.WithFields(logger.Fields{
-			"context":  chMeta.id,
+			"context":  context,
 			"msg_id":   c.pingReqID,
 			"msg_size": len(pingData),
+			"seq_num":  req.SeqNum,
 		}).Debug("Sending a control ping to VPP.")
-		c.vpp.SendMsg(chMeta.id, pingData)
+		c.vpp.SendMsg(context, pingData)
 	}
 
 	return nil
@@ -137,11 +137,14 @@ func msgCallback(context uint32, msgID uint16, data []byte) {
 		return
 	}
 
+	chanID, isMultipart, seqNum := unpackRequestContext(context)
 	if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
 		log.WithFields(logger.Fields{
-			"context":  context,
-			"msg_id":   msgID,
-			"msg_size": len(data),
+			"msg_id":       msgID,
+			"msg_size":     len(data),
+			"channel_id":   chanID,
+			"is_multipart": isMultipart,
+			"seq_num":      seqNum,
 		}).Debug("Received a message from VPP.")
 	}
 
@@ -153,27 +156,27 @@ func msgCallback(context uint32, msgID uint16, data []byte) {
 
 	// match ch according to the context
 	conn.channelsLock.RLock()
-	ch, ok := conn.channels[context]
+	ch, ok := conn.channels[chanID]
 	conn.channelsLock.RUnlock()
 	if !ok {
 		log.WithFields(logger.Fields{
-			"context": context,
-			"msg_id":  msgID,
-		}).Error("Context ID not known, ignoring the message.")
+			"channel_id": chanID,
+			"msg_id":     msgID,
+		}).Error("Channel ID not known, ignoring the message.")
 		return
 	}
 
-	chMeta := ch.Metadata().(*channelMetadata)
 	lastReplyReceived := false
-	// if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply
-	if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) {
+	// if this is a control ping reply to a multipart request, treat this as a last part of the reply
+	if msgID == conn.pingReplyID && isMultipart {
 		lastReplyReceived = true
 	}
 
 	// send the data to the channel
 	sendReply(ch, &api.VppReply{
 		MessageID:         msgID,
+		SeqNum:            seqNum,
 		Data:              data,
 		LastReplyReceived: lastReplyReceived,
 	})
@@ -195,6 +198,7 @@ func sendReply(ch *api.Channel, reply *api.VppReply) {
 		log.WithFields(logger.Fields{
 			"channel": ch,
 			"msg_id":  reply.MessageID,
+			"seq_num": reply.SeqNum,
 		}).Warn("Unable to send the reply, reciever end not ready.")
 	}
 }
@@ -254,3 +258,24 @@ func (c *Connection) LookupByID(ID uint16) (string, error) {
 
 	return "", fmt.Errorf("unknown message ID: %d", ID)
 }
+
+// +------------------+-------------------+-----------------------+
+// | 15b = channel ID | 1b = is multipart | 16b = sequence number |
+// +------------------+-------------------+-----------------------+
+func packRequestContext(chanID uint16, isMultipart bool, seqNum uint16) uint32 {
+	context := uint32(chanID) << 17
+	if isMultipart {
+		context |= 1 << 16
+	}
+	context |= uint32(seqNum)
+	return context
+}
+
+func unpackRequestContext(context uint32) (chanID uint16, isMulipart bool, seqNum uint16) {
+	chanID = uint16(context >> 17)
+	if ((context >> 16) & 0x1) != 0 {
+		isMulipart = true
+	}
+	seqNum = uint16(context & 0xffff)
+	return
+}
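
Note that this diff only covers the core package: the consumer side that matches VppReply.SeqNum against pending requests lives in the api package's Channel methods, which are not shown here. Since the commit message says the sequence numbers cycle over a finite set of 2^16 integers, the matching logic has to order them cyclically rather than numerically. The helper below is a hypothetical sketch of such a comparison, written for illustration only; compareSeqNumbers is an assumed name, not code from this commit:

```go
package main

import "fmt"

// compareSeqNumbers is a hypothetical sketch (the real matching logic lives
// in the api package, outside this diff). It orders two uint16 sequence
// numbers on the 2^16 ring: a number less than half the ring "ahead" of the
// other is considered newer, which lets stale replies from timed-out
// requests be recognized and skipped even after the counter wraps.
func compareSeqNumbers(seqNum1, seqNum2 uint16) int {
	d := seqNum2 - seqNum1 // uint16 subtraction wraps modulo 2^16
	switch {
	case d == 0:
		return 0 // same request
	case d < 0x8000:
		return -1 // seqNum1 is older than seqNum2
	default:
		return 1 // seqNum1 is newer than seqNum2 (seqNum2 is stale)
	}
}

func main() {
	fmt.Println(compareSeqNumbers(0xffff, 1)) // -1: 0xffff precedes 1 across the wrap
	fmt.Println(compareSeqNumbers(5, 5))      // 0: the reply matches the request
	fmt.Println(compareSeqNumbers(3, 1))      // 1: reply 1 is a stale leftover
}
```

This cyclic ordering is what TestCycleOverSetOfSequenceNumbers above exercises: it pushes more than 0xffff requests through one channel so the 16-bit counter wraps while the receiver is still 30 requests behind, and pairing must keep working across the wrap.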