diff options
-rw-r--r-- | adapter/adapter.go | 2 | ||||
-rw-r--r-- | adapter/mock/mock_adapter.go | 73 | ||||
-rw-r--r-- | adapter/vppapiclient/vppapiclient_adapter.go | 4 | ||||
-rw-r--r-- | api/api.go | 171 | ||||
-rw-r--r-- | api/api_test.go | 64 | ||||
-rw-r--r-- | core/core.go | 25 | ||||
-rw-r--r-- | core/core_test.go | 252 | ||||
-rw-r--r-- | core/request_handler.go | 87 |
8 files changed, 508 insertions, 170 deletions
diff --git a/adapter/adapter.go b/adapter/adapter.go index a5b3352..bc3a573 100644 --- a/adapter/adapter.go +++ b/adapter/adapter.go @@ -26,7 +26,7 @@ type VppAdapter interface { GetMsgID(msgName string, msgCrc string) (uint16, error) // SendMsg sends a binary-encoded message to VPP. - SendMsg(clientID uint32, data []byte) error + SendMsg(context uint32, data []byte) error // SetMsgCallback sets a callback function that will be called by the adapter whenever a message comes from VPP. SetMsgCallback(func(context uint32, msgId uint16, data []byte)) diff --git a/adapter/mock/mock_adapter.go b/adapter/mock/mock_adapter.go index dab51a6..ae4caf0 100644 --- a/adapter/mock/mock_adapter.go +++ b/adapter/mock/mock_adapter.go @@ -48,10 +48,10 @@ type VppAdapter struct { binAPITypes map[string]reflect.Type access sync.RWMutex - replies []api.Message // FIFO queue of messages - replyHandlers []ReplyHandler // callbacks that are able to calculate mock responses - repliesLock sync.Mutex // mutex for the queue - mode replyMode // mode in which the mock operates + replies []reply // FIFO queue of messages + replyHandlers []ReplyHandler // callbacks that are able to calculate mock responses + repliesLock sync.Mutex // mutex for the queue + mode replyMode // mode in which the mock operates } // defaultReply is a default reply message that mock adapter returns for a request. @@ -67,6 +67,13 @@ type MessageDTO struct { Data []byte } +type reply struct { + msg api.Message + multipart bool + hasSeqNum bool + seqNum uint16 +} + // ReplyHandler is a type that allows to extend the behaviour of VPP mock. // Return value ok is used to signalize that mock reply is calculated and ready to be used. type ReplyHandler func(request MessageDTO) (reply []byte, msgID uint16, ok bool) @@ -247,28 +254,35 @@ func (a *VppAdapter) SendMsg(clientID uint32, data []byte) error { defer a.repliesLock.Unlock() // pop all replies from queue + withSeqNums := false for i, reply := range a.replies { - if i > 0 && reply.GetMessageName() == "control_ping_reply" { + if i > 0 && reply.msg.GetMessageName() == "control_ping_reply" && !withSeqNums { // hack - do not send control_ping_reply immediately, leave it for the the next callback a.replies = a.replies[i:] return nil } - msgID, _ := a.GetMsgID(reply.GetMessageName(), reply.GetCrcString()) + msgID, _ := a.GetMsgID(reply.msg.GetMessageName(), reply.msg.GetCrcString()) buf := new(bytes.Buffer) - if reply.GetMessageType() == api.ReplyMessage { - struc.Pack(buf, &core.VppReplyHeader{VlMsgID: msgID, Context: clientID}) - } else if reply.GetMessageType() == api.EventMessage { - struc.Pack(buf, &core.VppEventHeader{VlMsgID: msgID, Context: clientID}) - } else if reply.GetMessageType() == api.RequestMessage { - struc.Pack(buf, &core.VppRequestHeader{VlMsgID: msgID, Context: clientID}) + context := clientID + context = setMultipart(context, reply.multipart) + if reply.hasSeqNum { + withSeqNums = true + context = setSeqNum(context, reply.seqNum) + } + if reply.msg.GetMessageType() == api.ReplyMessage { + struc.Pack(buf, &core.VppReplyHeader{VlMsgID: msgID, Context: context}) + } else if reply.msg.GetMessageType() == api.EventMessage { + struc.Pack(buf, &core.VppEventHeader{VlMsgID: msgID, Context: context}) + } else if reply.msg.GetMessageType() == api.RequestMessage { + struc.Pack(buf, &core.VppRequestHeader{VlMsgID: msgID, Context: context}) } else { struc.Pack(buf, &core.VppOtherHeader{VlMsgID: msgID}) } - struc.Pack(buf, reply) - a.callback(clientID, msgID, buf.Bytes()) + struc.Pack(buf, reply.msg) + a.callback(context, msgID, buf.Bytes()) } if len(a.replies) > 0 { - a.replies = []api.Message{} + a.replies = []reply{} if len(a.replyHandlers) > 0 { // Switch back to handlers once the queue is empty to revert back // the fallthrough effect. @@ -302,11 +316,21 @@ func (a *VppAdapter) WaitReady() error { // MockReply stores a message to be returned when the next request comes. It is a FIFO queue - multiple replies // can be pushed into it, the first one will be popped when some request comes. // Using of this method automatically switches the mock into th useRepliesQueue mode. -func (a *VppAdapter) MockReply(msg api.Message) { +func (a *VppAdapter) MockReply(replyMsg api.Message, isMultipart bool) { a.repliesLock.Lock() defer a.repliesLock.Unlock() - a.replies = append(a.replies, msg) + a.replies = append(a.replies, reply{msg: replyMsg, multipart: isMultipart, hasSeqNum: false}) + a.mode = useRepliesQueue +} + +// MockReplyWithSeqNum queues next reply like MockReply() does, except that the +// sequence number can be customized and not necessarily match with the request. +func (a *VppAdapter) MockReplyWithSeqNum(replyMsg api.Message, isMultipart bool, sequenceNum uint16) { + a.repliesLock.Lock() + defer a.repliesLock.Unlock() + + a.replies = append(a.replies, reply{msg: replyMsg, multipart: isMultipart, hasSeqNum: true, seqNum: sequenceNum}) a.mode = useRepliesQueue } @@ -319,3 +343,18 @@ func (a *VppAdapter) MockReplyHandler(replyHandler ReplyHandler) { a.replyHandlers = append(a.replyHandlers, replyHandler) a.mode = useReplyHandlers } + +func setSeqNum(context uint32, seqNum uint16) (newContext uint32) { + context &= 0xffff0000 + context |= uint32(seqNum) + return context +} + +func setMultipart(context uint32, isMultipart bool) (newContext uint32) { + context &= 0xfffeffff + if isMultipart { + context |= 1 << 16 + } + return context +} + diff --git a/adapter/vppapiclient/vppapiclient_adapter.go b/adapter/vppapiclient/vppapiclient_adapter.go index 5a05a58..c77d7f1 100644 --- a/adapter/vppapiclient/vppapiclient_adapter.go +++ b/adapter/vppapiclient/vppapiclient_adapter.go @@ -145,8 +145,8 @@ func (a *vppAPIClientAdapter) GetMsgID(msgName string, msgCrc string) (uint16, e } // SendMsg sends a binary-encoded message to VPP. -func (a *vppAPIClientAdapter) SendMsg(clientID uint32, data []byte) error { - rc := C.govpp_send(C.uint32_t(clientID), unsafe.Pointer(&data[0]), C.size_t(len(data))) +func (a *vppAPIClientAdapter) SendMsg(context uint32, data []byte) error { + rc := C.govpp_send(C.uint32_t(context), unsafe.Pointer(&data[0]), C.size_t(len(data))) if rc != 0 { return fmt.Errorf("unable to send the message (error=%d)", rc) } @@ -87,6 +87,8 @@ type MessageIdentifier interface { // methods provided inside of this package. Do not use the same channel from multiple goroutines concurrently, // otherwise the responses could mix! Use multiple channels instead. type Channel struct { + ID uint16 // channel ID + ReqChan chan *VppRequest // channel for sending the requests to VPP, closing this channel releases all resources in the ChannelProvider ReplyChan chan *VppReply // channel where VPP replies are delivered to @@ -96,13 +98,15 @@ type Channel struct { MsgDecoder MessageDecoder // used to decode binary data to generated API messages MsgIdentifier MessageIdentifier // used to retrieve message ID of a message + lastSeqNum uint16 // sequence number of the last sent request + + delayedReply *VppReply // reply already taken from ReplyChan, buffered for later delivery replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout - metadata interface{} // opaque metadata of the API channel - lastTimedOut bool // wether last request timed out } // VppRequest is a request that will be sent to VPP. type VppRequest struct { + SeqNum uint16 // sequence number Message Message // binary API message to be send to VPP Multipart bool // true if multipart response is expected, false otherwise } @@ -110,6 +114,7 @@ type VppRequest struct { // VppReply is a reply received from VPP. type VppReply struct { MessageID uint16 // ID of the message + SeqNum uint16 // sequence number Data []byte // encoded data with the message - MessageDecoder can be used for decoding LastReplyReceived bool // in case of multipart replies, true if the last reply has been already received and this one should be ignored Error error // in case of error, data is nil and this member contains error description @@ -130,30 +135,27 @@ type NotifSubscription struct { // RequestCtx is a context of a ongoing request (simple one - only one response is expected). type RequestCtx struct { ch *Channel + seqNum uint16 } // MultiRequestCtx is a context of a ongoing multipart request (multiple responses are expected). type MultiRequestCtx struct { ch *Channel + seqNum uint16 } const defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout -// NewChannelInternal returns a new channel structure with metadata field filled in with the provided argument. +// NewChannelInternal returns a new channel structure. // Note that this is just a raw channel not yet connected to VPP, it is not intended to be used directly. // Use ChannelProvider to get an API channel ready for communication with VPP. -func NewChannelInternal(metadata interface{}) *Channel { +func NewChannelInternal(id uint16) *Channel { return &Channel{ + ID: id, replyTimeout: defaultReplyTimeout, - metadata: metadata, } } -// Metadata returns the metadata stored within the channel structure by the NewChannelInternal call. -func (ch *Channel) Metadata() interface{} { - return ch.metadata -} - // SetReplyTimeout sets the timeout for replies from VPP. It represents the maximum time the API waits for a reply // from VPP before returning an error. func (ch *Channel) SetReplyTimeout(timeout time.Duration) { @@ -170,10 +172,12 @@ func (ch *Channel) Close() { // SendRequest asynchronously sends a request to VPP. Returns a request context, that can be used to call ReceiveReply. // In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply). func (ch *Channel) SendRequest(msg Message) *RequestCtx { + ch.lastSeqNum++ ch.ReqChan <- &VppRequest{ Message: msg, + SeqNum: ch.lastSeqNum, } - return &RequestCtx{ch: ch} + return &RequestCtx{ch: ch, seqNum: ch.lastSeqNum} } // ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs). @@ -183,7 +187,7 @@ func (req *RequestCtx) ReceiveReply(msg Message) error { return errors.New("invalid request context") } - lastReplyReceived, err := req.ch.receiveReplyInternal(msg) + lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum) if lastReplyReceived { err = errors.New("multipart reply recieved while a simple reply expected") @@ -195,79 +199,56 @@ func (req *RequestCtx) ReceiveReply(msg Message) error { // Returns a multipart request context, that can be used to call ReceiveReply. // In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply). func (ch *Channel) SendMultiRequest(msg Message) *MultiRequestCtx { + ch.lastSeqNum++ ch.ReqChan <- &VppRequest{ Message: msg, Multipart: true, + SeqNum: ch.lastSeqNum, } - return &MultiRequestCtx{ch: ch} + return &MultiRequestCtx{ch: ch, seqNum: ch.lastSeqNum} } // ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs). -// The reply will be decoded into the msg argument. If the last reply has been already consumed, LastReplyReceived is -// set to true. Do not use the message itself if LastReplyReceived is true - it won't be filled with actual data. +// The reply will be decoded into the msg argument. If the last reply has been already consumed, lastReplyReceived is +// set to true. Do not use the message itself if lastReplyReceived is true - it won't be filled with actual data. // Error will be returned if the response cannot be received or decoded. -func (req *MultiRequestCtx) ReceiveReply(msg Message) (LastReplyReceived bool, err error) { +func (req *MultiRequestCtx) ReceiveReply(msg Message) (lastReplyReceived bool, err error) { if req == nil || req.ch == nil { return false, errors.New("invalid request context") } - return req.ch.receiveReplyInternal(msg) + return req.ch.receiveReplyInternal(msg, req.seqNum) } // receiveReplyInternal receives a reply from the reply channel into the provided msg structure. -func (ch *Channel) receiveReplyInternal(msg Message) (LastReplyReceived bool, err error) { +func (ch *Channel) receiveReplyInternal(msg Message, expSeqNum uint16) (lastReplyReceived bool, err error) { + var ignore bool if msg == nil { return false, errors.New("nil message passed in") } + if ch.delayedReply != nil { + // try the delayed reply + vppReply := ch.delayedReply + ch.delayedReply = nil + ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg) + if !ignore { + return lastReplyReceived, err + } + } + timer := time.NewTimer(ch.replyTimeout) for { select { // blocks until a reply comes to ReplyChan or until timeout expires case vppReply := <-ch.ReplyChan: - if vppReply.Error != nil { - err = vppReply.Error - return - } - if vppReply.LastReplyReceived { - LastReplyReceived = true - return + ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg) + if ignore { + continue } - - // message checks - expMsgID, err := ch.MsgIdentifier.GetMessageID(msg) - if err != nil { - err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to", - msg.GetMessageName(), msg.GetCrcString()) - return false, err - } - - if vppReply.MessageID != expMsgID { - var msgNameCrc string - if nameCrc, err := ch.MsgIdentifier.LookupByID(vppReply.MessageID); err != nil { - msgNameCrc = err.Error() - } else { - msgNameCrc = nameCrc - } - - if ch.lastTimedOut { - logrus.Warnf("received invalid message ID, expected %d (%s), but got %d (%s) (probably timed out reply from previous request)", - expMsgID, msg.GetMessageName(), vppReply.MessageID, msgNameCrc) - continue - } - - err = fmt.Errorf("received invalid message ID, expected %d (%s), but got %d (%s) (check if multiple goroutines are not sharing single GoVPP channel)", - expMsgID, msg.GetMessageName(), vppReply.MessageID, msgNameCrc) - return false, err - } - - ch.lastTimedOut = false - // decode the message - err = ch.MsgDecoder.DecodeMsg(vppReply.Data, msg) - return false, err + return lastReplyReceived, err case <-timer.C: - ch.lastTimedOut = true err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout) return false, err } @@ -275,6 +256,80 @@ func (ch *Channel) receiveReplyInternal(msg Message) (LastReplyReceived bool, er return } +func (ch *Channel) processReply(reply *VppReply, expSeqNum uint16, msg Message) (ignore bool, lastReplyReceived bool, err error) { + // check the sequence number + cmpSeqNums := compareSeqNumbers(reply.SeqNum, expSeqNum) + if cmpSeqNums == -1 { + // reply received too late, ignore the message + logrus.WithField("sequence-number", reply.SeqNum).Warn( + "Received reply to an already closed binary API request") + ignore = true + return + } + if cmpSeqNums == 1 { + ch.delayedReply = reply + err = fmt.Errorf("missing binary API reply with sequence number: %d", expSeqNum) + return + } + + if reply.Error != nil { + err = reply.Error + return + } + if reply.LastReplyReceived { + lastReplyReceived = true + return + } + + // message checks + var expMsgID uint16 + expMsgID, err = ch.MsgIdentifier.GetMessageID(msg) + if err != nil { + err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to", + msg.GetMessageName(), msg.GetCrcString()) + return + } + + if reply.MessageID != expMsgID { + var msgNameCrc string + if nameCrc, err := ch.MsgIdentifier.LookupByID(reply.MessageID); err != nil { + msgNameCrc = err.Error() + } else { + msgNameCrc = nameCrc + } + + err = fmt.Errorf("received invalid message ID (seq-num=%d), expected %d (%s), but got %d (%s) " + + "(check if multiple goroutines are not sharing single GoVPP channel)", + reply.SeqNum, expMsgID, msg.GetMessageName(), reply.MessageID, msgNameCrc) + return + } + + // decode the message + err = ch.MsgDecoder.DecodeMsg(reply.Data, msg) + return +} + +// compareSeqNumbers returns -1, 0, 1 if sequence number <seqNum1> precedes, equals to, +// or succeeds seq. number <seqNum2>. +// Since sequence numbers cycle in the finite set of size 2^16, the function +// must assume that the distance between compared sequence numbers is less than +// (2^16)/2 to determine the order. +func compareSeqNumbers(seqNum1, seqNum2 uint16) int { + // calculate distance from seqNum1 to seqNum2 + var dist uint16 + if seqNum1 <= seqNum2 { + dist = seqNum2 - seqNum1 + } else { + dist = 0xffff - (seqNum1 - seqNum2 - 1) + } + if dist == 0 { + return 0 + } else if dist <= 0x8000 { + return -1 + } + return 1 +} + // SubscribeNotification subscribes for receiving of the specified notification messages via provided Go channel. // Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's // buffer is full, the notifications will not be delivered into it. diff --git a/api/api_test.go b/api/api_test.go index a439986..a83b1cc 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -64,7 +64,7 @@ func TestRequestReplyTapConnect(t *testing.T) { ctx.mockVpp.MockReply(&tap.TapConnectReply{ Retval: 10, SwIfIndex: 1, - }) + }, false) request := &tap.TapConnect{ TapName: []byte("test-tap-name"), UseRandomMac: 1, @@ -84,7 +84,7 @@ func TestRequestReplyTapModify(t *testing.T) { ctx.mockVpp.MockReply(&tap.TapModifyReply{ Retval: 15, SwIfIndex: 2, - }) + }, false) request := &tap.TapModify{ TapName: []byte("test-tap-modify"), UseRandomMac: 1, @@ -104,7 +104,7 @@ func TestRequestReplyTapDelete(t *testing.T) { ctx.mockVpp.MockReply(&tap.TapDeleteReply{ Retval: 20, - }) + }, false) request := &tap.TapDelete{ SwIfIndex: 3, } @@ -123,7 +123,7 @@ func TestRequestReplySwInterfaceTapDump(t *testing.T) { ctx.mockVpp.MockReply(&tap.SwInterfaceTapDetails{ SwIfIndex: 25, DevName: byteName, - }) + }, false) request := &tap.SwInterfaceTapDump{} reply := &tap.SwInterfaceTapDetails{} @@ -140,7 +140,7 @@ func TestRequestReplyMemifCreate(t *testing.T) { ctx.mockVpp.MockReply(&memif.MemifCreateReply{ Retval: 22, SwIfIndex: 4, - }) + }, false) request := &memif.MemifCreate{ Role: 10, ID: 12, @@ -161,7 +161,7 @@ func TestRequestReplyMemifDelete(t *testing.T) { ctx.mockVpp.MockReply(&memif.MemifDeleteReply{ Retval: 24, - }) + }, false) request := &memif.MemifDelete{ SwIfIndex: 15, } @@ -180,7 +180,7 @@ func TestRequestReplyMemifDetails(t *testing.T) { SwIfIndex: 25, IfName: []byte("memif-name"), Role: 0, - }) + }, false) request := &memif.MemifDump{} reply := &memif.MemifDetails{} @@ -200,9 +200,9 @@ func TestMultiRequestReplySwInterfaceTapDump(t *testing.T) { ctx.mockVpp.MockReply(&tap.SwInterfaceTapDetails{ SwIfIndex: uint32(i), DevName: []byte("dev-name-test"), - }) + }, true) } - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, true) reqCtx := ctx.ch.SendMultiRequest(&tap.SwInterfaceTapDump{}) cnt := 0 @@ -226,9 +226,9 @@ func TestMultiRequestReplySwInterfaceMemifDump(t *testing.T) { for i := 1; i <= 10; i++ { ctx.mockVpp.MockReply(&memif.MemifDetails{ SwIfIndex: uint32(i), - }) + }, true) } - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, true) reqCtx := ctx.ch.SendMultiRequest(&memif.MemifDump{}) cnt := 0 @@ -257,7 +257,7 @@ func TestNotifications(t *testing.T) { ctx.mockVpp.MockReply(&interfaces.SwInterfaceSetFlags{ SwIfIndex: 3, AdminUpDown: 1, - }) + }, false) ctx.mockVpp.SendMsg(0, []byte("")) // receive the notification @@ -292,7 +292,7 @@ func TestNotificationEvent(t *testing.T) { ctx.mockVpp.MockReply(&interfaces.SwInterfaceEvent{ SwIfIndex: 2, LinkUpDown: 1, - }) + }, false) ctx.mockVpp.SendMsg(0, []byte("")) // receive the notification @@ -328,7 +328,7 @@ func TestSetReplyTimeout(t *testing.T) { ctx.ch.SetReplyTimeout(time.Millisecond) // first one request should work - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, false) err := ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{}) Expect(err).ShouldNot(HaveOccurred()) @@ -348,9 +348,9 @@ func TestSetReplyTimeoutMultiRequest(t *testing.T) { ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{ SwIfIndex: uint32(i), InterfaceName: []byte("if-name-test"), - }) + }, true) } - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, true) cnt := 0 sendMultiRequest := func() error { @@ -410,19 +410,19 @@ func TestMultiRequestDouble(t *testing.T) { // mock reply for i := 1; i <= 3; i++ { - ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{ + ctx.mockVpp.MockReplyWithSeqNum(&interfaces.SwInterfaceDetails{ SwIfIndex: uint32(i), InterfaceName: []byte("if-name-test"), - }) + }, true, 1) } - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{}, true, 1) for i := 1; i <= 3; i++ { - ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{ + ctx.mockVpp.MockReplyWithSeqNum(&interfaces.SwInterfaceDetails{ SwIfIndex: uint32(i), InterfaceName: []byte("if-name-test"), - }) + }, true, 2) } - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{}, true,2) cnt := 0 var sendMultiRequest = func() error { @@ -457,7 +457,7 @@ func TestReceiveReplyAfterTimeout(t *testing.T) { ctx.ch.SetReplyTimeout(time.Millisecond) // first one request should work - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{}, false,1) err := ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{}) Expect(err).ShouldNot(HaveOccurred()) @@ -466,10 +466,10 @@ func TestReceiveReplyAfterTimeout(t *testing.T) { Expect(err.Error()).To(ContainSubstring("timeout")) // simulating late reply - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{}, false,2) // normal reply for next request - ctx.mockVpp.MockReply(&tap.TapConnectReply{}) + ctx.mockVpp.MockReplyWithSeqNum(&tap.TapConnectReply{}, false,3) req := &tap.TapConnect{ TapName: []byte("test-tap-name"), @@ -497,7 +497,7 @@ func TestReceiveReplyAfterTimeoutMultiRequest(t *testing.T) { ctx.ch.SetReplyTimeout(time.Millisecond * 100) // first one request should work - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{}, false, 1) err := ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{}) Expect(err).ShouldNot(HaveOccurred()) @@ -525,15 +525,15 @@ func TestReceiveReplyAfterTimeoutMultiRequest(t *testing.T) { // simulating late replies for i := 1; i <= 3; i++ { - ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{ + ctx.mockVpp.MockReplyWithSeqNum(&interfaces.SwInterfaceDetails{ SwIfIndex: uint32(i), InterfaceName: []byte("if-name-test"), - }) + }, true, 2) } - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{}, true, 2) // normal reply for next request - ctx.mockVpp.MockReply(&tap.TapConnectReply{}) + ctx.mockVpp.MockReplyWithSeqNum(&tap.TapConnectReply{}, false, 3) req := &tap.TapConnect{ TapName: []byte("test-tap-name"), @@ -551,12 +551,12 @@ func TestInvalidMessageID(t *testing.T) { defer ctx.teardownTest() // first one request should work - ctx.mockVpp.MockReply(&vpe.ShowVersionReply{}) + ctx.mockVpp.MockReply(&vpe.ShowVersionReply{}, false) err := ctx.ch.SendRequest(&vpe.ShowVersion{}).ReceiveReply(&vpe.ShowVersionReply{}) Expect(err).ShouldNot(HaveOccurred()) // second should fail with error invalid message ID - ctx.mockVpp.MockReply(&vpe.ShowVersionReply{}) + ctx.mockVpp.MockReply(&vpe.ShowVersionReply{}, false) err = ctx.ch.SendRequest(&vpe.ControlPing{}).ReceiveReply(&vpe.ControlPingReply{}) Expect(err).Should(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("invalid message ID")) diff --git a/core/core.go b/core/core.go index 4782ba1..052eb0b 100644 --- a/core/core.go +++ b/core/core.go @@ -77,12 +77,12 @@ type Connection struct { msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC channelsLock sync.RWMutex // lock for the channels map - channels map[uint32]*api.Channel // map of all API channels indexed by the channel ID + channels map[uint16]*api.Channel // map of all API channels indexed by the channel ID notifSubscriptionsLock sync.RWMutex // lock for the subscriptions map notifSubscriptions map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID - maxChannelID uint32 // maximum used client ID + maxChannelID uint32 // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations) pingReqID uint16 // ID if the ControlPing message pingReplyID uint16 // ID of the ControlPingReply message @@ -90,12 +90,6 @@ type Connection struct { lastReply time.Time // time of the last received reply from VPP } -// channelMetadata contains core-local metadata of an API channel. -type channelMetadata struct { - id uint32 // channel ID - multipart uint32 // 1 if multipart request is being processed, 0 otherwise -} - var ( log *logger.Logger // global logger conn *Connection // global handle to the Connection (used in the message receive callback) @@ -204,7 +198,7 @@ func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) { conn = &Connection{ vpp: vppAdapter, codec: &MsgCodec{}, - channels: make(map[uint32]*api.Channel), + channels: make(map[uint16]*api.Channel), msgIDs: make(map[string]uint16), notifSubscriptions: make(map[uint16][]*api.NotifSubscription), } @@ -370,10 +364,9 @@ func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) if c == nil { return nil, errors.New("nil connection passed in") } - chID := atomic.AddUint32(&c.maxChannelID, 1) - chMeta := &channelMetadata{id: chID} - ch := api.NewChannelInternal(chMeta) + chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff) + ch := api.NewChannelInternal(chID) ch.MsgDecoder = c.codec ch.MsgIdentifier = c @@ -389,19 +382,19 @@ func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) c.channelsLock.Unlock() // start watching on the request channel - go c.watchRequests(ch, chMeta) + go c.watchRequests(ch) return ch, nil } // releaseAPIChannel releases API channel that needs to be closed. -func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) { +func (c *Connection) releaseAPIChannel(ch *api.Channel) { log.WithFields(logger.Fields{ - "context": chMeta.id, + "ID": ch.ID, }).Debug("API channel closed.") // delete the channel from channels map c.channelsLock.Lock() - delete(c.channels, chMeta.id) + delete(c.channels, ch.ID) c.channelsLock.Unlock() } diff --git a/core/core_test.go b/core/core_test.go index e80f97c..981ff19 100644 --- a/core/core_test.go +++ b/core/core_test.go @@ -33,7 +33,7 @@ type testCtx struct { ch *api.Channel } -func setupTest(t *testing.T) *testCtx { +func setupTest(t *testing.T, bufferedChan bool) *testCtx { RegisterTestingT(t) ctx := &testCtx{} @@ -43,7 +43,11 @@ func setupTest(t *testing.T) *testCtx { ctx.conn, err = core.Connect(ctx.mockVpp) Expect(err).ShouldNot(HaveOccurred()) - ctx.ch, err = ctx.conn.NewAPIChannel() + if bufferedChan { + ctx.ch, err = ctx.conn.NewAPIChannelBuffered(100, 100) + } else { + ctx.ch, err = ctx.conn.NewAPIChannel() + } Expect(err).ShouldNot(HaveOccurred()) return ctx @@ -55,10 +59,10 @@ func (ctx *testCtx) teardownTest() { } func TestSimpleRequest(t *testing.T) { - ctx := setupTest(t) + ctx := setupTest(t, false) defer ctx.teardownTest() - ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: -5}) + ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: -5}, false) req := &vpe.ControlPing{} reply := &vpe.ControlPingReply{} @@ -78,13 +82,13 @@ func TestSimpleRequest(t *testing.T) { } func TestMultiRequest(t *testing.T) { - ctx := setupTest(t) + ctx := setupTest(t, false) defer ctx.teardownTest() for m := 0; m < 10; m++ { - ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{}) + ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{}, true) } - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, true) // send multipart request ctx.ch.ReqChan <- &api.VppRequest{Message: &interfaces.SwInterfaceDump{}, Multipart: true} @@ -109,7 +113,7 @@ func TestMultiRequest(t *testing.T) { } func TestNotifications(t *testing.T) { - ctx := setupTest(t) + ctx := setupTest(t, false) defer ctx.teardownTest() // subscribe for notification @@ -129,7 +133,7 @@ func TestNotifications(t *testing.T) { ctx.mockVpp.MockReply(&interfaces.SwInterfaceSetFlags{ SwIfIndex: 3, AdminUpDown: 1, - }) + }, false) ctx.mockVpp.SendMsg(0, []byte{0}) // receive the notification @@ -162,7 +166,7 @@ func TestNilConnection(t *testing.T) { } func TestDoubleConnection(t *testing.T) { - ctx := setupTest(t) + ctx := setupTest(t, false) defer ctx.teardownTest() conn, err := core.Connect(ctx.mockVpp) @@ -172,7 +176,7 @@ func TestDoubleConnection(t *testing.T) { } func TestAsyncConnection(t *testing.T) { - ctx := setupTest(t) + ctx := setupTest(t, false) defer ctx.teardownTest() ctx.conn.Disconnect() @@ -187,7 +191,7 @@ func TestAsyncConnection(t *testing.T) { } func TestFullBuffer(t *testing.T) { - ctx := setupTest(t) + ctx := setupTest(t, false) defer ctx.teardownTest() // close the default API channel @@ -200,7 +204,7 @@ func TestFullBuffer(t *testing.T) { // send multiple requests, only one reply should be read for i := 0; i < 20; i++ { - ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, false) ctx.ch.ReqChan <- &api.VppRequest{Message: &vpe.ControlPing{}} } @@ -274,3 +278,225 @@ func TestCodecNegative(t *testing.T) { Expect(err).Should(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("EOF")) } + +func TestSimpleRequestsWithSequenceNumbers(t *testing.T) { + ctx := setupTest(t, false) + defer ctx.teardownTest() + + var reqCtx []*api.RequestCtx + for i := 0; i < 10; i++ { + ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: int32(i)}, false) + req := &vpe.ControlPing{} + reqCtx = append(reqCtx, ctx.ch.SendRequest(req)) + } + + for i := 0; i < 10; i++ { + reply := &vpe.ControlPingReply{} + err := reqCtx[i].ReceiveReply(reply) + Expect(err).ShouldNot(HaveOccurred()) + Expect(reply.Retval).To(BeEquivalentTo(i)) + } +} + +func TestMultiRequestsWithSequenceNumbers(t *testing.T) { + ctx := setupTest(t, false) + defer ctx.teardownTest() + + for i := 0; i < 10; i++ { + ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{SwIfIndex: uint32(i)}, true) + } + ctx.mockVpp.MockReply(&vpe.ControlPingReply{}, true) + + // send multipart request + reqCtx := ctx.ch.SendMultiRequest(&interfaces.SwInterfaceDump{}) + + cnt := 0 + for { + Expect(cnt < 11).To(BeTrue()) + + // receive a reply + reply := &interfaces.SwInterfaceDetails{} + lastReplyReceived, err := reqCtx.ReceiveReply(reply) + + if lastReplyReceived { + break // break out of the loop + } + + Expect(err).ShouldNot(HaveOccurred()) + Expect(reply.SwIfIndex).To(BeEquivalentTo(cnt)) + + cnt++ + } + + Expect(cnt).To(BeEquivalentTo(10)) +} + +func TestSimpleRequestWithTimeout(t *testing.T) { + ctx := setupTest(t, true) + defer ctx.teardownTest() + + // reply for a previous timeouted requests to be ignored + ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 1}, false,0) + + // send reply later + req1 := &vpe.ControlPing{} + reqCtx1 := ctx.ch.SendRequest(req1) + + reply := &vpe.ControlPingReply{} + err := reqCtx1.ReceiveReply(reply) + Expect(err).ToNot(BeNil()) + Expect(err.Error()).To(HavePrefix("no reply received within the timeout period")) + + // reply for the previous request + ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 1}, false,1) + + // next request + ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: 2}, false) + req2 := &vpe.ControlPing{} + reqCtx2 := ctx.ch.SendRequest(req2) + + // second request should ignore the first reply and return the second one + reply = &vpe.ControlPingReply{} + err = reqCtx2.ReceiveReply(reply) + Expect(err).To(BeNil()) + Expect(reply.Retval).To(BeEquivalentTo(2)) +} + +func TestSimpleRequestsWithMissingReply(t *testing.T) { + ctx := setupTest(t, false) + defer ctx.teardownTest() + + // request without reply + req1 := &vpe.ControlPing{} + reqCtx1 := ctx.ch.SendRequest(req1) + + // another request without reply + req2 := &vpe.ControlPing{} + reqCtx2 := ctx.ch.SendRequest(req2) + + // third request with reply + ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 3}, false, 3) + req3 := &vpe.ControlPing{} + reqCtx3 := ctx.ch.SendRequest(req3) + + // the first two should fail, but not consume reply for the 3rd + reply := &vpe.ControlPingReply{} + err := reqCtx1.ReceiveReply(reply) + Expect(err).ToNot(BeNil()) + Expect(err.Error()).To(Equal("missing binary API reply with sequence number: 1")) + + reply = &vpe.ControlPingReply{} + err = reqCtx2.ReceiveReply(reply) + Expect(err).ToNot(BeNil()) + Expect(err.Error()).To(Equal("missing binary API reply with sequence number: 2")) + + // the second request should succeed + reply = &vpe.ControlPingReply{} + err = reqCtx3.ReceiveReply(reply) + Expect(err).To(BeNil()) + Expect(reply.Retval).To(BeEquivalentTo(3)) +} + +func TestMultiRequestsWithErrors(t *testing.T) { + ctx := setupTest(t, false) + defer ctx.teardownTest() + + // reply for a previous timeouted requests to be ignored + ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 1}, false,0xffff - 1) + ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 1}, false,0xffff) + ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 1}, false,0) + + for i := 0; i < 10; i++ { + ctx.mockVpp.MockReply(&interfaces.SwInterfaceDetails{SwIfIndex: uint32(i)}, true) + } + // missing finalizing control ping + + // reply for a next request + ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: 2}, false,2) + + // send multipart request + reqCtx := ctx.ch.SendMultiRequest(&interfaces.SwInterfaceDump{}) + + for i := 0; i < 10; i++ { + // receive multi-part replies + reply := &interfaces.SwInterfaceDetails{} + lastReplyReceived, err := reqCtx.ReceiveReply(reply) + + Expect(lastReplyReceived).To(BeFalse()) + Expect(err).ShouldNot(HaveOccurred()) + Expect(reply.SwIfIndex).To(BeEquivalentTo(i)) + } + + // missing closing control ping + reply := &interfaces.SwInterfaceDetails{} + _, err := reqCtx.ReceiveReply(reply) + Expect(err).ToNot(BeNil()) + Expect(err.Error()).To(Equal("missing binary API reply with sequence number: 1")) + + // try again - still fails and nothing consumed + _, err = reqCtx.ReceiveReply(reply) + Expect(err).ToNot(BeNil()) + Expect(err.Error()).To(Equal("missing binary API reply with sequence number: 1")) + + // reply for the second request has not been consumed + reqCtx2 := ctx.ch.SendRequest(&vpe.ControlPing{}) + reply2 := &vpe.ControlPingReply{} + err = reqCtx2.ReceiveReply(reply2) + Expect(err).To(BeNil()) + Expect(reply2.Retval).To(BeEquivalentTo(2)) +} + +func TestRequestsOrdering(t *testing.T) { + ctx := setupTest(t, false) + defer ctx.teardownTest() + + // the orderings of SendRequest and ReceiveReply calls should match, otherwise + // some replies will get thrown away + + // first request + ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: 1}, false) + req1 := &vpe.ControlPing{} + reqCtx1 := ctx.ch.SendRequest(req1) + + // second request + ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: 2}, false) + req2 := &vpe.ControlPing{} + reqCtx2 := ctx.ch.SendRequest(req2) + + // if reply for the second request is read first, the reply for the first + // request gets thrown away. + reply2 := &vpe.ControlPingReply{} + err := reqCtx2.ReceiveReply(reply2) + Expect(err).To(BeNil()) + Expect(reply2.Retval).To(BeEquivalentTo(2)) + + // first request has already been considered closed + reply1 := &vpe.ControlPingReply{} + err = reqCtx1.ReceiveReply(reply1) + Expect(err).ToNot(BeNil()) + Expect(err.Error()).To(HavePrefix("no reply received within the timeout period")) +} + +func TestCycleOverSetOfSequenceNumbers(t *testing.T) { + ctx := setupTest(t, true) + defer ctx.teardownTest() + + numIters := 0xffff + 100 + reqCtx := make(map[int]*api.RequestCtx) + + var seqNum uint16 = 0 + for i := 0; i < numIters + 30 /* receiver is 30 reqs behind */; i++ { + seqNum++ + if i < numIters { + ctx.mockVpp.MockReplyWithSeqNum(&vpe.ControlPingReply{Retval: int32(i)}, false, seqNum) + req := &vpe.ControlPing{} + reqCtx[i] = ctx.ch.SendRequest(req) + } + if i > 30 { + reply := &vpe.ControlPingReply{} + err := reqCtx[i-30].ReceiveReply(reply) + Expect(err).ShouldNot(HaveOccurred()) + Expect(reply.Retval).To(BeEquivalentTo(i-30)) + } + } +}
\ No newline at end of file diff --git a/core/request_handler.go b/core/request_handler.go index 8f793f5..3bec38d 100644 --- a/core/request_handler.go +++ b/core/request_handler.go @@ -31,17 +31,17 @@ var ( ) // watchRequests watches for requests on the request API channel and forwards them as messages to VPP. -func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) { +func (c *Connection) watchRequests(ch *api.Channel) { for { select { case req, ok := <-ch.ReqChan: // new request on the request channel if !ok { // after closing the request channel, release API channel and return - c.releaseAPIChannel(ch, chMeta) + c.releaseAPIChannel(ch) return } - c.processRequest(ch, chMeta, req) + c.processRequest(ch, req) case req := <-ch.NotifSubsChan: // new request on the notification subscribe channel @@ -51,12 +51,12 @@ func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) { } // processRequest processes a single request received on the request channel. -func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error { +func (c *Connection) processRequest(ch *api.Channel, req *api.VppRequest) error { // check whether we are connected to VPP if atomic.LoadUint32(&c.connected) == 0 { err := ErrNotConnected log.Error(err) - sendReply(ch, &api.VppReply{Error: err}) + sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err}) return err } @@ -67,8 +67,9 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re log.WithFields(logger.Fields{ "msg_name": req.Message.GetMessageName(), "msg_crc": req.Message.GetCrcString(), + "seq_num": req.SeqNum, }).Error(err) - sendReply(ch, &api.VppReply{Error: err}) + sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err}) return err } @@ -77,37 +78,35 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re if err != nil { err = fmt.Errorf("unable to encode the messge: %v", err) log.WithFields(logger.Fields{ - "context": chMeta.id, + "channel": ch.ID, "msg_id": msgID, + "seq_num": req.SeqNum, }).Error(err) - sendReply(ch, &api.VppReply{Error: err}) + sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err}) return err } if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled log.WithFields(logger.Fields{ - "context": chMeta.id, + "channel": ch.ID, "msg_id": msgID, "msg_size": len(data), "msg_name": req.Message.GetMessageName(), + "seq_num": req.SeqNum, }).Debug("Sending a message to VPP.") } - // send the message - if req.Multipart { - // expect multipart response - atomic.StoreUint32(&chMeta.multipart, 1) - } - // send the request to VPP - err = c.vpp.SendMsg(chMeta.id, data) + context := packRequestContext(ch.ID, req.Multipart, req.SeqNum) + err = c.vpp.SendMsg(context, data) if err != nil { - err = fmt.Errorf("unable to send the messge: %v", err) + err = fmt.Errorf("unable to send the message: %v", err) log.WithFields(logger.Fields{ - "context": chMeta.id, + "context": context, "msg_id": msgID, + "seq_num": req.SeqNum, }).Error(err) - sendReply(ch, &api.VppReply{Error: err}) + sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err}) return err } @@ -116,12 +115,13 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re pingData, _ := c.codec.EncodeMsg(msgControlPing, c.pingReqID) log.WithFields(logger.Fields{ - "context": chMeta.id, + "context": context, "msg_id": c.pingReqID, "msg_size": len(pingData), + "seq_num": req.SeqNum, }).Debug("Sending a control ping to VPP.") - c.vpp.SendMsg(chMeta.id, pingData) + c.vpp.SendMsg(context, pingData) } return nil @@ -137,11 +137,14 @@ func msgCallback(context uint32, msgID uint16, data []byte) { return } + chanID, isMultipart, seqNum := unpackRequestContext(context) if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled log.WithFields(logger.Fields{ - "context": context, - "msg_id": msgID, - "msg_size": len(data), + "msg_id": msgID, + "msg_size": len(data), + "channel_id": chanID, + "is_multipart": isMultipart, + "seq_num": seqNum, }).Debug("Received a message from VPP.") } @@ -153,27 +156,27 @@ func msgCallback(context uint32, msgID uint16, data []byte) { // match ch according to the context conn.channelsLock.RLock() - ch, ok := conn.channels[context] + ch, ok := conn.channels[chanID] conn.channelsLock.RUnlock() if !ok { log.WithFields(logger.Fields{ - "context": context, - "msg_id": msgID, - }).Error("Context ID not known, ignoring the message.") + "channel_id": chanID, + "msg_id": msgID, + }).Error("Channel ID not known, ignoring the message.") return } - chMeta := ch.Metadata().(*channelMetadata) lastReplyReceived := false - // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply - if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) { + // if this is a control ping reply to a multipart request, treat this as a last part of the reply + if msgID == conn.pingReplyID && isMultipart { lastReplyReceived = true } // send the data to the channel sendReply(ch, &api.VppReply{ MessageID: msgID, + SeqNum: seqNum, Data: data, LastReplyReceived: lastReplyReceived, }) @@ -195,6 +198,7 @@ func sendReply(ch *api.Channel, reply *api.VppReply) { log.WithFields(logger.Fields{ "channel": ch, "msg_id": reply.MessageID, + "seq_num": reply.SeqNum, }).Warn("Unable to send the reply, reciever end not ready.") } } @@ -254,3 +258,24 @@ func (c *Connection) LookupByID(ID uint16) (string, error) { return "", fmt.Errorf("unknown message ID: %d", ID) } + +// +------------------+-------------------+-----------------------+ +// | 15b = channel ID | 1b = is multipart | 16b = sequence number | +// +------------------+-------------------+-----------------------+ +func packRequestContext(chanID uint16, isMultipart bool, seqNum uint16) uint32 { + context := uint32(chanID) << 17 + if isMultipart { + context |= 1 << 16 + } + context |= uint32(seqNum) + return context +} + +func unpackRequestContext(context uint32) (chanID uint16, isMulipart bool, seqNum uint16) { + chanID = uint16(context >> 17) + if ((context >> 16) & 0x1) != 0 { + isMulipart = true + } + seqNum = uint16(context & 0xffff) + return +} |