summaryrefslogtreecommitdiffstats
path: root/core/request_handler.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/request_handler.go')
-rw-r--r--core/request_handler.go81
1 files changed, 65 insertions, 16 deletions
diff --git a/core/request_handler.go b/core/request_handler.go
index fd6d100..14c095d 100644
--- a/core/request_handler.go
+++ b/core/request_handler.go
@@ -29,7 +29,7 @@ var (
)
// watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
-func (c *Connection) watchRequests(ch *channel) {
+func (c *Connection) watchRequests(ch *Channel) {
for {
select {
case req, ok := <-ch.reqChan:
@@ -40,16 +40,12 @@ func (c *Connection) watchRequests(ch *channel) {
return
}
c.processRequest(ch, req)
-
- case req := <-ch.notifSubsChan:
- // new request on the notification subscribe channel
- c.processSubscriptionRequest(ch, req)
}
}
}
// processRequest processes a single request received on the request channel.
-func (c *Connection) processRequest(ch *channel, req *vppRequest) error {
+func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
// check whether we are connected to VPP
if atomic.LoadUint32(&c.connected) == 0 {
err := ErrNotConnected
@@ -137,7 +133,7 @@ func (c *Connection) processRequest(ch *channel, req *vppRequest) error {
}
// msgCallback is called whenever any binary API message comes from VPP.
-func (c *Connection) msgCallback(msgID uint16, context uint32, data []byte) {
+func (c *Connection) msgCallback(msgID uint16, data []byte) {
connLock.RLock()
defer connLock.RUnlock()
@@ -157,13 +153,8 @@ func (c *Connection) msgCallback(msgID uint16, context uint32, data []byte) {
// - replies that don't have context as first field (comes as zero)
// - events that don't have context at all (comes as non zero)
//
- msgContext, err := c.codec.DecodeMsgContext(data, msg)
- if err == nil {
- if context != msgContext {
- log.Debugf("different context was decoded from message (%d -> %d)", context, msgContext)
- context = msgContext
- }
- } else {
+ context, err := c.codec.DecodeMsgContext(data, msg)
+ if err != nil {
log.Errorf("decoding context failed: %v", err)
}
@@ -218,7 +209,7 @@ func (c *Connection) msgCallback(msgID uint16, context uint32, data []byte) {
// sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
// it logs the error and do not send the message.
-func sendReply(ch *channel, reply *vppReply) {
+func sendReply(ch *Channel, reply *vppReply) {
select {
case ch.replyChan <- reply:
// reply sent successfully
@@ -232,10 +223,68 @@ func sendReply(ch *channel, reply *vppReply) {
}
}
-func sendReplyError(ch *channel, req *vppRequest, err error) {
+func sendReplyError(ch *Channel, req *vppRequest, err error) {
sendReply(ch, &vppReply{seqNum: req.seqNum, err: err})
}
+// isNotificationMessage returns true if someone has subscribed to provided message ID.
+func (c *Connection) isNotificationMessage(msgID uint16) bool {
+ c.subscriptionsLock.RLock()
+ defer c.subscriptionsLock.RUnlock()
+
+ _, exists := c.subscriptions[msgID]
+ return exists
+}
+
+// sendNotifications send a notification message to all subscribers subscribed for that message.
+func (c *Connection) sendNotifications(msgID uint16, data []byte) {
+ c.subscriptionsLock.RLock()
+ defer c.subscriptionsLock.RUnlock()
+
+ matched := false
+
+ // send to notification to each subscriber
+ for _, sub := range c.subscriptions[msgID] {
+ log.WithFields(logger.Fields{
+ "msg_name": sub.event.GetMessageName(),
+ "msg_id": msgID,
+ "msg_size": len(data),
+ }).Debug("Sending a notification to the subscription channel.")
+
+ event := sub.msgFactory()
+ if err := c.codec.DecodeMsg(data, event); err != nil {
+ log.WithFields(logger.Fields{
+ "msg_name": sub.event.GetMessageName(),
+ "msg_id": msgID,
+ "msg_size": len(data),
+ }).Errorf("Unable to decode the notification message: %v", err)
+ continue
+ }
+
+ // send the message into the go channel of the subscription
+ select {
+ case sub.notifChan <- event:
+ // message sent successfully
+ default:
+ // unable to write into the channel without blocking
+ log.WithFields(logger.Fields{
+ "msg_name": sub.event.GetMessageName(),
+ "msg_id": msgID,
+ "msg_size": len(data),
+ }).Warn("Unable to deliver the notification, reciever end not ready.")
+ }
+
+ matched = true
+ }
+
+ if !matched {
+ log.WithFields(logger.Fields{
+ "msg_id": msgID,
+ "msg_size": len(data),
+ }).Info("No subscription found for the notification message.")
+ }
+}
+
// +------------------+-------------------+-----------------------+
// | 15b = channel ID | 1b = is multipart | 16b = sequence number |
// +------------------+-------------------+-----------------------+