aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
authorVladimir Lavor <vlavor@cisco.com>2021-02-18 16:05:30 +0100
committerVladimir Lavor <vlavor@cisco.com>2021-02-23 13:28:47 +0100
commit4459b648e9fb53c34abbf52a00e63ad384fb9ee2 (patch)
treefbe5885b5aa2e4d3ff10255327d22ddd56a86c92 /core
parent4c1cccf48cd144414c7233f167087aff770ef67b (diff)
Added asynchronous connection for stats socket
The stats socket now allows an option to connect asynchronously in the same manner as the api socket connection. New method AsyncConnectStats returns a channel where notificaitons of type ConnectionEvent will be sent. Fixed the stats reconnect procedure which sometimes failed to re-eneable the connection. Change-Id: I0bdb19f0d57e3a1ea259b8b1ba0a5e5fa49a09db Signed-off-by: Vladimir Lavor <vlavor@cisco.com>
Diffstat (limited to 'core')
-rw-r--r--core/stats.go115
1 files changed, 93 insertions, 22 deletions
diff --git a/core/stats.go b/core/stats.go
index f2da494..55c287e 100644
--- a/core/stats.go
+++ b/core/stats.go
@@ -3,7 +3,6 @@ package core
import (
"path"
"strings"
- "sync/atomic"
"time"
"git.fd.io/govpp.git/adapter"
@@ -11,8 +10,9 @@ import (
)
var (
- RetryUpdateCount = 10
- RetryUpdateDelay = time.Millisecond * 10
+ RetryUpdateCount = 10
+ RetryUpdateDelay = time.Millisecond * 10
+ HealthCheckInterval = time.Second // default health check probe interval
)
const (
@@ -76,8 +76,11 @@ const (
type StatsConnection struct {
statsClient adapter.StatsAPI
- // connected is true if the adapter is connected to VPP
- connected uint32
+ maxAttempts int // interval for reconnect attempts
+ recInterval time.Duration // maximum number of reconnect attempts
+
+ connChan chan ConnectionEvent // connection event channel
+ done chan struct{} // to terminate stats connection watcher
errorStatsData *adapter.StatDir
nodeStatsData *adapter.StatDir
@@ -87,9 +90,20 @@ type StatsConnection struct {
memStatsData *adapter.StatDir
}
-func newStatsConnection(stats adapter.StatsAPI) *StatsConnection {
+func newStatsConnection(stats adapter.StatsAPI, attempts int, interval time.Duration) *StatsConnection {
+ if attempts == 0 {
+ attempts = DefaultMaxReconnectAttempts
+ }
+ if interval == 0 {
+ interval = DefaultReconnectInterval
+ }
+
return &StatsConnection{
statsClient: stats,
+ maxAttempts: attempts,
+ recInterval: interval,
+ connChan: make(chan ConnectionEvent, NotificationChanBufSize),
+ done: make(chan struct{}),
}
}
@@ -97,28 +111,50 @@ func newStatsConnection(stats adapter.StatsAPI) *StatsConnection {
// This call blocks until it is either connected, or an error occurs.
// Only one connection attempt will be performed.
func ConnectStats(stats adapter.StatsAPI) (*StatsConnection, error) {
- c := newStatsConnection(stats)
+ log.Debug("Connecting to stats..")
+ c := newStatsConnection(stats, DefaultMaxReconnectAttempts, DefaultReconnectInterval)
- if err := c.connectClient(); err != nil {
+ if err := c.statsClient.Connect(); err != nil {
return nil, err
}
+ log.Debugf("Connected to stats.")
return c, nil
}
-func (c *StatsConnection) connectClient() error {
- log.Debug("Connecting to stats..")
+// AsyncConnectStats connects to the VPP stats socket asynchronously and returns the connection
+// handle with state channel. The call is non-blocking and the caller is expected to watch ConnectionEvent
+// values from the channel and wait for connect/disconnect events. Connection loop tries to reconnect the
+// socket in case the session was disconnected.
+func AsyncConnectStats(stats adapter.StatsAPI, attempts int, interval time.Duration) (*StatsConnection, chan ConnectionEvent, error) {
+ log.Debug("Connecting to stats asynchronously..")
+ c := newStatsConnection(stats, attempts, interval)
- if err := c.statsClient.Connect(); err != nil {
- return err
- }
+ go c.connectLoop()
- log.Debugf("Connected to stats.")
+ return c, c.connChan, nil
+}
- // store connected state
- atomic.StoreUint32(&c.connected, 1)
+func (c *StatsConnection) connectLoop() {
+ log.Debug("Asynchronously connecting to stats..")
+ var reconnectAttempts int
- return nil
+ // loop until connected
+ for {
+ if err := c.statsClient.Connect(); err == nil {
+ c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Connected})
+ break
+ } else if reconnectAttempts < c.maxAttempts {
+ reconnectAttempts++
+ log.Warnf("connecting stats failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
+ time.Sleep(c.recInterval)
+ } else {
+ c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err})
+ return
+ }
+ }
+ // start monitoring stats connection state
+ go c.monitorSocket()
}
// Disconnect disconnects from Stats API and releases all connection-related resources.
@@ -127,14 +163,41 @@ func (c *StatsConnection) Disconnect() {
return
}
if c.statsClient != nil {
- c.disconnectClient()
+ if err := c.statsClient.Disconnect(); err != nil {
+ log.Debugf("disconnecting stats client failed: %v", err)
+ }
}
+ close(c.connChan)
+ close(c.done)
}
-func (c *StatsConnection) disconnectClient() {
- if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
- if err := c.statsClient.Disconnect(); err != nil {
- log.Debugf("disconnecting stats client failed: %v", err)
+func (c *StatsConnection) monitorSocket() {
+ var state, lastState ConnectionState
+ ticker := time.NewTicker(HealthCheckInterval)
+
+ for {
+ select {
+ case <-ticker.C:
+ _, err := c.statsClient.ListStats(SystemStats_Heartbeat)
+ state = Connected
+ if err == adapter.ErrStatsDataBusy {
+ state = NotResponding
+ }
+ if err == adapter.ErrStatsDisconnected {
+ state = Disconnected
+ }
+ if err == adapter.ErrStatsAccessFailed {
+ state = Failed
+ }
+ if state == lastState {
+ continue
+ }
+ lastState = state
+ c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: state, Error: err})
+ case <-c.done:
+ log.Debugf("health check watcher closed")
+ c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: nil})
+ break
}
}
}
@@ -498,3 +561,11 @@ func (c *StatsConnection) GetMemoryStats(memStats *api.MemoryStats) (err error)
}
return nil
}
+
+func (c *StatsConnection) sendStatsConnEvent(event ConnectionEvent) {
+ select {
+ case c.connChan <- event:
+ default:
+ log.Warn("Stats connection state channel is full, discarding value.")
+ }
+}