From 2d07847237e754d9050f06f565baa430c70ed937 Mon Sep 17 00:00:00 2001 From: Rastislav Szabo Date: Thu, 25 May 2017 13:47:43 +0200 Subject: added async connect API, new structure of examples Change-Id: Iab9bce174596c30998981e02b7030c248c423384 Signed-off-by: Rastislav Szabo --- core/core.go | 365 +++++++++++++++++++------------------------ core/core_test.go | 16 ++ core/notification_handler.go | 182 +++++++++++++++++++++ core/notifications.go | 182 --------------------- core/request_handler.go | 213 +++++++++++++++++++++++++ 5 files changed, 575 insertions(+), 383 deletions(-) create mode 100644 core/notification_handler.go delete mode 100644 core/notifications.go create mode 100644 core/request_handler.go (limited to 'core') diff --git a/core/core.go b/core/core.go index 5b14a42..2484c81 100644 --- a/core/core.go +++ b/core/core.go @@ -14,14 +14,12 @@ package core -//go:generate binapi-generator --input-file=/usr/share/vpp/api/vpe.api.json --output-dir=./bin_api - import ( "errors" - "fmt" "os" "sync" "sync/atomic" + "time" logger "github.com/Sirupsen/logrus" @@ -31,14 +29,41 @@ import ( ) const ( - requestChannelBufSize = 100 // default size of the request channel buffers - replyChannelBufSize = 100 // default size of the reply channel buffers + requestChannelBufSize = 100 // default size of the request channel buffers + replyChannelBufSize = 100 // default size of the reply channel buffers + notificationChannelBufSize = 100 // default size of the notification channel buffers +) + +const ( + healthCheckProbeInterval = time.Second * 1 // default health check probe interval + healthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe +) + +// ConnectionState holds the current state of the connection to VPP. +type ConnectionState int + +const ( + // Connected connection state means that the connection to VPP has been successfully established. + Connected ConnectionState = iota + + // Disconnected connection state means that the connection to VPP has been lost. + Disconnected = iota ) +// ConnectionEvent is a notification about change in the VPP connection state. +type ConnectionEvent struct { + // Timestamp holds the time when the event has been generated. + Timestamp time.Time + + // State holds the new state of the connection to VPP at the time when the event has been generated. + State ConnectionState +} + // Connection represents a shared memory connection to VPP via vppAdapter. type Connection struct { - vpp adapter.VppAdapter // VPP adapter - codec *MsgCodec // message codec + vpp adapter.VppAdapter // VPP adapter + connected uint32 // non-zero if the adapter is connected to VPP + codec *MsgCodec // message codec msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC msgIDsLock sync.RWMutex // lock for the message IDs map @@ -79,7 +104,57 @@ func SetLogger(l *logger.Logger) { } // Connect connects to VPP using specified VPP adapter and returns the connection handle. +// This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed. func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) { + // create new connection handle + c, err := newConnection(vppAdapter) + if err != nil { + return nil, err + } + + // blocking attempt to connect to VPP + err = c.connectVPP() + if err != nil { + return nil, err + } + + return conn, nil +} + +// AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle +// and ConnectionState channel. This call does not block until connection is established, it +// returns immediately. The caller is supposed to watch the returned ConnectionState channel for +// Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect. +func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) { + // create new connection handle + c, err := newConnection(vppAdapter) + if err != nil { + return nil, nil, err + } + + // asynchronously attempt to connect to VPP + connChan := make(chan ConnectionEvent, notificationChannelBufSize) + go c.connectLoop(connChan) + + return conn, connChan, nil +} + +// Disconnect disconnects from VPP and releases all connection-related resources. +func (c *Connection) Disconnect() { + if c == nil { + return + } + connLock.Lock() + defer connLock.Unlock() + + if c != nil && c.vpp != nil { + c.disconnectVPP() + } + conn = nil +} + +// newConnection returns new connection handle. +func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) { connLock.Lock() defer connLock.Unlock() @@ -93,35 +168,102 @@ func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) { conn.notifSubscriptions = make(map[uint16][]*api.NotifSubscription) conn.vpp.SetMsgCallback(msgCallback) + return conn, nil +} - logger.Debug("Connecting to VPP...") +// connectVPP performs one blocking attempt to connect to VPP. +func (c *Connection) connectVPP() error { + log.Debug("Connecting to VPP...") - err := conn.vpp.Connect() + // blocking connect + err := c.vpp.Connect() if err != nil { - return nil, err + log.Warn(err) + return err } + // store connected state + atomic.StoreUint32(&c.connected, 1) + // store control ping IDs - conn.pingReqID, _ = conn.GetMessageID(&vpe.ControlPing{}) - conn.pingReplyID, _ = conn.GetMessageID(&vpe.ControlPingReply{}) + c.pingReqID, _ = c.GetMessageID(&vpe.ControlPing{}) + c.pingReplyID, _ = c.GetMessageID(&vpe.ControlPingReply{}) - logger.Debug("VPP connected.") + log.Info("Connected to VPP.") + return nil +} - return conn, nil +// disconnectVPP disconnects from VPP in case it is connected. +func (c *Connection) disconnectVPP() { + if atomic.CompareAndSwapUint32(&c.connected, 1, 0) { + c.vpp.Disconnect() + } } -// Disconnect disconnects from VPP. -func (c *Connection) Disconnect() { - if c == nil { +// connectLoop attempts to connect to VPP until it succeeds. +// Then it continues with healthCheckLoop. +func (c *Connection) connectLoop(connChan chan ConnectionEvent) { + // loop until connected + for { + err := c.connectVPP() + if err == nil { + // signal connected event + connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected} + break + } + } + + // we are now connected, continue with health check loop + c.healthCheckLoop(connChan) +} + +// healthCheckLoop checks whether connection to VPP is alive. In case of disconnect, +// it continues with connectLoop and tries to reconnect. +func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { + // create a separate API channel for health check probes + ch, err := conn.NewAPIChannel() + if err != nil { + log.Error("Error by creating health check API channel, health check will be disabled:", err) return } - connLock.Lock() - defer connLock.Unlock() - if c != nil && c.vpp != nil { - c.vpp.Disconnect() + // send health check probes until an error occurs + for { + // wait for healthCheckProbeInterval + <-time.After(healthCheckProbeInterval) + + if atomic.LoadUint32(&c.connected) == 0 { + // Disconnect has been called in the meantime, return the healthcheck - reconnect loop + log.Debug("Disconnected on request, exiting health check loop.") + return + } + + // send the control ping + ch.ReqChan <- &api.VppRequest{Message: &vpe.ControlPing{}} + + // expect response within timeout period + select { + case vppReply := <-ch.ReplyChan: + err = vppReply.Error + case <-time.After(healthCheckReplyTimeout): + err = errors.New("probe reply not received within the timeout period") + } + + // in case of error, break & disconnect + if err != nil { + log.Errorf("VPP health check failed: %v", err) + // signal disconnected event via channel + connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected} + break + } } - conn = nil + + // cleanup + ch.Close() + c.disconnectVPP() + + // we are now disconnected, start connect loop + c.connectLoop(connChan) } // NewAPIChannel returns a new API channel for communication with VPP via govpp core. @@ -163,84 +305,6 @@ func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) return ch, nil } -// watchRequests watches for requests on the request API channel and forwards them as messages to VPP. -func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) { - for { - select { - case req, ok := <-ch.ReqChan: - // new request on the request channel - if !ok { - // after closing the request channel, release API channel and return - c.releaseAPIChannel(ch, chMeta) - return - } - c.processRequest(ch, chMeta, req) - - case req := <-ch.NotifSubsChan: - // new request on the notification subscribe channel - c.processNotifSubscribeRequest(ch, req) - } - } -} - -// processRequest processes a single request received on the request channel. -func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error { - // retrieve message ID - msgID, err := c.GetMessageID(req.Message) - if err != nil { - error := fmt.Errorf("unable to retrieve message ID: %v", err) - log.WithFields(logger.Fields{ - "msg_name": req.Message.GetMessageName(), - "msg_crc": req.Message.GetCrcString(), - }).Errorf("unable to retrieve message ID: %v", err) - sendReply(ch, &api.VppReply{Error: error}) - return error - } - - // encode the message into binary - data, err := c.codec.EncodeMsg(req.Message, msgID) - if err != nil { - error := fmt.Errorf("unable to encode the messge: %v", err) - log.WithFields(logger.Fields{ - "context": chMeta.id, - "msg_id": msgID, - }).Errorf("%v", error) - sendReply(ch, &api.VppReply{Error: error}) - return error - } - - // send the message - log.WithFields(logger.Fields{ - "context": chMeta.id, - "msg_id": msgID, - "msg_size": len(data), - }).Debug("Sending a message to VPP.") - - if req.Multipart { - // expect multipart response - atomic.StoreUint32(&chMeta.multipart, 1) - } - - // send the request to VPP - c.vpp.SendMsg(chMeta.id, data) - - if req.Multipart { - // send a control ping to determine end of the multipart response - ping := &vpe.ControlPing{} - pingData, _ := c.codec.EncodeMsg(ping, c.pingReqID) - - log.WithFields(logger.Fields{ - "context": chMeta.id, - "msg_id": c.pingReqID, - "msg_size": len(pingData), - }).Debug("Sending a control ping to VPP.") - - c.vpp.SendMsg(chMeta.id, pingData) - } - - return nil -} - // releaseAPIChannel releases API channel that needs to be closed. func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) { log.WithFields(logger.Fields{ @@ -252,104 +316,3 @@ func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) delete(c.channels, chMeta.id) c.channelsLock.Unlock() } - -// msgCallback is called whenever any binary API message comes from VPP. -func msgCallback(context uint32, msgID uint16, data []byte) { - connLock.RLock() - defer connLock.RUnlock() - - if conn == nil { - log.Warn("Already disconnected, ignoring the message.") - return - } - - log.WithFields(logger.Fields{ - "context": context, - "msg_id": msgID, - "msg_size": len(data), - }).Debug("Received a message from VPP.") - - if context == 0 || conn.isNotificationMessage(msgID) { - // process the message as a notification - conn.sendNotifications(msgID, data) - return - } - - // match ch according to the context - conn.channelsLock.RLock() - ch, ok := conn.channels[context] - conn.channelsLock.RUnlock() - - if !ok { - log.WithFields(logger.Fields{ - "context": context, - "msg_id": msgID, - }).Error("Context ID not known, ignoring the message.") - return - } - - chMeta := ch.Metadata().(*channelMetadata) - lastReplyReceived := false - // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply - if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) { - lastReplyReceived = true - } - - // send the data to the channel - sendReply(ch, &api.VppReply{ - MessageID: msgID, - Data: data, - LastReplyReceived: lastReplyReceived, - }) -} - -// sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise -// it logs the error and do not send the message. -func sendReply(ch *api.Channel, reply *api.VppReply) { - select { - case ch.ReplyChan <- reply: - // reply sent successfully - default: - // unable to write into the channel without blocking - log.WithFields(logger.Fields{ - "channel": ch, - "msg_id": reply.MessageID, - }).Warn("Unable to send the reply, reciever end not ready.") - } -} - -// GetMessageID returns message identifier of given API message. -func (c *Connection) GetMessageID(msg api.Message) (uint16, error) { - if c == nil { - return 0, errors.New("nil connection passed in") - } - return c.messageNameToID(msg.GetMessageName(), msg.GetCrcString()) -} - -// messageNameToID returns message ID of a message identified by its name and CRC. -func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) { - // try to get the ID from the map - c.msgIDsLock.RLock() - id, ok := c.msgIDs[msgName+msgCrc] - c.msgIDsLock.RUnlock() - if ok { - return id, nil - } - - // get the ID using VPP API - id, err := c.vpp.GetMsgID(msgName, msgCrc) - if err != nil { - error := fmt.Errorf("unable to retrieve message ID: %v", err) - log.WithFields(logger.Fields{ - "msg_name": msgName, - "msg_crc": msgCrc, - }).Errorf("unable to retrieve message ID: %v", err) - return id, error - } - - c.msgIDsLock.Lock() - c.msgIDs[msgName+msgCrc] = id - c.msgIDsLock.Unlock() - - return id, nil -} diff --git a/core/core_test.go b/core/core_test.go index d3c2e2c..3184ef5 100644 --- a/core/core_test.go +++ b/core/core_test.go @@ -145,6 +145,7 @@ func TestNotifications(t *testing.T) { } func TestNilConnection(t *testing.T) { + RegisterTestingT(t) var conn *Connection ch, err := conn.NewAPIChannel() @@ -168,6 +169,21 @@ func TestDoubleConnection(t *testing.T) { Expect(conn).Should(BeNil()) } +func TestAsyncConnection(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + ctx.conn.Disconnect() + conn, ch, err := AsyncConnect(ctx.mockVpp) + ctx.conn = conn + + Expect(err).ShouldNot(HaveOccurred()) + Expect(conn).ShouldNot(BeNil()) + + ev := <-ch + Expect(ev.State).Should(BeEquivalentTo(Connected)) +} + func TestFullBuffer(t *testing.T) { ctx := setupTest(t) defer ctx.teardownTest() diff --git a/core/notification_handler.go b/core/notification_handler.go new file mode 100644 index 0000000..5cd132c --- /dev/null +++ b/core/notification_handler.go @@ -0,0 +1,182 @@ +// Copyright (c) 2017 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + "fmt" + "reflect" + + logger "github.com/Sirupsen/logrus" + + "git.fd.io/govpp.git/api" +) + +// processNotifSubscribeRequest processes a notification subscribe request. +func (c *Connection) processNotifSubscribeRequest(ch *api.Channel, req *api.NotifSubscribeRequest) error { + var err error + + // subscribe / unsubscribe + if req.Subscribe { + err = c.addNotifSubscription(req.Subscription) + } else { + err = c.removeNotifSubscription(req.Subscription) + } + + // send the reply into the go channel + select { + case ch.NotifSubsReplyChan <- err: + // reply sent successfully + default: + // unable to write into the channel without blocking + log.WithFields(logger.Fields{ + "channel": ch, + }).Warn("Unable to deliver the subscribe reply, reciever end not ready.") + } + + return err +} + +// addNotifSubscription adds the notification subscription into the subscriptions map of the connection. +func (c *Connection) addNotifSubscription(subs *api.NotifSubscription) error { + // get message ID of the notification message + msgID, err := c.getSubscriptionMessageID(subs) + if err != nil { + return err + } + + log.WithFields(logger.Fields{ + "msg_id": msgID, + "subscription": subs, + }).Debug("Adding new notification subscription.") + + // add the subscription into map + c.notifSubscriptionsLock.Lock() + defer c.notifSubscriptionsLock.Unlock() + + c.notifSubscriptions[msgID] = append(c.notifSubscriptions[msgID], subs) + + return nil +} + +// removeNotifSubscription removes the notification subscription from the subscriptions map of the connection. +func (c *Connection) removeNotifSubscription(subs *api.NotifSubscription) error { + // get message ID of the notification message + msgID, err := c.getSubscriptionMessageID(subs) + if err != nil { + return err + } + + log.WithFields(logger.Fields{ + "msg_id": msgID, + "subscription": subs, + }).Debug("Removing notification subscription.") + + // remove the subscription from the map + c.notifSubscriptionsLock.Lock() + defer c.notifSubscriptionsLock.Unlock() + + for i, item := range c.notifSubscriptions[msgID] { + if item == subs { + // remove i-th item in the slice + c.notifSubscriptions[msgID] = append(c.notifSubscriptions[msgID][:i], c.notifSubscriptions[msgID][i+1:]...) + break + } + } + + return nil +} + +// isNotificationMessage returns true if someone has subscribed to provided message ID. +func (c *Connection) isNotificationMessage(msgID uint16) bool { + c.notifSubscriptionsLock.RLock() + defer c.notifSubscriptionsLock.RUnlock() + + _, exists := c.notifSubscriptions[msgID] + return exists +} + +// sendNotifications send a notification message to all subscribers subscribed for that message. +func (c *Connection) sendNotifications(msgID uint16, data []byte) { + c.notifSubscriptionsLock.RLock() + defer c.notifSubscriptionsLock.RUnlock() + + matched := false + + // send to notification to each subscriber + for _, subs := range c.notifSubscriptions[msgID] { + log.WithFields(logger.Fields{ + "msg_id": msgID, + "msg_size": len(data), + "subscription": subs, + }).Debug("Sending a notification to the subscription channel.") + + msg := subs.MsgFactory() + err := c.codec.DecodeMsg(data, msg) + if err != nil { + log.WithFields(logger.Fields{ + "msg_id": msgID, + "msg_size": len(data), + "subscription": subs, + }).Error("Unable to decode the notification message.") + continue + } + + // special case for the strange interface counters message + if msg.GetMessageName() == "vnet_interface_counters" { + v := reflect.ValueOf(msg).Elem().FieldByName("Data") + if v.IsValid() { + v.SetBytes(data[8:]) // include the Count and Data fields in the data + } + } + + // send the message into the go channel of the subscription + select { + case subs.NotifChan <- msg: + // message sent successfully + default: + // unable to write into the channel without blocking + log.WithFields(logger.Fields{ + "msg_id": msgID, + "msg_size": len(data), + "subscription": subs, + }).Warn("Unable to deliver the notification, reciever end not ready.") + } + + matched = true + } + + if !matched { + log.WithFields(logger.Fields{ + "msg_id": msgID, + "msg_size": len(data), + }).Debug("No subscription found for the notification message.") + } +} + +// getSubscriptionMessageID returns ID of the message the subscription is tied to. +func (c *Connection) getSubscriptionMessageID(subs *api.NotifSubscription) (uint16, error) { + msg := subs.MsgFactory() + msgID, err := c.GetMessageID(msg) + + if err != nil { + log.WithFields(logger.Fields{ + "msg_name": msg.GetMessageName(), + "msg_crc": msg.GetCrcString(), + }).Errorf("unable to retrieve message ID: %v", err) + return 0, fmt.Errorf("unable to retrieve message ID: %v", err) + } + + return msgID, nil +} diff --git a/core/notifications.go b/core/notifications.go deleted file mode 100644 index 5cd132c..0000000 --- a/core/notifications.go +++ /dev/null @@ -1,182 +0,0 @@ -// Copyright (c) 2017 Cisco and/or its affiliates. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package core - -import ( - "fmt" - "reflect" - - logger "github.com/Sirupsen/logrus" - - "git.fd.io/govpp.git/api" -) - -// processNotifSubscribeRequest processes a notification subscribe request. -func (c *Connection) processNotifSubscribeRequest(ch *api.Channel, req *api.NotifSubscribeRequest) error { - var err error - - // subscribe / unsubscribe - if req.Subscribe { - err = c.addNotifSubscription(req.Subscription) - } else { - err = c.removeNotifSubscription(req.Subscription) - } - - // send the reply into the go channel - select { - case ch.NotifSubsReplyChan <- err: - // reply sent successfully - default: - // unable to write into the channel without blocking - log.WithFields(logger.Fields{ - "channel": ch, - }).Warn("Unable to deliver the subscribe reply, reciever end not ready.") - } - - return err -} - -// addNotifSubscription adds the notification subscription into the subscriptions map of the connection. -func (c *Connection) addNotifSubscription(subs *api.NotifSubscription) error { - // get message ID of the notification message - msgID, err := c.getSubscriptionMessageID(subs) - if err != nil { - return err - } - - log.WithFields(logger.Fields{ - "msg_id": msgID, - "subscription": subs, - }).Debug("Adding new notification subscription.") - - // add the subscription into map - c.notifSubscriptionsLock.Lock() - defer c.notifSubscriptionsLock.Unlock() - - c.notifSubscriptions[msgID] = append(c.notifSubscriptions[msgID], subs) - - return nil -} - -// removeNotifSubscription removes the notification subscription from the subscriptions map of the connection. -func (c *Connection) removeNotifSubscription(subs *api.NotifSubscription) error { - // get message ID of the notification message - msgID, err := c.getSubscriptionMessageID(subs) - if err != nil { - return err - } - - log.WithFields(logger.Fields{ - "msg_id": msgID, - "subscription": subs, - }).Debug("Removing notification subscription.") - - // remove the subscription from the map - c.notifSubscriptionsLock.Lock() - defer c.notifSubscriptionsLock.Unlock() - - for i, item := range c.notifSubscriptions[msgID] { - if item == subs { - // remove i-th item in the slice - c.notifSubscriptions[msgID] = append(c.notifSubscriptions[msgID][:i], c.notifSubscriptions[msgID][i+1:]...) - break - } - } - - return nil -} - -// isNotificationMessage returns true if someone has subscribed to provided message ID. -func (c *Connection) isNotificationMessage(msgID uint16) bool { - c.notifSubscriptionsLock.RLock() - defer c.notifSubscriptionsLock.RUnlock() - - _, exists := c.notifSubscriptions[msgID] - return exists -} - -// sendNotifications send a notification message to all subscribers subscribed for that message. -func (c *Connection) sendNotifications(msgID uint16, data []byte) { - c.notifSubscriptionsLock.RLock() - defer c.notifSubscriptionsLock.RUnlock() - - matched := false - - // send to notification to each subscriber - for _, subs := range c.notifSubscriptions[msgID] { - log.WithFields(logger.Fields{ - "msg_id": msgID, - "msg_size": len(data), - "subscription": subs, - }).Debug("Sending a notification to the subscription channel.") - - msg := subs.MsgFactory() - err := c.codec.DecodeMsg(data, msg) - if err != nil { - log.WithFields(logger.Fields{ - "msg_id": msgID, - "msg_size": len(data), - "subscription": subs, - }).Error("Unable to decode the notification message.") - continue - } - - // special case for the strange interface counters message - if msg.GetMessageName() == "vnet_interface_counters" { - v := reflect.ValueOf(msg).Elem().FieldByName("Data") - if v.IsValid() { - v.SetBytes(data[8:]) // include the Count and Data fields in the data - } - } - - // send the message into the go channel of the subscription - select { - case subs.NotifChan <- msg: - // message sent successfully - default: - // unable to write into the channel without blocking - log.WithFields(logger.Fields{ - "msg_id": msgID, - "msg_size": len(data), - "subscription": subs, - }).Warn("Unable to deliver the notification, reciever end not ready.") - } - - matched = true - } - - if !matched { - log.WithFields(logger.Fields{ - "msg_id": msgID, - "msg_size": len(data), - }).Debug("No subscription found for the notification message.") - } -} - -// getSubscriptionMessageID returns ID of the message the subscription is tied to. -func (c *Connection) getSubscriptionMessageID(subs *api.NotifSubscription) (uint16, error) { - msg := subs.MsgFactory() - msgID, err := c.GetMessageID(msg) - - if err != nil { - log.WithFields(logger.Fields{ - "msg_name": msg.GetMessageName(), - "msg_crc": msg.GetCrcString(), - }).Errorf("unable to retrieve message ID: %v", err) - return 0, fmt.Errorf("unable to retrieve message ID: %v", err) - } - - return msgID, nil -} diff --git a/core/request_handler.go b/core/request_handler.go new file mode 100644 index 0000000..f4f5e92 --- /dev/null +++ b/core/request_handler.go @@ -0,0 +1,213 @@ +// Copyright (c) 2017 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + "errors" + "fmt" + "sync/atomic" + + logger "github.com/Sirupsen/logrus" + + "git.fd.io/govpp.git/api" + "git.fd.io/govpp.git/core/bin_api/vpe" +) + +// watchRequests watches for requests on the request API channel and forwards them as messages to VPP. +func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) { + for { + select { + case req, ok := <-ch.ReqChan: + // new request on the request channel + if !ok { + // after closing the request channel, release API channel and return + c.releaseAPIChannel(ch, chMeta) + return + } + c.processRequest(ch, chMeta, req) + + case req := <-ch.NotifSubsChan: + // new request on the notification subscribe channel + c.processNotifSubscribeRequest(ch, req) + } + } +} + +// processRequest processes a single request received on the request channel. +func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error { + // check whether we are connected to VPP + if atomic.LoadUint32(&c.connected) == 0 { + error := errors.New("not connected to VPP, ignoring the request") + log.Error(error) + sendReply(ch, &api.VppReply{Error: error}) + return error + } + + // retrieve message ID + msgID, err := c.GetMessageID(req.Message) + if err != nil { + error := fmt.Errorf("unable to retrieve message ID: %v", err) + log.WithFields(logger.Fields{ + "msg_name": req.Message.GetMessageName(), + "msg_crc": req.Message.GetCrcString(), + }).Error(err) + sendReply(ch, &api.VppReply{Error: error}) + return error + } + + // encode the message into binary + data, err := c.codec.EncodeMsg(req.Message, msgID) + if err != nil { + error := fmt.Errorf("unable to encode the messge: %v", err) + log.WithFields(logger.Fields{ + "context": chMeta.id, + "msg_id": msgID, + }).Error(error) + sendReply(ch, &api.VppReply{Error: error}) + return error + } + + // send the message + log.WithFields(logger.Fields{ + "context": chMeta.id, + "msg_id": msgID, + "msg_size": len(data), + }).Debug("Sending a message to VPP.") + + if req.Multipart { + // expect multipart response + atomic.StoreUint32(&chMeta.multipart, 1) + } + + // send the request to VPP + c.vpp.SendMsg(chMeta.id, data) + + if req.Multipart { + // send a control ping to determine end of the multipart response + ping := &vpe.ControlPing{} + pingData, _ := c.codec.EncodeMsg(ping, c.pingReqID) + + log.WithFields(logger.Fields{ + "context": chMeta.id, + "msg_id": c.pingReqID, + "msg_size": len(pingData), + }).Debug("Sending a control ping to VPP.") + + c.vpp.SendMsg(chMeta.id, pingData) + } + + return nil +} + +// msgCallback is called whenever any binary API message comes from VPP. +func msgCallback(context uint32, msgID uint16, data []byte) { + connLock.RLock() + defer connLock.RUnlock() + + if conn == nil { + log.Warn("Already disconnected, ignoring the message.") + return + } + + log.WithFields(logger.Fields{ + "context": context, + "msg_id": msgID, + "msg_size": len(data), + }).Debug("Received a message from VPP.") + + if context == 0 || conn.isNotificationMessage(msgID) { + // process the message as a notification + conn.sendNotifications(msgID, data) + return + } + + // match ch according to the context + conn.channelsLock.RLock() + ch, ok := conn.channels[context] + conn.channelsLock.RUnlock() + + if !ok { + log.WithFields(logger.Fields{ + "context": context, + "msg_id": msgID, + }).Error("Context ID not known, ignoring the message.") + return + } + + chMeta := ch.Metadata().(*channelMetadata) + lastReplyReceived := false + // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply + if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) { + lastReplyReceived = true + } + + // send the data to the channel + sendReply(ch, &api.VppReply{ + MessageID: msgID, + Data: data, + LastReplyReceived: lastReplyReceived, + }) +} + +// sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise +// it logs the error and do not send the message. +func sendReply(ch *api.Channel, reply *api.VppReply) { + select { + case ch.ReplyChan <- reply: + // reply sent successfully + default: + // unable to write into the channel without blocking + log.WithFields(logger.Fields{ + "channel": ch, + "msg_id": reply.MessageID, + }).Warn("Unable to send the reply, reciever end not ready.") + } +} + +// GetMessageID returns message identifier of given API message. +func (c *Connection) GetMessageID(msg api.Message) (uint16, error) { + if c == nil { + return 0, errors.New("nil connection passed in") + } + return c.messageNameToID(msg.GetMessageName(), msg.GetCrcString()) +} + +// messageNameToID returns message ID of a message identified by its name and CRC. +func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) { + // try to get the ID from the map + c.msgIDsLock.RLock() + id, ok := c.msgIDs[msgName+msgCrc] + c.msgIDsLock.RUnlock() + if ok { + return id, nil + } + + // get the ID using VPP API + id, err := c.vpp.GetMsgID(msgName, msgCrc) + if err != nil { + error := fmt.Errorf("unable to retrieve message ID: %v", err) + log.WithFields(logger.Fields{ + "msg_name": msgName, + "msg_crc": msgCrc, + }).Errorf("unable to retrieve message ID: %v", err) + return id, error + } + + c.msgIDsLock.Lock() + c.msgIDs[msgName+msgCrc] = id + c.msgIDsLock.Unlock() + + return id, nil +} -- cgit 1.2.3-korg