aboutsummaryrefslogtreecommitdiffstats
path: root/core/core.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/core.go')
-rw-r--r--core/core.go365
1 files changed, 164 insertions, 201 deletions
diff --git a/core/core.go b/core/core.go
index 5b14a42..2484c81 100644
--- a/core/core.go
+++ b/core/core.go
@@ -14,14 +14,12 @@
package core
-//go:generate binapi-generator --input-file=/usr/share/vpp/api/vpe.api.json --output-dir=./bin_api
-
import (
"errors"
- "fmt"
"os"
"sync"
"sync/atomic"
+ "time"
logger "github.com/Sirupsen/logrus"
@@ -31,14 +29,41 @@ import (
)
const (
- requestChannelBufSize = 100 // default size of the request channel buffers
- replyChannelBufSize = 100 // default size of the reply channel buffers
+ requestChannelBufSize = 100 // default size of the request channel buffers
+ replyChannelBufSize = 100 // default size of the reply channel buffers
+ notificationChannelBufSize = 100 // default size of the notification channel buffers
+)
+
+const (
+ healthCheckProbeInterval = time.Second * 1 // default health check probe interval
+ healthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe
+)
+
+// ConnectionState holds the current state of the connection to VPP.
+type ConnectionState int
+
+const (
+ // Connected connection state means that the connection to VPP has been successfully established.
+ Connected ConnectionState = iota
+
+ // Disconnected connection state means that the connection to VPP has been lost.
+ Disconnected = iota
)
+// ConnectionEvent is a notification about change in the VPP connection state.
+type ConnectionEvent struct {
+ // Timestamp holds the time when the event has been generated.
+ Timestamp time.Time
+
+ // State holds the new state of the connection to VPP at the time when the event has been generated.
+ State ConnectionState
+}
+
// Connection represents a shared memory connection to VPP via vppAdapter.
type Connection struct {
- vpp adapter.VppAdapter // VPP adapter
- codec *MsgCodec // message codec
+ vpp adapter.VppAdapter // VPP adapter
+ connected uint32 // non-zero if the adapter is connected to VPP
+ codec *MsgCodec // message codec
msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC
msgIDsLock sync.RWMutex // lock for the message IDs map
@@ -79,7 +104,57 @@ func SetLogger(l *logger.Logger) {
}
// Connect connects to VPP using specified VPP adapter and returns the connection handle.
+// This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
+ // create new connection handle
+ c, err := newConnection(vppAdapter)
+ if err != nil {
+ return nil, err
+ }
+
+ // blocking attempt to connect to VPP
+ err = c.connectVPP()
+ if err != nil {
+ return nil, err
+ }
+
+ return conn, nil
+}
+
+// AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
+// and ConnectionState channel. This call does not block until connection is established, it
+// returns immediately. The caller is supposed to watch the returned ConnectionState channel for
+// Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
+func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) {
+ // create new connection handle
+ c, err := newConnection(vppAdapter)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ // asynchronously attempt to connect to VPP
+ connChan := make(chan ConnectionEvent, notificationChannelBufSize)
+ go c.connectLoop(connChan)
+
+ return conn, connChan, nil
+}
+
+// Disconnect disconnects from VPP and releases all connection-related resources.
+func (c *Connection) Disconnect() {
+ if c == nil {
+ return
+ }
+ connLock.Lock()
+ defer connLock.Unlock()
+
+ if c != nil && c.vpp != nil {
+ c.disconnectVPP()
+ }
+ conn = nil
+}
+
+// newConnection returns new connection handle.
+func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
connLock.Lock()
defer connLock.Unlock()
@@ -93,35 +168,102 @@ func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
conn.notifSubscriptions = make(map[uint16][]*api.NotifSubscription)
conn.vpp.SetMsgCallback(msgCallback)
+ return conn, nil
+}
- logger.Debug("Connecting to VPP...")
+// connectVPP performs one blocking attempt to connect to VPP.
+func (c *Connection) connectVPP() error {
+ log.Debug("Connecting to VPP...")
- err := conn.vpp.Connect()
+ // blocking connect
+ err := c.vpp.Connect()
if err != nil {
- return nil, err
+ log.Warn(err)
+ return err
}
+ // store connected state
+ atomic.StoreUint32(&c.connected, 1)
+
// store control ping IDs
- conn.pingReqID, _ = conn.GetMessageID(&vpe.ControlPing{})
- conn.pingReplyID, _ = conn.GetMessageID(&vpe.ControlPingReply{})
+ c.pingReqID, _ = c.GetMessageID(&vpe.ControlPing{})
+ c.pingReplyID, _ = c.GetMessageID(&vpe.ControlPingReply{})
- logger.Debug("VPP connected.")
+ log.Info("Connected to VPP.")
+ return nil
+}
- return conn, nil
+// disconnectVPP disconnects from VPP in case it is connected.
+func (c *Connection) disconnectVPP() {
+ if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
+ c.vpp.Disconnect()
+ }
}
-// Disconnect disconnects from VPP.
-func (c *Connection) Disconnect() {
- if c == nil {
+// connectLoop attempts to connect to VPP until it succeeds.
+// Then it continues with healthCheckLoop.
+func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
+ // loop until connected
+ for {
+ err := c.connectVPP()
+ if err == nil {
+ // signal connected event
+ connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
+ break
+ }
+ }
+
+ // we are now connected, continue with health check loop
+ c.healthCheckLoop(connChan)
+}
+
+// healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
+// it continues with connectLoop and tries to reconnect.
+func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
+ // create a separate API channel for health check probes
+ ch, err := conn.NewAPIChannel()
+ if err != nil {
+ log.Error("Error by creating health check API channel, health check will be disabled:", err)
return
}
- connLock.Lock()
- defer connLock.Unlock()
- if c != nil && c.vpp != nil {
- c.vpp.Disconnect()
+ // send health check probes until an error occurs
+ for {
+ // wait for healthCheckProbeInterval
+ <-time.After(healthCheckProbeInterval)
+
+ if atomic.LoadUint32(&c.connected) == 0 {
+ // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
+ log.Debug("Disconnected on request, exiting health check loop.")
+ return
+ }
+
+ // send the control ping
+ ch.ReqChan <- &api.VppRequest{Message: &vpe.ControlPing{}}
+
+ // expect response within timeout period
+ select {
+ case vppReply := <-ch.ReplyChan:
+ err = vppReply.Error
+ case <-time.After(healthCheckReplyTimeout):
+ err = errors.New("probe reply not received within the timeout period")
+ }
+
+ // in case of error, break & disconnect
+ if err != nil {
+ log.Errorf("VPP health check failed: %v", err)
+ // signal disconnected event via channel
+ connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
+ break
+ }
}
- conn = nil
+
+ // cleanup
+ ch.Close()
+ c.disconnectVPP()
+
+ // we are now disconnected, start connect loop
+ c.connectLoop(connChan)
}
// NewAPIChannel returns a new API channel for communication with VPP via govpp core.
@@ -163,84 +305,6 @@ func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int)
return ch, nil
}
-// watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
-func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
- for {
- select {
- case req, ok := <-ch.ReqChan:
- // new request on the request channel
- if !ok {
- // after closing the request channel, release API channel and return
- c.releaseAPIChannel(ch, chMeta)
- return
- }
- c.processRequest(ch, chMeta, req)
-
- case req := <-ch.NotifSubsChan:
- // new request on the notification subscribe channel
- c.processNotifSubscribeRequest(ch, req)
- }
- }
-}
-
-// processRequest processes a single request received on the request channel.
-func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error {
- // retrieve message ID
- msgID, err := c.GetMessageID(req.Message)
- if err != nil {
- error := fmt.Errorf("unable to retrieve message ID: %v", err)
- log.WithFields(logger.Fields{
- "msg_name": req.Message.GetMessageName(),
- "msg_crc": req.Message.GetCrcString(),
- }).Errorf("unable to retrieve message ID: %v", err)
- sendReply(ch, &api.VppReply{Error: error})
- return error
- }
-
- // encode the message into binary
- data, err := c.codec.EncodeMsg(req.Message, msgID)
- if err != nil {
- error := fmt.Errorf("unable to encode the messge: %v", err)
- log.WithFields(logger.Fields{
- "context": chMeta.id,
- "msg_id": msgID,
- }).Errorf("%v", error)
- sendReply(ch, &api.VppReply{Error: error})
- return error
- }
-
- // send the message
- log.WithFields(logger.Fields{
- "context": chMeta.id,
- "msg_id": msgID,
- "msg_size": len(data),
- }).Debug("Sending a message to VPP.")
-
- if req.Multipart {
- // expect multipart response
- atomic.StoreUint32(&chMeta.multipart, 1)
- }
-
- // send the request to VPP
- c.vpp.SendMsg(chMeta.id, data)
-
- if req.Multipart {
- // send a control ping to determine end of the multipart response
- ping := &vpe.ControlPing{}
- pingData, _ := c.codec.EncodeMsg(ping, c.pingReqID)
-
- log.WithFields(logger.Fields{
- "context": chMeta.id,
- "msg_id": c.pingReqID,
- "msg_size": len(pingData),
- }).Debug("Sending a control ping to VPP.")
-
- c.vpp.SendMsg(chMeta.id, pingData)
- }
-
- return nil
-}
-
// releaseAPIChannel releases API channel that needs to be closed.
func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) {
log.WithFields(logger.Fields{
@@ -252,104 +316,3 @@ func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata)
delete(c.channels, chMeta.id)
c.channelsLock.Unlock()
}
-
-// msgCallback is called whenever any binary API message comes from VPP.
-func msgCallback(context uint32, msgID uint16, data []byte) {
- connLock.RLock()
- defer connLock.RUnlock()
-
- if conn == nil {
- log.Warn("Already disconnected, ignoring the message.")
- return
- }
-
- log.WithFields(logger.Fields{
- "context": context,
- "msg_id": msgID,
- "msg_size": len(data),
- }).Debug("Received a message from VPP.")
-
- if context == 0 || conn.isNotificationMessage(msgID) {
- // process the message as a notification
- conn.sendNotifications(msgID, data)
- return
- }
-
- // match ch according to the context
- conn.channelsLock.RLock()
- ch, ok := conn.channels[context]
- conn.channelsLock.RUnlock()
-
- if !ok {
- log.WithFields(logger.Fields{
- "context": context,
- "msg_id": msgID,
- }).Error("Context ID not known, ignoring the message.")
- return
- }
-
- chMeta := ch.Metadata().(*channelMetadata)
- lastReplyReceived := false
- // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply
- if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) {
- lastReplyReceived = true
- }
-
- // send the data to the channel
- sendReply(ch, &api.VppReply{
- MessageID: msgID,
- Data: data,
- LastReplyReceived: lastReplyReceived,
- })
-}
-
-// sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
-// it logs the error and do not send the message.
-func sendReply(ch *api.Channel, reply *api.VppReply) {
- select {
- case ch.ReplyChan <- reply:
- // reply sent successfully
- default:
- // unable to write into the channel without blocking
- log.WithFields(logger.Fields{
- "channel": ch,
- "msg_id": reply.MessageID,
- }).Warn("Unable to send the reply, reciever end not ready.")
- }
-}
-
-// GetMessageID returns message identifier of given API message.
-func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
- if c == nil {
- return 0, errors.New("nil connection passed in")
- }
- return c.messageNameToID(msg.GetMessageName(), msg.GetCrcString())
-}
-
-// messageNameToID returns message ID of a message identified by its name and CRC.
-func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) {
- // try to get the ID from the map
- c.msgIDsLock.RLock()
- id, ok := c.msgIDs[msgName+msgCrc]
- c.msgIDsLock.RUnlock()
- if ok {
- return id, nil
- }
-
- // get the ID using VPP API
- id, err := c.vpp.GetMsgID(msgName, msgCrc)
- if err != nil {
- error := fmt.Errorf("unable to retrieve message ID: %v", err)
- log.WithFields(logger.Fields{
- "msg_name": msgName,
- "msg_crc": msgCrc,
- }).Errorf("unable to retrieve message ID: %v", err)
- return id, error
- }
-
- c.msgIDsLock.Lock()
- c.msgIDs[msgName+msgCrc] = id
- c.msgIDsLock.Unlock()
-
- return id, nil
-}