diff options
Diffstat (limited to 'api/api.go')
-rw-r--r-- | api/api.go | 171 |
1 files changed, 113 insertions, 58 deletions
@@ -87,6 +87,8 @@ type MessageIdentifier interface { // methods provided inside of this package. Do not use the same channel from multiple goroutines concurrently, // otherwise the responses could mix! Use multiple channels instead. type Channel struct { + ID uint16 // channel ID + ReqChan chan *VppRequest // channel for sending the requests to VPP, closing this channel releases all resources in the ChannelProvider ReplyChan chan *VppReply // channel where VPP replies are delivered to @@ -96,13 +98,15 @@ type Channel struct { MsgDecoder MessageDecoder // used to decode binary data to generated API messages MsgIdentifier MessageIdentifier // used to retrieve message ID of a message + lastSeqNum uint16 // sequence number of the last sent request + + delayedReply *VppReply // reply already taken from ReplyChan, buffered for later delivery replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout - metadata interface{} // opaque metadata of the API channel - lastTimedOut bool // wether last request timed out } // VppRequest is a request that will be sent to VPP. type VppRequest struct { + SeqNum uint16 // sequence number Message Message // binary API message to be send to VPP Multipart bool // true if multipart response is expected, false otherwise } @@ -110,6 +114,7 @@ type VppRequest struct { // VppReply is a reply received from VPP. type VppReply struct { MessageID uint16 // ID of the message + SeqNum uint16 // sequence number Data []byte // encoded data with the message - MessageDecoder can be used for decoding LastReplyReceived bool // in case of multipart replies, true if the last reply has been already received and this one should be ignored Error error // in case of error, data is nil and this member contains error description @@ -130,30 +135,27 @@ type NotifSubscription struct { // RequestCtx is a context of a ongoing request (simple one - only one response is expected). type RequestCtx struct { ch *Channel + seqNum uint16 } // MultiRequestCtx is a context of a ongoing multipart request (multiple responses are expected). type MultiRequestCtx struct { ch *Channel + seqNum uint16 } const defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout -// NewChannelInternal returns a new channel structure with metadata field filled in with the provided argument. +// NewChannelInternal returns a new channel structure. // Note that this is just a raw channel not yet connected to VPP, it is not intended to be used directly. // Use ChannelProvider to get an API channel ready for communication with VPP. -func NewChannelInternal(metadata interface{}) *Channel { +func NewChannelInternal(id uint16) *Channel { return &Channel{ + ID: id, replyTimeout: defaultReplyTimeout, - metadata: metadata, } } -// Metadata returns the metadata stored within the channel structure by the NewChannelInternal call. -func (ch *Channel) Metadata() interface{} { - return ch.metadata -} - // SetReplyTimeout sets the timeout for replies from VPP. It represents the maximum time the API waits for a reply // from VPP before returning an error. func (ch *Channel) SetReplyTimeout(timeout time.Duration) { @@ -170,10 +172,12 @@ func (ch *Channel) Close() { // SendRequest asynchronously sends a request to VPP. Returns a request context, that can be used to call ReceiveReply. // In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply). func (ch *Channel) SendRequest(msg Message) *RequestCtx { + ch.lastSeqNum++ ch.ReqChan <- &VppRequest{ Message: msg, + SeqNum: ch.lastSeqNum, } - return &RequestCtx{ch: ch} + return &RequestCtx{ch: ch, seqNum: ch.lastSeqNum} } // ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs). @@ -183,7 +187,7 @@ func (req *RequestCtx) ReceiveReply(msg Message) error { return errors.New("invalid request context") } - lastReplyReceived, err := req.ch.receiveReplyInternal(msg) + lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum) if lastReplyReceived { err = errors.New("multipart reply recieved while a simple reply expected") @@ -195,79 +199,56 @@ func (req *RequestCtx) ReceiveReply(msg Message) error { // Returns a multipart request context, that can be used to call ReceiveReply. // In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply). func (ch *Channel) SendMultiRequest(msg Message) *MultiRequestCtx { + ch.lastSeqNum++ ch.ReqChan <- &VppRequest{ Message: msg, Multipart: true, + SeqNum: ch.lastSeqNum, } - return &MultiRequestCtx{ch: ch} + return &MultiRequestCtx{ch: ch, seqNum: ch.lastSeqNum} } // ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs). -// The reply will be decoded into the msg argument. If the last reply has been already consumed, LastReplyReceived is -// set to true. Do not use the message itself if LastReplyReceived is true - it won't be filled with actual data. +// The reply will be decoded into the msg argument. If the last reply has been already consumed, lastReplyReceived is +// set to true. Do not use the message itself if lastReplyReceived is true - it won't be filled with actual data. // Error will be returned if the response cannot be received or decoded. -func (req *MultiRequestCtx) ReceiveReply(msg Message) (LastReplyReceived bool, err error) { +func (req *MultiRequestCtx) ReceiveReply(msg Message) (lastReplyReceived bool, err error) { if req == nil || req.ch == nil { return false, errors.New("invalid request context") } - return req.ch.receiveReplyInternal(msg) + return req.ch.receiveReplyInternal(msg, req.seqNum) } // receiveReplyInternal receives a reply from the reply channel into the provided msg structure. -func (ch *Channel) receiveReplyInternal(msg Message) (LastReplyReceived bool, err error) { +func (ch *Channel) receiveReplyInternal(msg Message, expSeqNum uint16) (lastReplyReceived bool, err error) { + var ignore bool if msg == nil { return false, errors.New("nil message passed in") } + if ch.delayedReply != nil { + // try the delayed reply + vppReply := ch.delayedReply + ch.delayedReply = nil + ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg) + if !ignore { + return lastReplyReceived, err + } + } + timer := time.NewTimer(ch.replyTimeout) for { select { // blocks until a reply comes to ReplyChan or until timeout expires case vppReply := <-ch.ReplyChan: - if vppReply.Error != nil { - err = vppReply.Error - return - } - if vppReply.LastReplyReceived { - LastReplyReceived = true - return + ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg) + if ignore { + continue } - - // message checks - expMsgID, err := ch.MsgIdentifier.GetMessageID(msg) - if err != nil { - err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to", - msg.GetMessageName(), msg.GetCrcString()) - return false, err - } - - if vppReply.MessageID != expMsgID { - var msgNameCrc string - if nameCrc, err := ch.MsgIdentifier.LookupByID(vppReply.MessageID); err != nil { - msgNameCrc = err.Error() - } else { - msgNameCrc = nameCrc - } - - if ch.lastTimedOut { - logrus.Warnf("received invalid message ID, expected %d (%s), but got %d (%s) (probably timed out reply from previous request)", - expMsgID, msg.GetMessageName(), vppReply.MessageID, msgNameCrc) - continue - } - - err = fmt.Errorf("received invalid message ID, expected %d (%s), but got %d (%s) (check if multiple goroutines are not sharing single GoVPP channel)", - expMsgID, msg.GetMessageName(), vppReply.MessageID, msgNameCrc) - return false, err - } - - ch.lastTimedOut = false - // decode the message - err = ch.MsgDecoder.DecodeMsg(vppReply.Data, msg) - return false, err + return lastReplyReceived, err case <-timer.C: - ch.lastTimedOut = true err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout) return false, err } @@ -275,6 +256,80 @@ func (ch *Channel) receiveReplyInternal(msg Message) (LastReplyReceived bool, er return } +func (ch *Channel) processReply(reply *VppReply, expSeqNum uint16, msg Message) (ignore bool, lastReplyReceived bool, err error) { + // check the sequence number + cmpSeqNums := compareSeqNumbers(reply.SeqNum, expSeqNum) + if cmpSeqNums == -1 { + // reply received too late, ignore the message + logrus.WithField("sequence-number", reply.SeqNum).Warn( + "Received reply to an already closed binary API request") + ignore = true + return + } + if cmpSeqNums == 1 { + ch.delayedReply = reply + err = fmt.Errorf("missing binary API reply with sequence number: %d", expSeqNum) + return + } + + if reply.Error != nil { + err = reply.Error + return + } + if reply.LastReplyReceived { + lastReplyReceived = true + return + } + + // message checks + var expMsgID uint16 + expMsgID, err = ch.MsgIdentifier.GetMessageID(msg) + if err != nil { + err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to", + msg.GetMessageName(), msg.GetCrcString()) + return + } + + if reply.MessageID != expMsgID { + var msgNameCrc string + if nameCrc, err := ch.MsgIdentifier.LookupByID(reply.MessageID); err != nil { + msgNameCrc = err.Error() + } else { + msgNameCrc = nameCrc + } + + err = fmt.Errorf("received invalid message ID (seq-num=%d), expected %d (%s), but got %d (%s) " + + "(check if multiple goroutines are not sharing single GoVPP channel)", + reply.SeqNum, expMsgID, msg.GetMessageName(), reply.MessageID, msgNameCrc) + return + } + + // decode the message + err = ch.MsgDecoder.DecodeMsg(reply.Data, msg) + return +} + +// compareSeqNumbers returns -1, 0, 1 if sequence number <seqNum1> precedes, equals to, +// or succeeds seq. number <seqNum2>. +// Since sequence numbers cycle in the finite set of size 2^16, the function +// must assume that the distance between compared sequence numbers is less than +// (2^16)/2 to determine the order. +func compareSeqNumbers(seqNum1, seqNum2 uint16) int { + // calculate distance from seqNum1 to seqNum2 + var dist uint16 + if seqNum1 <= seqNum2 { + dist = seqNum2 - seqNum1 + } else { + dist = 0xffff - (seqNum1 - seqNum2 - 1) + } + if dist == 0 { + return 0 + } else if dist <= 0x8000 { + return -1 + } + return 1 +} + // SubscribeNotification subscribes for receiving of the specified notification messages via provided Go channel. // Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's // buffer is full, the notifications will not be delivered into it. |