diff options
Diffstat (limited to 'core/channel.go')
-rw-r--r-- | core/channel.go | 209 |
1 files changed, 102 insertions, 107 deletions
diff --git a/core/channel.go b/core/channel.go index 87b3e29..5f7763e 100644 --- a/core/channel.go +++ b/core/channel.go @@ -15,50 +15,78 @@ package core import ( + "errors" "fmt" + "reflect" + "strings" "time" - "errors" - "git.fd.io/govpp.git/api" "github.com/sirupsen/logrus" ) -const defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout +var ( + ErrInvalidRequestCtx = errors.New("invalid request context") +) -// requestCtxData is a context of a ongoing request (simple one - only one response is expected). -type requestCtxData struct { +// requestCtx is a context for request with single reply +type requestCtx struct { ch *channel seqNum uint16 } -// multiRequestCtxData is a context of a ongoing multipart request (multiple responses are expected). -type multiRequestCtxData struct { +// multiRequestCtx is a context for request with multiple responses +type multiRequestCtx struct { ch *channel seqNum uint16 } -func (req *requestCtxData) ReceiveReply(msg api.Message) error { +func (req *requestCtx) ReceiveReply(msg api.Message) error { if req == nil || req.ch == nil { - return errors.New("invalid request context") + return ErrInvalidRequestCtx } lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum) - + if err != nil { + return err + } if lastReplyReceived { - err = errors.New("multipart reply recieved while a simple reply expected") + return errors.New("multipart reply recieved while a single reply expected") } - return err + + return nil } -func (req *multiRequestCtxData) ReceiveReply(msg api.Message) (lastReplyReceived bool, err error) { +func (req *multiRequestCtx) ReceiveReply(msg api.Message) (lastReplyReceived bool, err error) { if req == nil || req.ch == nil { - return false, errors.New("invalid request context") + return false, ErrInvalidRequestCtx } return req.ch.receiveReplyInternal(msg, req.seqNum) } +// vppRequest is a request that will be sent to VPP. +type vppRequest struct { + seqNum uint16 // sequence number + msg api.Message // binary API message to be send to VPP + multi bool // true if multipart response is expected +} + +// vppReply is a reply received from VPP. +type vppReply struct { + seqNum uint16 // sequence number + msgID uint16 // ID of the message + data []byte // encoded data with the message + lastReceived bool // for multi request, true if the last reply has been already received + err error // in case of error, data is nil and this member contains error +} + +// NotifSubscribeRequest is a request to subscribe for delivery of specific notification messages. +type subscriptionRequest struct { + sub *api.NotifSubscription // subscription details + subscribe bool // true if this is a request to subscribe +} + // channel is the main communication interface with govpp core. It contains four Go channels, one for sending the requests // to VPP, one for receiving the replies from it and the same set for notifications. The user can access the Go channels // via methods provided by Channel interface in this package. Do not use the same channel from multiple goroutines @@ -66,99 +94,75 @@ func (req *multiRequestCtxData) ReceiveReply(msg api.Message) (lastReplyReceived type channel struct { id uint16 // channel ID - reqChan chan *api.VppRequest // channel for sending the requests to VPP, closing this channel releases all resources in the ChannelProvider - replyChan chan *api.VppReply // channel where VPP replies are delivered to + reqChan chan *vppRequest // channel for sending the requests to VPP + replyChan chan *vppReply // channel where VPP replies are delivered to - notifSubsChan chan *api.NotifSubscribeRequest // channel for sending notification subscribe requests - notifSubsReplyChan chan error // channel where replies to notification subscribe requests are delivered to + notifSubsChan chan *subscriptionRequest // channel for sending notification subscribe requests + notifSubsReplyChan chan error // channel where replies to notification subscribe requests are delivered to msgDecoder api.MessageDecoder // used to decode binary data to generated API messages msgIdentifier api.MessageIdentifier // used to retrieve message ID of a message lastSeqNum uint16 // sequence number of the last sent request - delayedReply *api.VppReply // reply already taken from ReplyChan, buffered for later delivery + delayedReply *vppReply // reply already taken from ReplyChan, buffered for later delivery replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout } -func (ch *channel) SendRequest(msg api.Message) api.RequestCtx { +func (ch *channel) GetID() uint16 { + return ch.id +} + +func (ch *channel) nextSeqNum() uint16 { ch.lastSeqNum++ - ch.reqChan <- &api.VppRequest{ - Message: msg, - SeqNum: ch.lastSeqNum, + return ch.lastSeqNum +} + +func (ch *channel) SendRequest(msg api.Message) api.RequestCtx { + req := &vppRequest{ + msg: msg, + seqNum: ch.nextSeqNum(), } - return &requestCtxData{ch: ch, seqNum: ch.lastSeqNum} + ch.reqChan <- req + return &requestCtx{ch: ch, seqNum: req.seqNum} } func (ch *channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx { - ch.lastSeqNum++ - ch.reqChan <- &api.VppRequest{ - Message: msg, - Multipart: true, - SeqNum: ch.lastSeqNum, + req := &vppRequest{ + msg: msg, + seqNum: ch.nextSeqNum(), + multi: true, } - return &multiRequestCtxData{ch: ch, seqNum: ch.lastSeqNum} + ch.reqChan <- req + return &multiRequestCtx{ch: ch, seqNum: req.seqNum} } func (ch *channel) SubscribeNotification(notifChan chan api.Message, msgFactory func() api.Message) (*api.NotifSubscription, error) { - subscription := &api.NotifSubscription{ + sub := &api.NotifSubscription{ NotifChan: notifChan, MsgFactory: msgFactory, } - ch.notifSubsChan <- &api.NotifSubscribeRequest{ - Subscription: subscription, - Subscribe: true, + // TODO: get rid of notifSubsChan and notfSubsReplyChan, + // it's no longer need because we know all message IDs and can store subscription right here + ch.notifSubsChan <- &subscriptionRequest{ + sub: sub, + subscribe: true, } - return subscription, <-ch.notifSubsReplyChan + return sub, <-ch.notifSubsReplyChan } func (ch *channel) UnsubscribeNotification(subscription *api.NotifSubscription) error { - ch.notifSubsChan <- &api.NotifSubscribeRequest{ - Subscription: subscription, - Subscribe: false, + ch.notifSubsChan <- &subscriptionRequest{ + sub: subscription, + subscribe: false, } return <-ch.notifSubsReplyChan } -func (ch *channel) CheckMessageCompatibility(messages ...api.Message) error { - for _, msg := range messages { - _, err := ch.msgIdentifier.GetMessageID(msg) - if err != nil { - return fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to", - msg.GetMessageName(), msg.GetCrcString()) - } - } - return nil -} - func (ch *channel) SetReplyTimeout(timeout time.Duration) { ch.replyTimeout = timeout } -func (ch *channel) GetRequestChannel() chan<- *api.VppRequest { - return ch.reqChan -} - -func (ch *channel) GetReplyChannel() <-chan *api.VppReply { - return ch.replyChan -} - -func (ch *channel) GetNotificationChannel() chan<- *api.NotifSubscribeRequest { - return ch.notifSubsChan -} - -func (ch *channel) GetNotificationReplyChannel() <-chan error { - return ch.notifSubsReplyChan -} - -func (ch *channel) GetMessageDecoder() api.MessageDecoder { - return ch.msgDecoder -} - -func (ch *channel) GetID() uint16 { - return ch.id -} - func (ch *channel) Close() { if ch.reqChan != nil { close(ch.reqChan) @@ -172,9 +176,8 @@ func (ch *channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (last return false, errors.New("nil message passed in") } - if ch.delayedReply != nil { + if vppReply := ch.delayedReply; vppReply != nil { // try the delayed reply - vppReply := ch.delayedReply ch.delayedReply = nil ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg) if !ignore { @@ -201,12 +204,12 @@ func (ch *channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (last return } -func (ch *channel) processReply(reply *api.VppReply, expSeqNum uint16, msg api.Message) (ignore bool, lastReplyReceived bool, err error) { +func (ch *channel) processReply(reply *vppReply, expSeqNum uint16, msg api.Message) (ignore bool, lastReplyReceived bool, err error) { // check the sequence number - cmpSeqNums := compareSeqNumbers(reply.SeqNum, expSeqNum) + cmpSeqNums := compareSeqNumbers(reply.seqNum, expSeqNum) if cmpSeqNums == -1 { // reply received too late, ignore the message - logrus.WithField("sequence-number", reply.SeqNum).Warn( + logrus.WithField("sequence-number", reply.seqNum).Warn( "Received reply to an already closed binary API request") ignore = true return @@ -217,11 +220,11 @@ func (ch *channel) processReply(reply *api.VppReply, expSeqNum uint16, msg api.M return } - if reply.Error != nil { - err = reply.Error + if reply.err != nil { + err = reply.err return } - if reply.LastReplyReceived { + if reply.lastReceived { lastReplyReceived = true return } @@ -235,42 +238,34 @@ func (ch *channel) processReply(reply *api.VppReply, expSeqNum uint16, msg api.M return } - if reply.MessageID != expMsgID { + if reply.msgID != expMsgID { var msgNameCrc string - if nameCrc, err := ch.msgIdentifier.LookupByID(reply.MessageID); err != nil { + if replyMsg, err := ch.msgIdentifier.LookupByID(reply.msgID); err != nil { msgNameCrc = err.Error() } else { - msgNameCrc = nameCrc + msgNameCrc = getMsgNameWithCrc(replyMsg) } - err = fmt.Errorf("received invalid message ID (seq-num=%d), expected %d (%s), but got %d (%s) "+ + err = fmt.Errorf("received invalid message ID (seqNum=%d), expected %d (%s), but got %d (%s) "+ "(check if multiple goroutines are not sharing single GoVPP channel)", - reply.SeqNum, expMsgID, msg.GetMessageName(), reply.MessageID, msgNameCrc) + reply.seqNum, expMsgID, msg.GetMessageName(), reply.msgID, msgNameCrc) return } // decode the message - err = ch.msgDecoder.DecodeMsg(reply.Data, msg) - return -} - -// compareSeqNumbers returns -1, 0, 1 if sequence number <seqNum1> precedes, equals to, -// or succeeds seq. number <seqNum2>. -// Since sequence numbers cycle in the finite set of size 2^16, the function -// must assume that the distance between compared sequence numbers is less than -// (2^16)/2 to determine the order. -func compareSeqNumbers(seqNum1, seqNum2 uint16) int { - // calculate distance from seqNum1 to seqNum2 - var dist uint16 - if seqNum1 <= seqNum2 { - dist = seqNum2 - seqNum1 - } else { - dist = 0xffff - (seqNum1 - seqNum2 - 1) + if err = ch.msgDecoder.DecodeMsg(reply.data, msg); err != nil { + return } - if dist == 0 { - return 0 - } else if dist <= 0x8000 { - return -1 + + // check Retval and convert it into VnetAPIError error + if strings.HasSuffix(msg.GetMessageName(), "_reply") { + // TODO: use categories for messages to avoid checking message name + if f := reflect.Indirect(reflect.ValueOf(msg)).FieldByName("Retval"); f.IsValid() { + if retval := f.Int(); retval != 0 { + err = api.VPPApiError(retval) + } + } } - return 1 + + return } |