summaryrefslogtreecommitdiffstats
path: root/api/api.go
diff options
context:
space:
mode:
authorMilan Lenco <milan.lenco@pantheon.tech>2018-06-25 20:31:11 +0200
committerMilan Lenco <milan.lenco@pantheon.tech>2018-06-26 15:07:55 +0200
commit8adb6cdcb496f05169263d32a857791faf8baee1 (patch)
tree12bb4fbce0af84326c1a6d80b76a71ad097fdf7d /api/api.go
parente44d8c3905e22f940a100e6331a45412cba9d47e (diff)
Pair requests with replies using sequence numbers
Requests are given sequence numbers (cycling over a finite set of 2^16 integers) that are stored into the lower 16bits of the context. 1bit is also allocated for isMultipart boolean flag and the remaining 15bits are used to store the channel ID. The sequence numbers allow to reliably pair replies with requests, even in scenarious with timeouted requests or ignored (unread) replies. Sequencing is not used with asynchronous messaging as it is implemented by methods of the Channel structure, i.e. above ReqChan and ReplyChan channels. Change-Id: I7ca0e8489c7ffcc388c3cfef6d05c02f9500931c Signed-off-by: Milan Lenco <milan.lenco@pantheon.tech>
Diffstat (limited to 'api/api.go')
-rw-r--r--api/api.go171
1 files changed, 113 insertions, 58 deletions
diff --git a/api/api.go b/api/api.go
index eafdf22..3c2c7ec 100644
--- a/api/api.go
+++ b/api/api.go
@@ -87,6 +87,8 @@ type MessageIdentifier interface {
// methods provided inside of this package. Do not use the same channel from multiple goroutines concurrently,
// otherwise the responses could mix! Use multiple channels instead.
type Channel struct {
+ ID uint16 // channel ID
+
ReqChan chan *VppRequest // channel for sending the requests to VPP, closing this channel releases all resources in the ChannelProvider
ReplyChan chan *VppReply // channel where VPP replies are delivered to
@@ -96,13 +98,15 @@ type Channel struct {
MsgDecoder MessageDecoder // used to decode binary data to generated API messages
MsgIdentifier MessageIdentifier // used to retrieve message ID of a message
+ lastSeqNum uint16 // sequence number of the last sent request
+
+ delayedReply *VppReply // reply already taken from ReplyChan, buffered for later delivery
replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout
- metadata interface{} // opaque metadata of the API channel
- lastTimedOut bool // wether last request timed out
}
// VppRequest is a request that will be sent to VPP.
type VppRequest struct {
+ SeqNum uint16 // sequence number
Message Message // binary API message to be send to VPP
Multipart bool // true if multipart response is expected, false otherwise
}
@@ -110,6 +114,7 @@ type VppRequest struct {
// VppReply is a reply received from VPP.
type VppReply struct {
MessageID uint16 // ID of the message
+ SeqNum uint16 // sequence number
Data []byte // encoded data with the message - MessageDecoder can be used for decoding
LastReplyReceived bool // in case of multipart replies, true if the last reply has been already received and this one should be ignored
Error error // in case of error, data is nil and this member contains error description
@@ -130,30 +135,27 @@ type NotifSubscription struct {
// RequestCtx is a context of a ongoing request (simple one - only one response is expected).
type RequestCtx struct {
ch *Channel
+ seqNum uint16
}
// MultiRequestCtx is a context of a ongoing multipart request (multiple responses are expected).
type MultiRequestCtx struct {
ch *Channel
+ seqNum uint16
}
const defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout
-// NewChannelInternal returns a new channel structure with metadata field filled in with the provided argument.
+// NewChannelInternal returns a new channel structure.
// Note that this is just a raw channel not yet connected to VPP, it is not intended to be used directly.
// Use ChannelProvider to get an API channel ready for communication with VPP.
-func NewChannelInternal(metadata interface{}) *Channel {
+func NewChannelInternal(id uint16) *Channel {
return &Channel{
+ ID: id,
replyTimeout: defaultReplyTimeout,
- metadata: metadata,
}
}
-// Metadata returns the metadata stored within the channel structure by the NewChannelInternal call.
-func (ch *Channel) Metadata() interface{} {
- return ch.metadata
-}
-
// SetReplyTimeout sets the timeout for replies from VPP. It represents the maximum time the API waits for a reply
// from VPP before returning an error.
func (ch *Channel) SetReplyTimeout(timeout time.Duration) {
@@ -170,10 +172,12 @@ func (ch *Channel) Close() {
// SendRequest asynchronously sends a request to VPP. Returns a request context, that can be used to call ReceiveReply.
// In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply).
func (ch *Channel) SendRequest(msg Message) *RequestCtx {
+ ch.lastSeqNum++
ch.ReqChan <- &VppRequest{
Message: msg,
+ SeqNum: ch.lastSeqNum,
}
- return &RequestCtx{ch: ch}
+ return &RequestCtx{ch: ch, seqNum: ch.lastSeqNum}
}
// ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs).
@@ -183,7 +187,7 @@ func (req *RequestCtx) ReceiveReply(msg Message) error {
return errors.New("invalid request context")
}
- lastReplyReceived, err := req.ch.receiveReplyInternal(msg)
+ lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum)
if lastReplyReceived {
err = errors.New("multipart reply recieved while a simple reply expected")
@@ -195,79 +199,56 @@ func (req *RequestCtx) ReceiveReply(msg Message) error {
// Returns a multipart request context, that can be used to call ReceiveReply.
// In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply).
func (ch *Channel) SendMultiRequest(msg Message) *MultiRequestCtx {
+ ch.lastSeqNum++
ch.ReqChan <- &VppRequest{
Message: msg,
Multipart: true,
+ SeqNum: ch.lastSeqNum,
}
- return &MultiRequestCtx{ch: ch}
+ return &MultiRequestCtx{ch: ch, seqNum: ch.lastSeqNum}
}
// ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs).
-// The reply will be decoded into the msg argument. If the last reply has been already consumed, LastReplyReceived is
-// set to true. Do not use the message itself if LastReplyReceived is true - it won't be filled with actual data.
+// The reply will be decoded into the msg argument. If the last reply has been already consumed, lastReplyReceived is
+// set to true. Do not use the message itself if lastReplyReceived is true - it won't be filled with actual data.
// Error will be returned if the response cannot be received or decoded.
-func (req *MultiRequestCtx) ReceiveReply(msg Message) (LastReplyReceived bool, err error) {
+func (req *MultiRequestCtx) ReceiveReply(msg Message) (lastReplyReceived bool, err error) {
if req == nil || req.ch == nil {
return false, errors.New("invalid request context")
}
- return req.ch.receiveReplyInternal(msg)
+ return req.ch.receiveReplyInternal(msg, req.seqNum)
}
// receiveReplyInternal receives a reply from the reply channel into the provided msg structure.
-func (ch *Channel) receiveReplyInternal(msg Message) (LastReplyReceived bool, err error) {
+func (ch *Channel) receiveReplyInternal(msg Message, expSeqNum uint16) (lastReplyReceived bool, err error) {
+ var ignore bool
if msg == nil {
return false, errors.New("nil message passed in")
}
+ if ch.delayedReply != nil {
+ // try the delayed reply
+ vppReply := ch.delayedReply
+ ch.delayedReply = nil
+ ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
+ if !ignore {
+ return lastReplyReceived, err
+ }
+ }
+
timer := time.NewTimer(ch.replyTimeout)
for {
select {
// blocks until a reply comes to ReplyChan or until timeout expires
case vppReply := <-ch.ReplyChan:
- if vppReply.Error != nil {
- err = vppReply.Error
- return
- }
- if vppReply.LastReplyReceived {
- LastReplyReceived = true
- return
+ ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
+ if ignore {
+ continue
}
-
- // message checks
- expMsgID, err := ch.MsgIdentifier.GetMessageID(msg)
- if err != nil {
- err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to",
- msg.GetMessageName(), msg.GetCrcString())
- return false, err
- }
-
- if vppReply.MessageID != expMsgID {
- var msgNameCrc string
- if nameCrc, err := ch.MsgIdentifier.LookupByID(vppReply.MessageID); err != nil {
- msgNameCrc = err.Error()
- } else {
- msgNameCrc = nameCrc
- }
-
- if ch.lastTimedOut {
- logrus.Warnf("received invalid message ID, expected %d (%s), but got %d (%s) (probably timed out reply from previous request)",
- expMsgID, msg.GetMessageName(), vppReply.MessageID, msgNameCrc)
- continue
- }
-
- err = fmt.Errorf("received invalid message ID, expected %d (%s), but got %d (%s) (check if multiple goroutines are not sharing single GoVPP channel)",
- expMsgID, msg.GetMessageName(), vppReply.MessageID, msgNameCrc)
- return false, err
- }
-
- ch.lastTimedOut = false
- // decode the message
- err = ch.MsgDecoder.DecodeMsg(vppReply.Data, msg)
- return false, err
+ return lastReplyReceived, err
case <-timer.C:
- ch.lastTimedOut = true
err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout)
return false, err
}
@@ -275,6 +256,80 @@ func (ch *Channel) receiveReplyInternal(msg Message) (LastReplyReceived bool, er
return
}
+func (ch *Channel) processReply(reply *VppReply, expSeqNum uint16, msg Message) (ignore bool, lastReplyReceived bool, err error) {
+ // check the sequence number
+ cmpSeqNums := compareSeqNumbers(reply.SeqNum, expSeqNum)
+ if cmpSeqNums == -1 {
+ // reply received too late, ignore the message
+ logrus.WithField("sequence-number", reply.SeqNum).Warn(
+ "Received reply to an already closed binary API request")
+ ignore = true
+ return
+ }
+ if cmpSeqNums == 1 {
+ ch.delayedReply = reply
+ err = fmt.Errorf("missing binary API reply with sequence number: %d", expSeqNum)
+ return
+ }
+
+ if reply.Error != nil {
+ err = reply.Error
+ return
+ }
+ if reply.LastReplyReceived {
+ lastReplyReceived = true
+ return
+ }
+
+ // message checks
+ var expMsgID uint16
+ expMsgID, err = ch.MsgIdentifier.GetMessageID(msg)
+ if err != nil {
+ err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to",
+ msg.GetMessageName(), msg.GetCrcString())
+ return
+ }
+
+ if reply.MessageID != expMsgID {
+ var msgNameCrc string
+ if nameCrc, err := ch.MsgIdentifier.LookupByID(reply.MessageID); err != nil {
+ msgNameCrc = err.Error()
+ } else {
+ msgNameCrc = nameCrc
+ }
+
+ err = fmt.Errorf("received invalid message ID (seq-num=%d), expected %d (%s), but got %d (%s) " +
+ "(check if multiple goroutines are not sharing single GoVPP channel)",
+ reply.SeqNum, expMsgID, msg.GetMessageName(), reply.MessageID, msgNameCrc)
+ return
+ }
+
+ // decode the message
+ err = ch.MsgDecoder.DecodeMsg(reply.Data, msg)
+ return
+}
+
+// compareSeqNumbers returns -1, 0, 1 if sequence number <seqNum1> precedes, equals to,
+// or succeeds seq. number <seqNum2>.
+// Since sequence numbers cycle in the finite set of size 2^16, the function
+// must assume that the distance between compared sequence numbers is less than
+// (2^16)/2 to determine the order.
+func compareSeqNumbers(seqNum1, seqNum2 uint16) int {
+ // calculate distance from seqNum1 to seqNum2
+ var dist uint16
+ if seqNum1 <= seqNum2 {
+ dist = seqNum2 - seqNum1
+ } else {
+ dist = 0xffff - (seqNum1 - seqNum2 - 1)
+ }
+ if dist == 0 {
+ return 0
+ } else if dist <= 0x8000 {
+ return -1
+ }
+ return 1
+}
+
// SubscribeNotification subscribes for receiving of the specified notification messages via provided Go channel.
// Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's
// buffer is full, the notifications will not be delivered into it.