aboutsummaryrefslogtreecommitdiffstats
path: root/core/notification_handler.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/notification_handler.go')
-rw-r--r--core/notification_handler.go67
1 files changed, 28 insertions, 39 deletions
diff --git a/core/notification_handler.go b/core/notification_handler.go
index c0e8687..7b889e3 100644
--- a/core/notification_handler.go
+++ b/core/notification_handler.go
@@ -16,21 +16,20 @@ package core
import (
"fmt"
- "reflect"
"git.fd.io/govpp.git/api"
logger "github.com/sirupsen/logrus"
)
-// processNotifSubscribeRequest processes a notification subscribe request.
-func (c *Connection) processNotifSubscribeRequest(ch *channel, req *api.NotifSubscribeRequest) error {
+// processSubscriptionRequest processes a notification subscribe request.
+func (c *Connection) processSubscriptionRequest(ch *channel, req *subscriptionRequest) error {
var err error
// subscribe / unsubscribe
- if req.Subscribe {
- err = c.addNotifSubscription(req.Subscription)
+ if req.subscribe {
+ err = c.addNotifSubscription(req.sub)
} else {
- err = c.removeNotifSubscription(req.Subscription)
+ err = c.removeNotifSubscription(req.sub)
}
// send the reply into the go channel
@@ -40,7 +39,7 @@ func (c *Connection) processNotifSubscribeRequest(ch *channel, req *api.NotifSub
default:
// unable to write into the channel without blocking
log.WithFields(logger.Fields{
- "channel": ch,
+ "channel": ch.id,
}).Warn("Unable to deliver the subscribe reply, reciever end not ready.")
}
@@ -50,14 +49,14 @@ func (c *Connection) processNotifSubscribeRequest(ch *channel, req *api.NotifSub
// addNotifSubscription adds the notification subscription into the subscriptions map of the connection.
func (c *Connection) addNotifSubscription(subs *api.NotifSubscription) error {
// get message ID of the notification message
- msgID, err := c.getSubscriptionMessageID(subs)
+ msgID, msgName, err := c.getSubscriptionMessageID(subs)
if err != nil {
return err
}
log.WithFields(logger.Fields{
- "msg_id": msgID,
- "subscription": subs,
+ "msg_name": msgName,
+ "msg_id": msgID,
}).Debug("Adding new notification subscription.")
// add the subscription into map
@@ -72,14 +71,14 @@ func (c *Connection) addNotifSubscription(subs *api.NotifSubscription) error {
// removeNotifSubscription removes the notification subscription from the subscriptions map of the connection.
func (c *Connection) removeNotifSubscription(subs *api.NotifSubscription) error {
// get message ID of the notification message
- msgID, err := c.getSubscriptionMessageID(subs)
+ msgID, msgName, err := c.getSubscriptionMessageID(subs)
if err != nil {
return err
}
log.WithFields(logger.Fields{
- "msg_id": msgID,
- "subscription": subs,
+ "msg_name": msgName,
+ "msg_id": msgID,
}).Debug("Removing notification subscription.")
// remove the subscription from the map
@@ -115,31 +114,22 @@ func (c *Connection) sendNotifications(msgID uint16, data []byte) {
// send to notification to each subscriber
for _, subs := range c.notifSubscriptions[msgID] {
+ msg := subs.MsgFactory()
log.WithFields(logger.Fields{
- "msg_id": msgID,
- "msg_size": len(data),
- "subscription": subs,
+ "msg_name": msg.GetMessageName(),
+ "msg_id": msgID,
+ "msg_size": len(data),
}).Debug("Sending a notification to the subscription channel.")
- msg := subs.MsgFactory()
- err := c.codec.DecodeMsg(data, msg)
- if err != nil {
+ if err := c.codec.DecodeMsg(data, msg); err != nil {
log.WithFields(logger.Fields{
- "msg_id": msgID,
- "msg_size": len(data),
- "subscription": subs,
- }).Error("Unable to decode the notification message.")
+ "msg_name": msg.GetMessageName(),
+ "msg_id": msgID,
+ "msg_size": len(data),
+ }).Errorf("Unable to decode the notification message: %v", err)
continue
}
- // special case for the strange interface counters message
- if msg.GetMessageName() == "vnet_interface_counters" {
- v := reflect.ValueOf(msg).Elem().FieldByName("Data")
- if v.IsValid() {
- v.SetBytes(data[8:]) // include the Count and Data fields in the data
- }
- }
-
// send the message into the go channel of the subscription
select {
case subs.NotifChan <- msg:
@@ -147,9 +137,9 @@ func (c *Connection) sendNotifications(msgID uint16, data []byte) {
default:
// unable to write into the channel without blocking
log.WithFields(logger.Fields{
- "msg_id": msgID,
- "msg_size": len(data),
- "subscription": subs,
+ "msg_name": msg.GetMessageName(),
+ "msg_id": msgID,
+ "msg_size": len(data),
}).Warn("Unable to deliver the notification, reciever end not ready.")
}
@@ -160,22 +150,21 @@ func (c *Connection) sendNotifications(msgID uint16, data []byte) {
log.WithFields(logger.Fields{
"msg_id": msgID,
"msg_size": len(data),
- }).Debug("No subscription found for the notification message.")
+ }).Info("No subscription found for the notification message.")
}
}
// getSubscriptionMessageID returns ID of the message the subscription is tied to.
-func (c *Connection) getSubscriptionMessageID(subs *api.NotifSubscription) (uint16, error) {
+func (c *Connection) getSubscriptionMessageID(subs *api.NotifSubscription) (uint16, string, error) {
msg := subs.MsgFactory()
msgID, err := c.GetMessageID(msg)
-
if err != nil {
log.WithFields(logger.Fields{
"msg_name": msg.GetMessageName(),
"msg_crc": msg.GetCrcString(),
}).Errorf("unable to retrieve message ID: %v", err)
- return 0, fmt.Errorf("unable to retrieve message ID: %v", err)
+ return 0, "", fmt.Errorf("unable to retrieve message ID: %v", err)
}
- return msgID, nil
+ return msgID, msg.GetMessageName(), nil
}