diff options
author | 2017-05-25 13:47:43 +0200 | |
---|---|---|
committer | 2017-05-25 13:54:13 +0200 | |
commit | 2d07847237e754d9050f06f565baa430c70ed937 (patch) | |
tree | 80588aeec912e95fa21b51520bbd527eb87f455b /core/notifications.go | |
parent | c38cb25d746736f062ee16e87f553c8a4ec5fced (diff) |
added async connect API, new structure of examples
Change-Id: Iab9bce174596c30998981e02b7030c248c423384
Signed-off-by: Rastislav Szabo <raszabo@cisco.com>
Diffstat (limited to 'core/notifications.go')
-rw-r--r-- | core/notifications.go | 182 |
1 files changed, 0 insertions, 182 deletions
diff --git a/core/notifications.go b/core/notifications.go deleted file mode 100644 index 5cd132c..0000000 --- a/core/notifications.go +++ /dev/null @@ -1,182 +0,0 @@ -// Copyright (c) 2017 Cisco and/or its affiliates. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package core - -import ( - "fmt" - "reflect" - - logger "github.com/Sirupsen/logrus" - - "git.fd.io/govpp.git/api" -) - -// processNotifSubscribeRequest processes a notification subscribe request. -func (c *Connection) processNotifSubscribeRequest(ch *api.Channel, req *api.NotifSubscribeRequest) error { - var err error - - // subscribe / unsubscribe - if req.Subscribe { - err = c.addNotifSubscription(req.Subscription) - } else { - err = c.removeNotifSubscription(req.Subscription) - } - - // send the reply into the go channel - select { - case ch.NotifSubsReplyChan <- err: - // reply sent successfully - default: - // unable to write into the channel without blocking - log.WithFields(logger.Fields{ - "channel": ch, - }).Warn("Unable to deliver the subscribe reply, reciever end not ready.") - } - - return err -} - -// addNotifSubscription adds the notification subscription into the subscriptions map of the connection. -func (c *Connection) addNotifSubscription(subs *api.NotifSubscription) error { - // get message ID of the notification message - msgID, err := c.getSubscriptionMessageID(subs) - if err != nil { - return err - } - - log.WithFields(logger.Fields{ - "msg_id": msgID, - "subscription": subs, - }).Debug("Adding new notification subscription.") - - // add the subscription into map - c.notifSubscriptionsLock.Lock() - defer c.notifSubscriptionsLock.Unlock() - - c.notifSubscriptions[msgID] = append(c.notifSubscriptions[msgID], subs) - - return nil -} - -// removeNotifSubscription removes the notification subscription from the subscriptions map of the connection. -func (c *Connection) removeNotifSubscription(subs *api.NotifSubscription) error { - // get message ID of the notification message - msgID, err := c.getSubscriptionMessageID(subs) - if err != nil { - return err - } - - log.WithFields(logger.Fields{ - "msg_id": msgID, - "subscription": subs, - }).Debug("Removing notification subscription.") - - // remove the subscription from the map - c.notifSubscriptionsLock.Lock() - defer c.notifSubscriptionsLock.Unlock() - - for i, item := range c.notifSubscriptions[msgID] { - if item == subs { - // remove i-th item in the slice - c.notifSubscriptions[msgID] = append(c.notifSubscriptions[msgID][:i], c.notifSubscriptions[msgID][i+1:]...) - break - } - } - - return nil -} - -// isNotificationMessage returns true if someone has subscribed to provided message ID. -func (c *Connection) isNotificationMessage(msgID uint16) bool { - c.notifSubscriptionsLock.RLock() - defer c.notifSubscriptionsLock.RUnlock() - - _, exists := c.notifSubscriptions[msgID] - return exists -} - -// sendNotifications send a notification message to all subscribers subscribed for that message. -func (c *Connection) sendNotifications(msgID uint16, data []byte) { - c.notifSubscriptionsLock.RLock() - defer c.notifSubscriptionsLock.RUnlock() - - matched := false - - // send to notification to each subscriber - for _, subs := range c.notifSubscriptions[msgID] { - log.WithFields(logger.Fields{ - "msg_id": msgID, - "msg_size": len(data), - "subscription": subs, - }).Debug("Sending a notification to the subscription channel.") - - msg := subs.MsgFactory() - err := c.codec.DecodeMsg(data, msg) - if err != nil { - log.WithFields(logger.Fields{ - "msg_id": msgID, - "msg_size": len(data), - "subscription": subs, - }).Error("Unable to decode the notification message.") - continue - } - - // special case for the strange interface counters message - if msg.GetMessageName() == "vnet_interface_counters" { - v := reflect.ValueOf(msg).Elem().FieldByName("Data") - if v.IsValid() { - v.SetBytes(data[8:]) // include the Count and Data fields in the data - } - } - - // send the message into the go channel of the subscription - select { - case subs.NotifChan <- msg: - // message sent successfully - default: - // unable to write into the channel without blocking - log.WithFields(logger.Fields{ - "msg_id": msgID, - "msg_size": len(data), - "subscription": subs, - }).Warn("Unable to deliver the notification, reciever end not ready.") - } - - matched = true - } - - if !matched { - log.WithFields(logger.Fields{ - "msg_id": msgID, - "msg_size": len(data), - }).Debug("No subscription found for the notification message.") - } -} - -// getSubscriptionMessageID returns ID of the message the subscription is tied to. -func (c *Connection) getSubscriptionMessageID(subs *api.NotifSubscription) (uint16, error) { - msg := subs.MsgFactory() - msgID, err := c.GetMessageID(msg) - - if err != nil { - log.WithFields(logger.Fields{ - "msg_name": msg.GetMessageName(), - "msg_crc": msg.GetCrcString(), - }).Errorf("unable to retrieve message ID: %v", err) - return 0, fmt.Errorf("unable to retrieve message ID: %v", err) - } - - return msgID, nil -} |