summaryrefslogtreecommitdiffstats
path: root/core/core.go
diff options
context:
space:
mode:
authorRastislav Szabo <raszabo@cisco.com>2017-05-04 11:09:03 +0200
committerRastislav Szabo <raszabo@cisco.com>2017-05-04 11:12:35 +0200
commita101d966133a70b8a76526be25070436d14fcf9f (patch)
tree75e2dbf20de615e58252b780b2ba5baae8fdcf82 /core/core.go
parenta968ead74525125dff9ae90b1c9a9102e4327900 (diff)
initial commit
Signed-off-by: Rastislav Szabo <raszabo@cisco.com>
Diffstat (limited to 'core/core.go')
-rw-r--r--core/core.go340
1 files changed, 340 insertions, 0 deletions
diff --git a/core/core.go b/core/core.go
new file mode 100644
index 0000000..e11a30f
--- /dev/null
+++ b/core/core.go
@@ -0,0 +1,340 @@
+// Copyright (c) 2017 Cisco and/or its affiliates.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package core
+
+//go:generate binapi_generator --input-file=/usr/share/vpp/api/vpe.api.json --output-dir=./bin_api
+
+import (
+ "errors"
+ "fmt"
+ "os"
+ "sync"
+ "sync/atomic"
+
+ logger "github.com/Sirupsen/logrus"
+
+ "gerrit.fd.io/r/govpp/adapter"
+ "gerrit.fd.io/r/govpp/api"
+ "gerrit.fd.io/r/govpp/core/bin_api/vpe"
+)
+
+const (
+ requestChannelBufSize = 100 // default size of the request channel buffers
+ replyChannelBufSize = 100 // default size of the reply channel buffers
+)
+
+// Connection represents a shared memory connection to VPP via vppAdapter.
+type Connection struct {
+ vpp adapter.VppAdapter // VPP adapter
+ codec *MsgCodec // message codec
+
+ msgIDs map[string]uint16 // map os message IDs indexed by message name + CRC
+ msgIDsLock sync.RWMutex // lock for the message IDs map
+
+ channels map[uint32]*api.Channel // map of all API channels indexed by the channel ID
+ channelsLock sync.RWMutex // lock for the channels map
+
+ notifSubscriptions map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
+ notifSubscriptionsLock sync.RWMutex // lock for the subscriptions map
+
+ maxChannelID uint32 // maximum used client ID
+ pingReqID uint16 // ID if the ControlPing message
+ pingReplyID uint16 // ID of the ControlPingReply message
+}
+
+// channelMetadata contains core-local metadata of an API channel.
+type channelMetadata struct {
+ id uint32 // channel ID
+ multipart uint32 // 1 if multipart request is being processed, 0 otherwise
+}
+
+var (
+ log *logger.Logger // global logger
+ conn *Connection // global handle to the Connection (used in the message receive callback)
+ connLock sync.RWMutex // lock for the global connection
+)
+
+// init initializes global logger, which logs debug level messages to stdout.
+func init() {
+ log = logger.New()
+ log.Out = os.Stdout
+ log.Level = logger.DebugLevel
+}
+
+// SetLogger sets global logger to provided one.
+func SetLogger(l *logger.Logger) {
+ log = l
+}
+
+// Connect connects to VPP using specified VPP adapter and returns the connection handle.
+func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
+ connLock.Lock()
+ defer connLock.Unlock()
+
+ if conn != nil {
+ return nil, errors.New("only one connection per process is supported")
+ }
+
+ conn = &Connection{vpp: vppAdapter, codec: &MsgCodec{}}
+ conn.channels = make(map[uint32]*api.Channel)
+ conn.msgIDs = make(map[string]uint16)
+ conn.notifSubscriptions = make(map[uint16][]*api.NotifSubscription)
+
+ conn.vpp.SetMsgCallback(msgCallback)
+
+ logger.Debug("Connecting to VPP...")
+
+ err := conn.vpp.Connect()
+ if err != nil {
+ return nil, err
+ }
+
+ // store control ping IDs
+ conn.pingReqID, _ = conn.GetMessageID(&vpe.ControlPing{})
+ conn.pingReplyID, _ = conn.GetMessageID(&vpe.ControlPingReply{})
+
+ logger.Debug("VPP connected.")
+
+ return conn, nil
+}
+
+// Disconnect disconnects from VPP.
+func (c *Connection) Disconnect() {
+ connLock.Lock()
+ defer connLock.Unlock()
+
+ if c != nil && c.vpp != nil {
+ c.vpp.Disconnect()
+ }
+ conn = nil
+}
+
+// NewAPIChannel returns a new API channel for communication with VPP via govpp core.
+// It uses default buffer sizes for the request and reply Go channels.
+func (c *Connection) NewAPIChannel() (*api.Channel, error) {
+ return c.NewAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize)
+}
+
+// NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
+// It allows to specify custom buffer sizes for the request and reply Go channels.
+func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*api.Channel, error) {
+ chID := atomic.AddUint32(&c.maxChannelID, 1)
+ chMeta := &channelMetadata{id: chID}
+
+ ch := api.NewChannelInternal(chMeta)
+ ch.MsgDecoder = c.codec
+ ch.MsgIdentifier = c
+
+ // create the communication channels
+ ch.ReqChan = make(chan *api.VppRequest, reqChanBufSize)
+ ch.ReplyChan = make(chan *api.VppReply, replyChanBufSize)
+ ch.NotifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize)
+ ch.NotifSubsReplyChan = make(chan error, replyChanBufSize)
+
+ // store API channel within the client
+ c.channelsLock.Lock()
+ c.channels[chID] = ch
+ c.channelsLock.Unlock()
+
+ // start watching on the request channel
+ go c.watchRequests(ch, chMeta)
+
+ return ch, nil
+}
+
+// watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
+func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
+ for {
+ select {
+ case req, ok := <-ch.ReqChan:
+ // new request on the request channel
+ if !ok {
+ // after closing the request channel, release API channel and return
+ c.releaseAPIChannel(ch, chMeta)
+ return
+ }
+ c.processRequest(ch, chMeta, req)
+
+ case req := <-ch.NotifSubsChan:
+ // new request on the notification subscribe channel
+ c.processNotifSubscribeRequest(ch, req)
+ }
+ }
+}
+
+// processRequest processes a single request received on the request channel.
+func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error {
+ // retrieve message ID
+ msgID, err := c.GetMessageID(req.Message)
+ if err != nil {
+ error := fmt.Errorf("unable to retrieve message ID: %v", err)
+ log.WithFields(logger.Fields{
+ "msg_name": req.Message.GetMessageName(),
+ "msg_crc": req.Message.GetCrcString(),
+ }).Errorf("unable to retrieve message ID: %v", err)
+ sendReply(ch, &api.VppReply{Error: error})
+ return error
+ }
+
+ // encode the message into binary
+ data, err := c.codec.EncodeMsg(req.Message, msgID)
+ if err != nil {
+ error := fmt.Errorf("unable to encode the messge: %v", err)
+ log.WithFields(logger.Fields{
+ "context": chMeta.id,
+ "msg_id": msgID,
+ }).Errorf("%v", error)
+ sendReply(ch, &api.VppReply{Error: error})
+ return error
+ }
+
+ // send the message
+ log.WithFields(logger.Fields{
+ "context": chMeta.id,
+ "msg_id": msgID,
+ "msg_size": len(data),
+ }).Debug("Sending a message to VPP.")
+
+ c.vpp.SendMsg(chMeta.id, data)
+
+ if req.Multipart {
+ // multipart request
+ atomic.StoreUint32(&chMeta.multipart, 1)
+
+ // send a control ping
+ ping := &vpe.ControlPing{}
+ pingData, _ := c.codec.EncodeMsg(ping, c.pingReqID)
+
+ log.WithFields(logger.Fields{
+ "context": chMeta.id,
+ "msg_id": c.pingReqID,
+ "msg_size": len(pingData),
+ }).Debug("Sending a control ping to VPP.")
+
+ c.vpp.SendMsg(chMeta.id, pingData)
+ }
+
+ return nil
+}
+
+// releaseAPIChannel releases API channel that needs to be closed.
+func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) {
+ log.WithFields(logger.Fields{
+ "context": chMeta.id,
+ }).Debug("API channel closed.")
+
+ // delete the channel from channels map
+ c.channelsLock.Lock()
+ delete(c.channels, chMeta.id)
+ c.channelsLock.Unlock()
+}
+
+// msgCallback is called whenever any binary API message comes from VPP.
+func msgCallback(context uint32, msgID uint16, data []byte) {
+ connLock.RLock()
+ defer connLock.RUnlock()
+
+ if conn == nil {
+ log.Warn("Already disconnected, ignoring the message.")
+ return
+ }
+
+ log.WithFields(logger.Fields{
+ "context": context,
+ "msg_id": msgID,
+ "msg_size": len(data),
+ }).Debug("Received a message from VPP.")
+
+ if context == 0 || conn.isNotificationMessage(msgID) {
+ // process the message as a notification
+ conn.sendNotifications(msgID, data)
+ return
+ }
+
+ // match ch according to the context
+ conn.channelsLock.RLock()
+ ch, ok := conn.channels[context]
+ conn.channelsLock.RUnlock()
+
+ if !ok {
+ log.WithFields(logger.Fields{
+ "context": context,
+ "msg_id": msgID,
+ }).Error("Context ID not known, ignoring the message.")
+ return
+ }
+
+ chMeta := ch.Metadata().(*channelMetadata)
+ lastReplyReceived := false
+ // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply
+ if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) {
+ lastReplyReceived = true
+ }
+
+ // send the data to the channel
+ sendReply(ch, &api.VppReply{
+ MessageID: msgID,
+ Data: data,
+ LastReplyReceived: lastReplyReceived,
+ })
+}
+
+// sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
+// it logs the error and do not send the message.
+func sendReply(ch *api.Channel, reply *api.VppReply) {
+ select {
+ case ch.ReplyChan <- reply:
+ // reply sent successfully
+ default:
+ // unable to write into the channel without blocking
+ log.WithFields(logger.Fields{
+ "channel": ch,
+ "msg_id": reply.MessageID,
+ }).Warn("Unable to send the reply, reciever end not ready.")
+ }
+}
+
+// GetMessageID returns message identifier of given API message.
+func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
+ return c.messageNameToID(msg.GetMessageName(), msg.GetCrcString())
+}
+
+// messageNameToID returns message ID of a message identified by its name and CRC.
+func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) {
+ // try to get the ID from the map
+ c.msgIDsLock.RLock()
+ id, ok := c.msgIDs[msgName+msgCrc]
+ c.msgIDsLock.RUnlock()
+ if ok {
+ return id, nil
+ }
+
+ // get the ID using VPP API
+ id, err := c.vpp.GetMsgID(msgName, msgCrc)
+ if err != nil {
+ error := fmt.Errorf("unable to retrieve message ID: %v", err)
+ log.WithFields(logger.Fields{
+ "msg_name": msgName,
+ "msg_crc": msgCrc,
+ }).Errorf("unable to retrieve message ID: %v", err)
+ return id, error
+ }
+
+ c.msgIDsLock.Lock()
+ c.msgIDs[msgName+msgCrc] = id
+ c.msgIDsLock.Unlock()
+
+ return id, nil
+}