diff options
author | Rastislav Szabo <raszabo@cisco.com> | 2017-05-04 11:09:03 +0200 |
---|---|---|
committer | Rastislav Szabo <raszabo@cisco.com> | 2017-05-04 11:12:35 +0200 |
commit | a101d966133a70b8a76526be25070436d14fcf9f (patch) | |
tree | 75e2dbf20de615e58252b780b2ba5baae8fdcf82 /core/notifications.go | |
parent | a968ead74525125dff9ae90b1c9a9102e4327900 (diff) |
initial commit
Signed-off-by: Rastislav Szabo <raszabo@cisco.com>
Diffstat (limited to 'core/notifications.go')
-rw-r--r-- | core/notifications.go | 182 |
1 files changed, 182 insertions, 0 deletions
diff --git a/core/notifications.go b/core/notifications.go new file mode 100644 index 0000000..b44eb1a --- /dev/null +++ b/core/notifications.go @@ -0,0 +1,182 @@ +// Copyright (c) 2017 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + "fmt" + "reflect" + + logger "github.com/Sirupsen/logrus" + + "gerrit.fd.io/r/govpp/api" +) + +// processNotifSubscribeRequest processes a notification subscribe request. +func (c *Connection) processNotifSubscribeRequest(ch *api.Channel, req *api.NotifSubscribeRequest) error { + var err error + + // subscribe / unsubscribe + if req.Subscribe { + err = c.addNotifSubscription(req.Subscription) + } else { + err = c.removeNotifSubscription(req.Subscription) + } + + // send the reply into the go channel + select { + case ch.NotifSubsReplyChan <- err: + // reply sent successfully + default: + // unable to write into the channel without blocking + log.WithFields(logger.Fields{ + "channel": ch, + }).Warn("Unable to deliver the subscribe reply, reciever end not ready.") + } + + return err +} + +// addNotifSubscription adds the notification subscription into the subscriptions map of the connection. +func (c *Connection) addNotifSubscription(subs *api.NotifSubscription) error { + // get message ID of the notification message + msgID, err := c.getSubscriptionMessageID(subs) + if err != nil { + return err + } + + log.WithFields(logger.Fields{ + "msg_id": msgID, + "subscription": subs, + }).Debug("Adding new notification subscription.") + + // add the subscription into map + c.notifSubscriptionsLock.Lock() + defer c.notifSubscriptionsLock.Unlock() + + c.notifSubscriptions[msgID] = append(c.notifSubscriptions[msgID], subs) + + return nil +} + +// removeNotifSubscription removes the notification subscription from the subscriptions map of the connection. +func (c *Connection) removeNotifSubscription(subs *api.NotifSubscription) error { + // get message ID of the notification message + msgID, err := c.getSubscriptionMessageID(subs) + if err != nil { + return err + } + + log.WithFields(logger.Fields{ + "msg_id": msgID, + "subscription": subs, + }).Debug("Removing notification subscription.") + + // remove the subscription from the map + c.notifSubscriptionsLock.Lock() + defer c.notifSubscriptionsLock.Unlock() + + for i, item := range c.notifSubscriptions[msgID] { + if item == subs { + // remove i-th item in the slice + c.notifSubscriptions[msgID] = append(c.notifSubscriptions[msgID][:i], c.notifSubscriptions[msgID][i+1:]...) + break + } + } + + return nil +} + +// isNotificationMessage returns true if someone has subscribed to provided message ID. +func (c *Connection) isNotificationMessage(msgID uint16) bool { + c.notifSubscriptionsLock.RLock() + defer c.notifSubscriptionsLock.RUnlock() + + _, exists := c.notifSubscriptions[msgID] + return exists +} + +// sendNotifications send a notification message to all subscribers subscribed for that message. +func (c *Connection) sendNotifications(msgID uint16, data []byte) { + c.notifSubscriptionsLock.RLock() + defer c.notifSubscriptionsLock.RUnlock() + + matched := false + + // send to notification to each subscriber + for _, subs := range c.notifSubscriptions[msgID] { + log.WithFields(logger.Fields{ + "msg_id": msgID, + "msg_size": len(data), + "subscription": subs, + }).Debug("Sending a notification to the subscription channel.") + + msg := subs.MsgFactory() + err := c.codec.DecodeMsg(data, msg) + if err != nil { + log.WithFields(logger.Fields{ + "msg_id": msgID, + "msg_size": len(data), + "subscription": subs, + }).Error("Unable to decode the notification message.") + continue + } + + // special case for the strange interface counters message + if msg.GetMessageName() == "vnet_interface_counters" { + v := reflect.ValueOf(msg).Elem().FieldByName("Data") + if v.IsValid() { + v.SetBytes(data[8:]) // include the Count and Data fields in the data + } + } + + // send the message into the go channel of the subscription + select { + case subs.NotifChan <- msg: + // message sent successfully + default: + // unable to write into the channel without blocking + log.WithFields(logger.Fields{ + "msg_id": msgID, + "msg_size": len(data), + "subscription": subs, + }).Warn("Unable to deliver the notification, reciever end not ready.") + } + + matched = true + } + + if !matched { + log.WithFields(logger.Fields{ + "msg_id": msgID, + "msg_size": len(data), + }).Debug("No subscription found for the notification message.") + } +} + +// getSubscriptionMessageID returns ID of the message the subscription is tied to. +func (c *Connection) getSubscriptionMessageID(subs *api.NotifSubscription) (uint16, error) { + msg := subs.MsgFactory() + msgID, err := c.GetMessageID(msg) + + if err != nil { + log.WithFields(logger.Fields{ + "msg_name": msg.GetMessageName(), + "msg_crc": msg.GetCrcString(), + }).Errorf("unable to retrieve message ID: %v", err) + return 0, fmt.Errorf("unable to retrieve message ID: %v", err) + } + + return msgID, nil +} |