From 6b350c65fe0ec845cecf58bfb41ffc63dc9c04f7 Mon Sep 17 00:00:00 2001 From: Ondrej Fabry Date: Thu, 23 Aug 2018 22:51:56 +0200 Subject: Simplify subscribing to events and fix events - there is no need for sending subscription requests through channels, since all the messages are registered and no communication with VPP is needed Change-Id: Ibc29957be02a32e26309f66c369a071559b822a9 Signed-off-by: Ondrej Fabry --- core/channel.go | 218 ++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 140 insertions(+), 78 deletions(-) (limited to 'core/channel.go') diff --git a/core/channel.go b/core/channel.go index 718f89c..a7d95fe 100644 --- a/core/channel.go +++ b/core/channel.go @@ -29,40 +29,20 @@ var ( ErrInvalidRequestCtx = errors.New("invalid request context") ) -// requestCtx is a context for request with single reply -type requestCtx struct { - ch *channel - seqNum uint16 -} - -// multiRequestCtx is a context for request with multiple responses -type multiRequestCtx struct { - ch *channel - seqNum uint16 -} - -func (req *requestCtx) ReceiveReply(msg api.Message) error { - if req == nil || req.ch == nil { - return ErrInvalidRequestCtx - } - - lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum) - if err != nil { - return err - } - if lastReplyReceived { - return errors.New("multipart reply recieved while a single reply expected") - } - - return nil +// MessageCodec provides functionality for decoding binary data to generated API messages. +type MessageCodec interface { + //EncodeMsg encodes message into binary data. + EncodeMsg(msg api.Message, msgID uint16) ([]byte, error) + // DecodeMsg decodes binary-encoded data of a message into provided Message structure. + DecodeMsg(data []byte, msg api.Message) error } -func (req *multiRequestCtx) ReceiveReply(msg api.Message) (lastReplyReceived bool, err error) { - if req == nil || req.ch == nil { - return false, ErrInvalidRequestCtx - } - - return req.ch.receiveReplyInternal(msg, req.seqNum) +// MessageIdentifier provides identification of generated API messages. +type MessageIdentifier interface { + // GetMessageID returns message identifier of given API message. + GetMessageID(msg api.Message) (uint16, error) + // LookupByID looks up message name and crc by ID + LookupByID(msgID uint16) (api.Message, error) } // vppRequest is a request that will be sent to VPP. @@ -81,27 +61,40 @@ type vppReply struct { err error // in case of error, data is nil and this member contains error } -// NotifSubscribeRequest is a request to subscribe for delivery of specific notification messages. -type subscriptionRequest struct { - sub *api.NotifSubscription // subscription details - subscribe bool // true if this is a request to subscribe +// requestCtx is a context for request with single reply +type requestCtx struct { + ch *Channel + seqNum uint16 +} + +// multiRequestCtx is a context for request with multiple responses +type multiRequestCtx struct { + ch *Channel + seqNum uint16 +} + +// subscriptionCtx is a context of subscription for delivery of specific notification messages. +type subscriptionCtx struct { + ch *Channel + notifChan chan api.Message // channel where notification messages will be delivered to + msgID uint16 // message ID for the subscribed event message + event api.Message // event message that this subscription is for + msgFactory func() api.Message // function that returns a new instance of the specific message that is expected as a notification } // channel is the main communication interface with govpp core. It contains four Go channels, one for sending the requests // to VPP, one for receiving the replies from it and the same set for notifications. The user can access the Go channels // via methods provided by Channel interface in this package. Do not use the same channel from multiple goroutines // concurrently, otherwise the responses could mix! Use multiple channels instead. -type channel struct { - id uint16 // channel ID +type Channel struct { + id uint16 + conn *Connection reqChan chan *vppRequest // channel for sending the requests to VPP replyChan chan *vppReply // channel where VPP replies are delivered to - notifSubsChan chan *subscriptionRequest // channel for sending notification subscribe requests - notifSubsReplyChan chan error // channel where replies to notification subscribe requests are delivered to - - msgDecoder api.MessageDecoder // used to decode binary data to generated API messages - msgIdentifier api.MessageIdentifier // used to retrieve message ID of a message + msgCodec MessageCodec // used to decode binary data to generated API messages + msgIdentifier MessageIdentifier // used to retrieve message ID of a message lastSeqNum uint16 // sequence number of the last sent request @@ -109,73 +102,142 @@ type channel struct { replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout } -func (ch *channel) GetID() uint16 { +func newChannel(id uint16, conn *Connection, codec MessageCodec, identifier MessageIdentifier, reqSize, replySize int) *Channel { + return &Channel{ + id: id, + conn: conn, + msgCodec: codec, + msgIdentifier: identifier, + reqChan: make(chan *vppRequest, reqSize), + replyChan: make(chan *vppReply, replySize), + replyTimeout: DefaultReplyTimeout, + } +} + +func (ch *Channel) GetID() uint16 { return ch.id } -func (ch *channel) nextSeqNum() uint16 { +func (ch *Channel) nextSeqNum() uint16 { ch.lastSeqNum++ return ch.lastSeqNum } -func (ch *channel) SendRequest(msg api.Message) api.RequestCtx { - req := &vppRequest{ +func (ch *Channel) SendRequest(msg api.Message) api.RequestCtx { + seqNum := ch.nextSeqNum() + ch.reqChan <- &vppRequest{ msg: msg, - seqNum: ch.nextSeqNum(), + seqNum: seqNum, } - ch.reqChan <- req - return &requestCtx{ch: ch, seqNum: req.seqNum} + return &requestCtx{ch: ch, seqNum: seqNum} } -func (ch *channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx { - req := &vppRequest{ +func (ch *Channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx { + seqNum := ch.nextSeqNum() + ch.reqChan <- &vppRequest{ msg: msg, - seqNum: ch.nextSeqNum(), + seqNum: seqNum, multi: true, } - ch.reqChan <- req - return &multiRequestCtx{ch: ch, seqNum: req.seqNum} + return &multiRequestCtx{ch: ch, seqNum: seqNum} } -func (ch *channel) SubscribeNotification(notifChan chan api.Message, msgFactory func() api.Message) (*api.NotifSubscription, error) { - sub := &api.NotifSubscription{ - NotifChan: notifChan, - MsgFactory: msgFactory, +func getMsgFactory(msg api.Message) func() api.Message { + return func() api.Message { + return reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) } - // TODO: get rid of notifSubsChan and notfSubsReplyChan, - // it's no longer need because we know all message IDs and can store subscription right here - ch.notifSubsChan <- &subscriptionRequest{ - sub: sub, - subscribe: true, - } - return sub, <-ch.notifSubsReplyChan } -func (ch *channel) UnsubscribeNotification(subscription *api.NotifSubscription) error { - ch.notifSubsChan <- &subscriptionRequest{ - sub: subscription, - subscribe: false, +func (ch *Channel) SubscribeNotification(notifChan chan api.Message, event api.Message) (api.SubscriptionCtx, error) { + msgID, err := ch.msgIdentifier.GetMessageID(event) + if err != nil { + log.WithFields(logrus.Fields{ + "msg_name": event.GetMessageName(), + "msg_crc": event.GetCrcString(), + }).Errorf("unable to retrieve message ID: %v", err) + return nil, fmt.Errorf("unable to retrieve event message ID: %v", err) + } + + sub := &subscriptionCtx{ + ch: ch, + notifChan: notifChan, + msgID: msgID, + event: event, + msgFactory: getMsgFactory(event), } - return <-ch.notifSubsReplyChan + + // add the subscription into map + ch.conn.subscriptionsLock.Lock() + defer ch.conn.subscriptionsLock.Unlock() + + ch.conn.subscriptions[msgID] = append(ch.conn.subscriptions[msgID], sub) + + return sub, nil } -func (ch *channel) SetReplyTimeout(timeout time.Duration) { +func (ch *Channel) SetReplyTimeout(timeout time.Duration) { ch.replyTimeout = timeout } -func (ch *channel) Close() { +func (ch *Channel) Close() { if ch.reqChan != nil { close(ch.reqChan) + ch.reqChan = nil + } +} + +func (req *requestCtx) ReceiveReply(msg api.Message) error { + if req == nil || req.ch == nil { + return ErrInvalidRequestCtx + } + + lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum) + if err != nil { + return err + } else if lastReplyReceived { + return errors.New("multipart reply recieved while a single reply expected") + } + + return nil +} + +func (req *multiRequestCtx) ReceiveReply(msg api.Message) (lastReplyReceived bool, err error) { + if req == nil || req.ch == nil { + return false, ErrInvalidRequestCtx } + + return req.ch.receiveReplyInternal(msg, req.seqNum) +} + +func (sub *subscriptionCtx) Unsubscribe() error { + log.WithFields(logrus.Fields{ + "msg_name": sub.event.GetMessageName(), + "msg_id": sub.msgID, + }).Debug("Removing notification subscription.") + + // remove the subscription from the map + sub.ch.conn.subscriptionsLock.Lock() + defer sub.ch.conn.subscriptionsLock.Unlock() + + for i, item := range sub.ch.conn.subscriptions[sub.msgID] { + if item == sub { + // remove i-th item in the slice + sub.ch.conn.subscriptions[sub.msgID] = append(sub.ch.conn.subscriptions[sub.msgID][:i], sub.ch.conn.subscriptions[sub.msgID][i+1:]...) + return nil + } + } + + return fmt.Errorf("subscription for %q not found", sub.event.GetMessageName()) } // receiveReplyInternal receives a reply from the reply channel into the provided msg structure. -func (ch *channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (lastReplyReceived bool, err error) { - var ignore bool +func (ch *Channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (lastReplyReceived bool, err error) { if msg == nil { return false, errors.New("nil message passed in") } + var ignore bool + if vppReply := ch.delayedReply; vppReply != nil { // try the delayed reply ch.delayedReply = nil @@ -204,12 +266,12 @@ func (ch *channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (last return } -func (ch *channel) processReply(reply *vppReply, expSeqNum uint16, msg api.Message) (ignore bool, lastReplyReceived bool, err error) { +func (ch *Channel) processReply(reply *vppReply, expSeqNum uint16, msg api.Message) (ignore bool, lastReplyReceived bool, err error) { // check the sequence number cmpSeqNums := compareSeqNumbers(reply.seqNum, expSeqNum) if cmpSeqNums == -1 { // reply received too late, ignore the message - logrus.WithField("sequence-number", reply.seqNum).Warn( + logrus.WithField("seqNum", reply.seqNum).Warn( "Received reply to an already closed binary API request") ignore = true return @@ -253,7 +315,7 @@ func (ch *channel) processReply(reply *vppReply, expSeqNum uint16, msg api.Messa } // decode the message - if err = ch.msgDecoder.DecodeMsg(reply.data, msg); err != nil { + if err = ch.msgCodec.DecodeMsg(reply.data, msg); err != nil { return } -- cgit 1.2.3-korg