aboutsummaryrefslogtreecommitdiffstats
path: root/core/connection.go
diff options
context:
space:
mode:
authorOndrej Fabry <ofabry@cisco.com>2018-08-23 22:51:56 +0200
committerOndrej Fabry <ofabry@cisco.com>2018-08-24 12:43:05 +0200
commit6b350c65fe0ec845cecf58bfb41ffc63dc9c04f7 (patch)
tree6255495854f43ec2f2d11f88990369aadb48db3f /core/connection.go
parent892683bef86cacc2ccda2b4df2b079171bd92164 (diff)
Simplify subscribing to events and fix events
- there is no need for sending subscription requests through channels, since all the messages are registered and no communication with VPP is needed Change-Id: Ibc29957be02a32e26309f66c369a071559b822a9 Signed-off-by: Ondrej Fabry <ofabry@cisco.com>
Diffstat (limited to 'core/connection.go')
-rw-r--r--core/connection.go235
1 files changed, 104 insertions, 131 deletions
diff --git a/core/connection.go b/core/connection.go
index c77358f..7d014ce 100644
--- a/core/connection.go
+++ b/core/connection.go
@@ -29,42 +29,19 @@ import (
"git.fd.io/govpp.git/codec"
)
-const (
- requestChannelBufSize = 100 // default size of the request channel buffer
- replyChannelBufSize = 100 // default size of the reply channel buffer
- notificationChannelBufSize = 100 // default size of the notification channel buffer
-
- defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout
+var (
+ RequestChanBufSize = 100 // default size of the request channel buffer
+ ReplyChanBufSize = 100 // default size of the reply channel buffer
+ NotificationChanBufSize = 100 // default size of the notification channel buffer
)
var (
- healthCheckInterval = time.Second * 1 // default health check interval
- healthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check
- healthCheckThreshold = 1 // number of failed health checks until the error is reported
+ HealthCheckProbeInterval = time.Second * 1 // default health check probe interval
+ HealthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe
+ HealthCheckThreshold = 1 // number of failed health checks until the error is reported
+ DefaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP
)
-// SetHealthCheckProbeInterval sets health check probe interval.
-// Beware: Function is not thread-safe. It is recommended to setup this parameter
-// before connecting to vpp.
-func SetHealthCheckProbeInterval(interval time.Duration) {
- healthCheckInterval = interval
-}
-
-// SetHealthCheckReplyTimeout sets timeout for reply to a health check probe.
-// If reply arrives after the timeout, check is considered as failed.
-// Beware: Function is not thread-safe. It is recommended to setup this parameter
-// before connecting to vpp.
-func SetHealthCheckReplyTimeout(timeout time.Duration) {
- healthCheckReplyTimeout = timeout
-}
-
-// SetHealthCheckThreshold sets the number of failed healthProbe checks until the error is reported.
-// Beware: Function is not thread-safe. It is recommended to setup this parameter
-// before connecting to vpp.
-func SetHealthCheckThreshold(threshold int) {
- healthCheckThreshold = threshold
-}
-
// ConnectionState represents the current state of the connection to VPP.
type ConnectionState int
@@ -104,10 +81,10 @@ type Connection struct {
maxChannelID uint32 // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations)
channelsLock sync.RWMutex // lock for the channels map
- channels map[uint16]*channel // map of all API channels indexed by the channel ID
+ channels map[uint16]*Channel // map of all API channels indexed by the channel ID
- notifSubscriptionsLock sync.RWMutex // lock for the subscriptions map
- notifSubscriptions map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
+ subscriptionsLock sync.RWMutex // lock for the subscriptions map
+ subscriptions map[uint16][]*subscriptionCtx // map od all notification subscriptions indexed by message ID
pingReqID uint16 // ID if the ControlPing message
pingReplyID uint16 // ID of the ControlPingReply message
@@ -116,18 +93,30 @@ type Connection struct {
lastReply time.Time // time of the last received reply from VPP
}
+func newConnection(vpp adapter.VppAdapter) *Connection {
+ c := &Connection{
+ vpp: vpp,
+ codec: &codec.MsgCodec{},
+ msgIDs: make(map[string]uint16),
+ msgMap: make(map[uint16]api.Message),
+ channels: make(map[uint16]*Channel),
+ subscriptions: make(map[uint16][]*subscriptionCtx),
+ }
+ vpp.SetMsgCallback(c.msgCallback)
+ return c
+}
+
// Connect connects to VPP using specified VPP adapter and returns the connection handle.
// This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
// create new connection handle
- c, err := newConnection(vppAdapter)
+ c, err := createConnection(vppAdapter)
if err != nil {
return nil, err
}
// blocking attempt to connect to VPP
- err = c.connectVPP()
- if err != nil {
+ if err := c.connectVPP(); err != nil {
return nil, err
}
@@ -140,13 +129,13 @@ func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
// Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) {
// create new connection handle
- c, err := newConnection(vppAdapter)
+ c, err := createConnection(vppAdapter)
if err != nil {
return nil, nil, err
}
// asynchronously attempt to connect to VPP
- connChan := make(chan ConnectionEvent, notificationChannelBufSize)
+ connChan := make(chan ConnectionEvent, NotificationChanBufSize)
go c.connectLoop(connChan)
return c, connChan, nil
@@ -168,7 +157,7 @@ func (c *Connection) Disconnect() {
}
// newConnection returns new connection handle.
-func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
+func createConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
connLock.Lock()
defer connLock.Unlock()
@@ -176,15 +165,7 @@ func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
return nil, errors.New("only one connection per process is supported")
}
- conn = &Connection{
- vpp: vppAdapter,
- codec: &codec.MsgCodec{},
- channels: make(map[uint16]*channel),
- msgIDs: make(map[string]uint16),
- msgMap: make(map[uint16]api.Message),
- notifSubscriptions: make(map[uint16][]*api.NotifSubscription),
- }
- conn.vpp.SetMsgCallback(conn.msgCallback)
+ conn = newConnection(vppAdapter)
return conn, nil
}
@@ -211,8 +192,72 @@ func (c *Connection) connectVPP() error {
return nil
}
-func getMsgNameWithCrc(x api.Message) string {
- return x.GetMessageName() + "_" + x.GetCrcString()
+func (c *Connection) NewAPIChannel() (api.Channel, error) {
+ return c.newAPIChannel(RequestChanBufSize, ReplyChanBufSize)
+}
+
+func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
+ return c.newAPIChannel(reqChanBufSize, replyChanBufSize)
+}
+
+// NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
+// It allows to specify custom buffer sizes for the request and reply Go channels.
+func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*Channel, error) {
+ if c == nil {
+ return nil, errors.New("nil connection passed in")
+ }
+
+ // create new channel
+ chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
+ channel := newChannel(chID, c, c.codec, c, reqChanBufSize, replyChanBufSize)
+
+ // store API channel within the client
+ c.channelsLock.Lock()
+ c.channels[chID] = channel
+ c.channelsLock.Unlock()
+
+ // start watching on the request channel
+ go c.watchRequests(channel)
+
+ return channel, nil
+}
+
+// releaseAPIChannel releases API channel that needs to be closed.
+func (c *Connection) releaseAPIChannel(ch *Channel) {
+ log.WithFields(logger.Fields{
+ "channel": ch.id,
+ }).Debug("API channel released")
+
+ // delete the channel from channels map
+ c.channelsLock.Lock()
+ delete(c.channels, ch.id)
+ c.channelsLock.Unlock()
+}
+
+// GetMessageID returns message identifier of given API message.
+func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
+ if c == nil {
+ return 0, errors.New("nil connection passed in")
+ }
+
+ if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
+ return msgID, nil
+ }
+
+ return 0, fmt.Errorf("unknown message: %s (%s)", msg.GetMessageName(), msg.GetCrcString())
+}
+
+// LookupByID looks up message name and crc by ID.
+func (c *Connection) LookupByID(msgID uint16) (api.Message, error) {
+ if c == nil {
+ return nil, errors.New("nil connection passed in")
+ }
+
+ if msg, ok := c.msgMap[msgID]; ok {
+ return msg, nil
+ }
+
+ return nil, fmt.Errorf("unknown message ID: %d", msgID)
}
// retrieveMessageIDs retrieves IDs for all registered messages and stores them in map
@@ -268,32 +313,6 @@ func (c *Connection) retrieveMessageIDs() (err error) {
return nil
}
-// GetMessageID returns message identifier of given API message.
-func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
- if c == nil {
- return 0, errors.New("nil connection passed in")
- }
-
- if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
- return msgID, nil
- }
-
- return 0, fmt.Errorf("unknown message: %s (%s)", msg.GetMessageName(), msg.GetCrcString())
-}
-
-// LookupByID looks up message name and crc by ID.
-func (c *Connection) LookupByID(msgID uint16) (api.Message, error) {
- if c == nil {
- return nil, errors.New("nil connection passed in")
- }
-
- if msg, ok := c.msgMap[msgID]; ok {
- return msg, nil
- }
-
- return nil, fmt.Errorf("unknown message ID: %d", msgID)
-}
-
// disconnectVPP disconnects from VPP in case it is connected.
func (c *Connection) disconnectVPP() {
if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
@@ -341,7 +360,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
// send health check probes until an error or timeout occurs
for {
// sleep until next health check probe period
- time.Sleep(healthCheckInterval)
+ time.Sleep(HealthCheckProbeInterval)
if atomic.LoadUint32(&c.connected) == 0 {
// Disconnect has been called in the meantime, return the healthcheck - reconnect loop
@@ -365,7 +384,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
case vppReply := <-ch.replyChan:
err = vppReply.err
- case <-time.After(healthCheckReplyTimeout):
+ case <-time.After(HealthCheckReplyTimeout):
err = ErrProbeTimeout
// check if time since last reply from any other
@@ -374,7 +393,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
sinceLastReply = time.Since(c.lastReply)
c.lastReplyLock.Unlock()
- if sinceLastReply < healthCheckReplyTimeout {
+ if sinceLastReply < HealthCheckReplyTimeout {
log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
continue
}
@@ -384,10 +403,10 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
if err == ErrProbeTimeout {
failedChecks++
- log.Warnf("VPP health check probe timed out after %v (%d. timeout)", healthCheckReplyTimeout, failedChecks)
- if failedChecks > healthCheckThreshold {
+ log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks)
+ if failedChecks > HealthCheckThreshold {
// in case of exceeded failed check treshold, assume VPP disconnected
- log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", healthCheckThreshold)
+ log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", HealthCheckThreshold)
connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
break
}
@@ -411,52 +430,6 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
c.connectLoop(connChan)
}
-func (c *Connection) NewAPIChannel() (api.Channel, error) {
- return c.newAPIChannel(requestChannelBufSize, replyChannelBufSize)
-}
-
-func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
- return c.newAPIChannel(reqChanBufSize, replyChanBufSize)
-}
-
-// NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
-// It allows to specify custom buffer sizes for the request and reply Go channels.
-func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*channel, error) {
- if c == nil {
- return nil, errors.New("nil connection passed in")
- }
-
- chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
- ch := &channel{
- id: chID,
- replyTimeout: defaultReplyTimeout,
- msgDecoder: c.codec,
- msgIdentifier: c,
- reqChan: make(chan *vppRequest, reqChanBufSize),
- replyChan: make(chan *vppReply, replyChanBufSize),
- notifSubsChan: make(chan *subscriptionRequest, reqChanBufSize),
- notifSubsReplyChan: make(chan error, replyChanBufSize),
- }
-
- // store API channel within the client
- c.channelsLock.Lock()
- c.channels[chID] = ch
- c.channelsLock.Unlock()
-
- // start watching on the request channel
- go c.watchRequests(ch)
-
- return ch, nil
-}
-
-// releaseAPIChannel releases API channel that needs to be closed.
-func (c *Connection) releaseAPIChannel(ch *channel) {
- log.WithFields(logger.Fields{
- "channel": ch.id,
- }).Debug("API channel released")
-
- // delete the channel from channels map
- c.channelsLock.Lock()
- delete(c.channels, ch.id)
- c.channelsLock.Unlock()
+func getMsgNameWithCrc(x api.Message) string {
+ return x.GetMessageName() + "_" + x.GetCrcString()
}