diff options
Diffstat (limited to 'core/request_handler.go')
-rw-r--r-- | core/request_handler.go | 81 |
1 files changed, 65 insertions, 16 deletions
diff --git a/core/request_handler.go b/core/request_handler.go index fd6d100..14c095d 100644 --- a/core/request_handler.go +++ b/core/request_handler.go @@ -29,7 +29,7 @@ var ( ) // watchRequests watches for requests on the request API channel and forwards them as messages to VPP. -func (c *Connection) watchRequests(ch *channel) { +func (c *Connection) watchRequests(ch *Channel) { for { select { case req, ok := <-ch.reqChan: @@ -40,16 +40,12 @@ func (c *Connection) watchRequests(ch *channel) { return } c.processRequest(ch, req) - - case req := <-ch.notifSubsChan: - // new request on the notification subscribe channel - c.processSubscriptionRequest(ch, req) } } } // processRequest processes a single request received on the request channel. -func (c *Connection) processRequest(ch *channel, req *vppRequest) error { +func (c *Connection) processRequest(ch *Channel, req *vppRequest) error { // check whether we are connected to VPP if atomic.LoadUint32(&c.connected) == 0 { err := ErrNotConnected @@ -137,7 +133,7 @@ func (c *Connection) processRequest(ch *channel, req *vppRequest) error { } // msgCallback is called whenever any binary API message comes from VPP. -func (c *Connection) msgCallback(msgID uint16, context uint32, data []byte) { +func (c *Connection) msgCallback(msgID uint16, data []byte) { connLock.RLock() defer connLock.RUnlock() @@ -157,13 +153,8 @@ func (c *Connection) msgCallback(msgID uint16, context uint32, data []byte) { // - replies that don't have context as first field (comes as zero) // - events that don't have context at all (comes as non zero) // - msgContext, err := c.codec.DecodeMsgContext(data, msg) - if err == nil { - if context != msgContext { - log.Debugf("different context was decoded from message (%d -> %d)", context, msgContext) - context = msgContext - } - } else { + context, err := c.codec.DecodeMsgContext(data, msg) + if err != nil { log.Errorf("decoding context failed: %v", err) } @@ -218,7 +209,7 @@ func (c *Connection) msgCallback(msgID uint16, context uint32, data []byte) { // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise // it logs the error and do not send the message. -func sendReply(ch *channel, reply *vppReply) { +func sendReply(ch *Channel, reply *vppReply) { select { case ch.replyChan <- reply: // reply sent successfully @@ -232,10 +223,68 @@ func sendReply(ch *channel, reply *vppReply) { } } -func sendReplyError(ch *channel, req *vppRequest, err error) { +func sendReplyError(ch *Channel, req *vppRequest, err error) { sendReply(ch, &vppReply{seqNum: req.seqNum, err: err}) } +// isNotificationMessage returns true if someone has subscribed to provided message ID. +func (c *Connection) isNotificationMessage(msgID uint16) bool { + c.subscriptionsLock.RLock() + defer c.subscriptionsLock.RUnlock() + + _, exists := c.subscriptions[msgID] + return exists +} + +// sendNotifications send a notification message to all subscribers subscribed for that message. +func (c *Connection) sendNotifications(msgID uint16, data []byte) { + c.subscriptionsLock.RLock() + defer c.subscriptionsLock.RUnlock() + + matched := false + + // send to notification to each subscriber + for _, sub := range c.subscriptions[msgID] { + log.WithFields(logger.Fields{ + "msg_name": sub.event.GetMessageName(), + "msg_id": msgID, + "msg_size": len(data), + }).Debug("Sending a notification to the subscription channel.") + + event := sub.msgFactory() + if err := c.codec.DecodeMsg(data, event); err != nil { + log.WithFields(logger.Fields{ + "msg_name": sub.event.GetMessageName(), + "msg_id": msgID, + "msg_size": len(data), + }).Errorf("Unable to decode the notification message: %v", err) + continue + } + + // send the message into the go channel of the subscription + select { + case sub.notifChan <- event: + // message sent successfully + default: + // unable to write into the channel without blocking + log.WithFields(logger.Fields{ + "msg_name": sub.event.GetMessageName(), + "msg_id": msgID, + "msg_size": len(data), + }).Warn("Unable to deliver the notification, reciever end not ready.") + } + + matched = true + } + + if !matched { + log.WithFields(logger.Fields{ + "msg_id": msgID, + "msg_size": len(data), + }).Info("No subscription found for the notification message.") + } +} + // +------------------+-------------------+-----------------------+ // | 15b = channel ID | 1b = is multipart | 16b = sequence number | // +------------------+-------------------+-----------------------+ |