aboutsummaryrefslogtreecommitdiffstats
path: root/core/connection.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/connection.go')
-rw-r--r--core/connection.go289
1 files changed, 174 insertions, 115 deletions
diff --git a/core/connection.go b/core/connection.go
index a44d0c4..c77358f 100644
--- a/core/connection.go
+++ b/core/connection.go
@@ -12,13 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-//go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api
-
package core
import (
"errors"
- "os"
+ "fmt"
+ "reflect"
"sync"
"sync/atomic"
"time"
@@ -28,115 +27,95 @@ import (
"git.fd.io/govpp.git/adapter"
"git.fd.io/govpp.git/api"
"git.fd.io/govpp.git/codec"
- "git.fd.io/govpp.git/core/bin_api/vpe"
-)
-
-var (
- msgControlPing api.Message = &vpe.ControlPing{}
- msgControlPingReply api.Message = &vpe.ControlPingReply{}
)
const (
- requestChannelBufSize = 100 // default size of the request channel buffers
- replyChannelBufSize = 100 // default size of the reply channel buffers
- notificationChannelBufSize = 100 // default size of the notification channel buffers
+ requestChannelBufSize = 100 // default size of the request channel buffer
+ replyChannelBufSize = 100 // default size of the reply channel buffer
+ notificationChannelBufSize = 100 // default size of the notification channel buffer
+
+ defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout
)
var (
- healthCheckProbeInterval = time.Second * 1 // default health check probe interval
- healthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe
- healthCheckThreshold = 1 // number of failed healthProbe until the error is reported
+ healthCheckInterval = time.Second * 1 // default health check interval
+ healthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check
+ healthCheckThreshold = 1 // number of failed health checks until the error is reported
)
-// ConnectionState holds the current state of the connection to VPP.
+// SetHealthCheckProbeInterval sets health check probe interval.
+// Beware: Function is not thread-safe. It is recommended to setup this parameter
+// before connecting to vpp.
+func SetHealthCheckProbeInterval(interval time.Duration) {
+ healthCheckInterval = interval
+}
+
+// SetHealthCheckReplyTimeout sets timeout for reply to a health check probe.
+// If reply arrives after the timeout, check is considered as failed.
+// Beware: Function is not thread-safe. It is recommended to setup this parameter
+// before connecting to vpp.
+func SetHealthCheckReplyTimeout(timeout time.Duration) {
+ healthCheckReplyTimeout = timeout
+}
+
+// SetHealthCheckThreshold sets the number of failed healthProbe checks until the error is reported.
+// Beware: Function is not thread-safe. It is recommended to setup this parameter
+// before connecting to vpp.
+func SetHealthCheckThreshold(threshold int) {
+ healthCheckThreshold = threshold
+}
+
+// ConnectionState represents the current state of the connection to VPP.
type ConnectionState int
const (
- // Connected connection state means that the connection to VPP has been successfully established.
+ // Connected represents state in which the connection has been successfully established.
Connected ConnectionState = iota
- // Disconnected connection state means that the connection to VPP has been lost.
+ // Disconnected represents state in which the connection has been dropped.
Disconnected
)
// ConnectionEvent is a notification about change in the VPP connection state.
type ConnectionEvent struct {
- // Timestamp holds the time when the event has been generated.
+ // Timestamp holds the time when the event has been created.
Timestamp time.Time
- // State holds the new state of the connection to VPP at the time when the event has been generated.
+ // State holds the new state of the connection at the time when the event has been created.
State ConnectionState
+
+ // Error holds error if any encountered.
+ Error error
}
+var (
+ connLock sync.RWMutex // lock for the global connection
+ conn *Connection // global handle to the Connection (used in the message receive callback)
+)
+
// Connection represents a shared memory connection to VPP via vppAdapter.
type Connection struct {
vpp adapter.VppAdapter // VPP adapter
connected uint32 // non-zero if the adapter is connected to VPP
- codec *codec.MsgCodec // message codec
- msgIDsLock sync.RWMutex // lock for the message IDs map
- msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC
+ codec *codec.MsgCodec // message codec
+ msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC
+ msgMap map[uint16]api.Message // map of messages indexed by message ID
+ maxChannelID uint32 // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations)
channelsLock sync.RWMutex // lock for the channels map
channels map[uint16]*channel // map of all API channels indexed by the channel ID
notifSubscriptionsLock sync.RWMutex // lock for the subscriptions map
notifSubscriptions map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
- maxChannelID uint32 // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations)
- pingReqID uint16 // ID if the ControlPing message
- pingReplyID uint16 // ID of the ControlPingReply message
+ pingReqID uint16 // ID if the ControlPing message
+ pingReplyID uint16 // ID of the ControlPingReply message
lastReplyLock sync.Mutex // lock for the last reply
lastReply time.Time // time of the last received reply from VPP
}
-var (
- log *logger.Logger // global logger
- conn *Connection // global handle to the Connection (used in the message receive callback)
- connLock sync.RWMutex // lock for the global connection
-)
-
-// init initializes global logger, which logs debug level messages to stdout.
-func init() {
- log = logger.New()
- log.Out = os.Stdout
- log.Level = logger.DebugLevel
-}
-
-// SetLogger sets global logger to provided one.
-func SetLogger(l *logger.Logger) {
- log = l
-}
-
-// SetHealthCheckProbeInterval sets health check probe interval.
-// Beware: Function is not thread-safe. It is recommended to setup this parameter
-// before connecting to vpp.
-func SetHealthCheckProbeInterval(interval time.Duration) {
- healthCheckProbeInterval = interval
-}
-
-// SetHealthCheckReplyTimeout sets timeout for reply to a health check probe.
-// If reply arrives after the timeout, check is considered as failed.
-// Beware: Function is not thread-safe. It is recommended to setup this parameter
-// before connecting to vpp.
-func SetHealthCheckReplyTimeout(timeout time.Duration) {
- healthCheckReplyTimeout = timeout
-}
-
-// SetHealthCheckThreshold sets the number of failed healthProbe checks until the error is reported.
-// Beware: Function is not thread-safe. It is recommended to setup this parameter
-// before connecting to vpp.
-func SetHealthCheckThreshold(threshold int) {
- healthCheckThreshold = threshold
-}
-
-// SetControlPingMessages sets the messages for ControlPing and ControlPingReply
-func SetControlPingMessages(controPing, controlPingReply api.Message) {
- msgControlPing = controPing
- msgControlPingReply = controlPingReply
-}
-
// Connect connects to VPP using specified VPP adapter and returns the connection handle.
// This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
@@ -152,7 +131,7 @@ func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
return nil, err
}
- return conn, nil
+ return c, nil
}
// AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
@@ -170,7 +149,7 @@ func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEv
connChan := make(chan ConnectionEvent, notificationChannelBufSize)
go c.connectLoop(connChan)
- return conn, connChan, nil
+ return c, connChan, nil
}
// Disconnect disconnects from VPP and releases all connection-related resources.
@@ -178,10 +157,11 @@ func (c *Connection) Disconnect() {
if c == nil {
return
}
+
connLock.Lock()
defer connLock.Unlock()
- if c != nil && c.vpp != nil {
+ if c.vpp != nil {
c.disconnectVPP()
}
conn = nil
@@ -201,41 +181,119 @@ func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
codec: &codec.MsgCodec{},
channels: make(map[uint16]*channel),
msgIDs: make(map[string]uint16),
+ msgMap: make(map[uint16]api.Message),
notifSubscriptions: make(map[uint16][]*api.NotifSubscription),
}
+ conn.vpp.SetMsgCallback(conn.msgCallback)
- conn.vpp.SetMsgCallback(msgCallback)
return conn, nil
}
-// connectVPP performs one blocking attempt to connect to VPP.
+// connectVPP performs blocking attempt to connect to VPP.
func (c *Connection) connectVPP() error {
- log.Debug("Connecting to VPP...")
+ log.Debug("Connecting to VPP..")
// blocking connect
- err := c.vpp.Connect()
- if err != nil {
- log.Warn(err)
+ if err := c.vpp.Connect(); err != nil {
return err
}
- // store control ping IDs
- if c.pingReqID, err = c.GetMessageID(msgControlPing); err != nil {
- c.vpp.Disconnect()
- return err
- }
- if c.pingReplyID, err = c.GetMessageID(msgControlPingReply); err != nil {
+ log.Debugf("Connected to VPP.")
+
+ if err := c.retrieveMessageIDs(); err != nil {
c.vpp.Disconnect()
- return err
+ return fmt.Errorf("VPP is incompatible: %v", err)
}
// store connected state
atomic.StoreUint32(&c.connected, 1)
- log.Info("Connected to VPP.")
return nil
}
+func getMsgNameWithCrc(x api.Message) string {
+ return x.GetMessageName() + "_" + x.GetCrcString()
+}
+
+// retrieveMessageIDs retrieves IDs for all registered messages and stores them in map
+func (c *Connection) retrieveMessageIDs() (err error) {
+ t := time.Now()
+
+ var addMsg = func(msgID uint16, msg api.Message) {
+ c.msgIDs[getMsgNameWithCrc(msg)] = msgID
+ c.msgMap[msgID] = msg
+ }
+
+ msgs := api.GetAllMessages()
+
+ for name, msg := range msgs {
+ msgID, err := c.vpp.GetMsgID(msg.GetMessageName(), msg.GetCrcString())
+ if err != nil {
+ return err
+ }
+
+ addMsg(msgID, msg)
+
+ if msg.GetMessageName() == msgControlPing.GetMessageName() {
+ c.pingReqID = msgID
+ msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
+ } else if msg.GetMessageName() == msgControlPingReply.GetMessageName() {
+ c.pingReplyID = msgID
+ msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
+ }
+
+ if debugMsgIDs {
+ log.Debugf("message %q (%s) has ID: %d", name, getMsgNameWithCrc(msg), msgID)
+ }
+ }
+
+ log.Debugf("retrieving %d message IDs took %s", len(msgs), time.Since(t))
+
+ // fallback for control ping when vpe package is not imported
+ if c.pingReqID == 0 {
+ c.pingReqID, err = c.vpp.GetMsgID(msgControlPing.GetMessageName(), msgControlPing.GetCrcString())
+ if err != nil {
+ return err
+ }
+ addMsg(c.pingReqID, msgControlPing)
+ }
+ if c.pingReplyID == 0 {
+ c.pingReplyID, err = c.vpp.GetMsgID(msgControlPingReply.GetMessageName(), msgControlPingReply.GetCrcString())
+ if err != nil {
+ return err
+ }
+ addMsg(c.pingReplyID, msgControlPingReply)
+ }
+
+ return nil
+}
+
+// GetMessageID returns message identifier of given API message.
+func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
+ if c == nil {
+ return 0, errors.New("nil connection passed in")
+ }
+
+ if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
+ return msgID, nil
+ }
+
+ return 0, fmt.Errorf("unknown message: %s (%s)", msg.GetMessageName(), msg.GetCrcString())
+}
+
+// LookupByID looks up message name and crc by ID.
+func (c *Connection) LookupByID(msgID uint16) (api.Message, error) {
+ if c == nil {
+ return nil, errors.New("nil connection passed in")
+ }
+
+ if msg, ok := c.msgMap[msgID]; ok {
+ return msg, nil
+ }
+
+ return nil, fmt.Errorf("unknown message ID: %d", msgID)
+}
+
// disconnectVPP disconnects from VPP in case it is connected.
func (c *Connection) disconnectVPP() {
if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
@@ -269,19 +327,21 @@ func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
// it continues with connectLoop and tries to reconnect.
func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
// create a separate API channel for health check probes
- ch, err := conn.newAPIChannelBuffered(1, 1)
+ ch, err := c.newAPIChannel(1, 1)
if err != nil {
log.Error("Failed to create health check API channel, health check will be disabled:", err)
return
}
- var sinceLastReply time.Duration
- var failedChecks int
+ var (
+ sinceLastReply time.Duration
+ failedChecks int
+ )
// send health check probes until an error or timeout occurs
for {
// sleep until next health check probe period
- time.Sleep(healthCheckProbeInterval)
+ time.Sleep(healthCheckInterval)
if atomic.LoadUint32(&c.connected) == 0 {
// Disconnect has been called in the meantime, return the healthcheck - reconnect loop
@@ -297,22 +357,22 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
}
// send the control ping request
- ch.reqChan <- &api.VppRequest{Message: msgControlPing}
+ ch.reqChan <- &vppRequest{msg: msgControlPing}
for {
// expect response within timeout period
select {
case vppReply := <-ch.replyChan:
- err = vppReply.Error
+ err = vppReply.err
case <-time.After(healthCheckReplyTimeout):
err = ErrProbeTimeout
// check if time since last reply from any other
// channel is less than health check reply timeout
- conn.lastReplyLock.Lock()
+ c.lastReplyLock.Lock()
sinceLastReply = time.Since(c.lastReply)
- conn.lastReplyLock.Unlock()
+ c.lastReplyLock.Unlock()
if sinceLastReply < healthCheckReplyTimeout {
log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
@@ -326,17 +386,18 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
failedChecks++
log.Warnf("VPP health check probe timed out after %v (%d. timeout)", healthCheckReplyTimeout, failedChecks)
if failedChecks > healthCheckThreshold {
- // in case of exceeded treshold disconnect
+ // in case of exceeded failed check treshold, assume VPP disconnected
log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", healthCheckThreshold)
connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
break
}
} else if err != nil {
- // in case of error disconnect
+ // in case of error, assume VPP disconnected
log.Errorf("VPP health check probe failed: %v", err)
- connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
+ connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err}
break
} else if failedChecks > 0 {
+ // in case of success after failed checks, clear failed check counter
failedChecks = 0
log.Infof("VPP health check probe OK")
}
@@ -351,33 +412,31 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
}
func (c *Connection) NewAPIChannel() (api.Channel, error) {
- return c.newAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize)
+ return c.newAPIChannel(requestChannelBufSize, replyChannelBufSize)
}
func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
- return c.newAPIChannelBuffered(reqChanBufSize, replyChanBufSize)
+ return c.newAPIChannel(reqChanBufSize, replyChanBufSize)
}
// NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
// It allows to specify custom buffer sizes for the request and reply Go channels.
-func (c *Connection) newAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*channel, error) {
+func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*channel, error) {
if c == nil {
return nil, errors.New("nil connection passed in")
}
chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
ch := &channel{
- id: chID,
- replyTimeout: defaultReplyTimeout,
+ id: chID,
+ replyTimeout: defaultReplyTimeout,
+ msgDecoder: c.codec,
+ msgIdentifier: c,
+ reqChan: make(chan *vppRequest, reqChanBufSize),
+ replyChan: make(chan *vppReply, replyChanBufSize),
+ notifSubsChan: make(chan *subscriptionRequest, reqChanBufSize),
+ notifSubsReplyChan: make(chan error, replyChanBufSize),
}
- ch.msgDecoder = c.codec
- ch.msgIdentifier = c
-
- // create the communication channels
- ch.reqChan = make(chan *api.VppRequest, reqChanBufSize)
- ch.replyChan = make(chan *api.VppReply, replyChanBufSize)
- ch.notifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize)
- ch.notifSubsReplyChan = make(chan error, replyChanBufSize)
// store API channel within the client
c.channelsLock.Lock()
@@ -393,8 +452,8 @@ func (c *Connection) newAPIChannelBuffered(reqChanBufSize, replyChanBufSize int)
// releaseAPIChannel releases API channel that needs to be closed.
func (c *Connection) releaseAPIChannel(ch *channel) {
log.WithFields(logger.Fields{
- "ID": ch.id,
- }).Debug("API channel closed.")
+ "channel": ch.id,
+ }).Debug("API channel released")
// delete the channel from channels map
c.channelsLock.Lock()