summaryrefslogtreecommitdiffstats
path: root/core/stats.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/stats.go')
-rw-r--r--core/stats.go115
1 files changed, 93 insertions, 22 deletions
diff --git a/core/stats.go b/core/stats.go
index f2da494..55c287e 100644
--- a/core/stats.go
+++ b/core/stats.go
@@ -3,7 +3,6 @@ package core
import (
"path"
"strings"
- "sync/atomic"
"time"
"git.fd.io/govpp.git/adapter"
@@ -11,8 +10,9 @@ import (
)
var (
- RetryUpdateCount = 10
- RetryUpdateDelay = time.Millisecond * 10
+ RetryUpdateCount = 10
+ RetryUpdateDelay = time.Millisecond * 10
+ HealthCheckInterval = time.Second // default health check probe interval
)
const (
@@ -76,8 +76,11 @@ const (
type StatsConnection struct {
statsClient adapter.StatsAPI
- // connected is true if the adapter is connected to VPP
- connected uint32
+ maxAttempts int // interval for reconnect attempts
+ recInterval time.Duration // maximum number of reconnect attempts
+
+ connChan chan ConnectionEvent // connection event channel
+ done chan struct{} // to terminate stats connection watcher
errorStatsData *adapter.StatDir
nodeStatsData *adapter.StatDir
@@ -87,9 +90,20 @@ type StatsConnection struct {
memStatsData *adapter.StatDir
}
-func newStatsConnection(stats adapter.StatsAPI) *StatsConnection {
+func newStatsConnection(stats adapter.StatsAPI, attempts int, interval time.Duration) *StatsConnection {
+ if attempts == 0 {
+ attempts = DefaultMaxReconnectAttempts
+ }
+ if interval == 0 {
+ interval = DefaultReconnectInterval
+ }
+
return &StatsConnection{
statsClient: stats,
+ maxAttempts: attempts,
+ recInterval: interval,
+ connChan: make(chan ConnectionEvent, NotificationChanBufSize),
+ done: make(chan struct{}),
}
}
@@ -97,28 +111,50 @@ func newStatsConnection(stats adapter.StatsAPI) *StatsConnection {
// This call blocks until it is either connected, or an error occurs.
// Only one connection attempt will be performed.
func ConnectStats(stats adapter.StatsAPI) (*StatsConnection, error) {
- c := newStatsConnection(stats)
+ log.Debug("Connecting to stats..")
+ c := newStatsConnection(stats, DefaultMaxReconnectAttempts, DefaultReconnectInterval)
- if err := c.connectClient(); err != nil {
+ if err := c.statsClient.Connect(); err != nil {
return nil, err
}
+ log.Debugf("Connected to stats.")
return c, nil
}
-func (c *StatsConnection) connectClient() error {
- log.Debug("Connecting to stats..")
+// AsyncConnectStats connects to the VPP stats socket asynchronously and returns the connection
+// handle with state channel. The call is non-blocking and the caller is expected to watch ConnectionEvent
+// values from the channel and wait for connect/disconnect events. Connection loop tries to reconnect the
+// socket in case the session was disconnected.
+func AsyncConnectStats(stats adapter.StatsAPI, attempts int, interval time.Duration) (*StatsConnection, chan ConnectionEvent, error) {
+ log.Debug("Connecting to stats asynchronously..")
+ c := newStatsConnection(stats, attempts, interval)
- if err := c.statsClient.Connect(); err != nil {
- return err
- }
+ go c.connectLoop()
- log.Debugf("Connected to stats.")
+ return c, c.connChan, nil
+}
- // store connected state
- atomic.StoreUint32(&c.connected, 1)
+func (c *StatsConnection) connectLoop() {
+ log.Debug("Asynchronously connecting to stats..")
+ var reconnectAttempts int
- return nil
+ // loop until connected
+ for {
+ if err := c.statsClient.Connect(); err == nil {
+ c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Connected})
+ break
+ } else if reconnectAttempts < c.maxAttempts {
+ reconnectAttempts++
+ log.Warnf("connecting stats failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
+ time.Sleep(c.recInterval)
+ } else {
+ c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err})
+ return
+ }
+ }
+ // start monitoring stats connection state
+ go c.monitorSocket()
}
// Disconnect disconnects from Stats API and releases all connection-related resources.
@@ -127,14 +163,41 @@ func (c *StatsConnection) Disconnect() {
return
}
if c.statsClient != nil {
- c.disconnectClient()
+ if err := c.statsClient.Disconnect(); err != nil {
+ log.Debugf("disconnecting stats client failed: %v", err)
+ }
}
+ close(c.connChan)
+ close(c.done)
}
-func (c *StatsConnection) disconnectClient() {
- if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
- if err := c.statsClient.Disconnect(); err != nil {
- log.Debugf("disconnecting stats client failed: %v", err)
+func (c *StatsConnection) monitorSocket() {
+ var state, lastState ConnectionState
+ ticker := time.NewTicker(HealthCheckInterval)
+
+ for {
+ select {
+ case <-ticker.C:
+ _, err := c.statsClient.ListStats(SystemStats_Heartbeat)
+ state = Connected
+ if err == adapter.ErrStatsDataBusy {
+ state = NotResponding
+ }
+ if err == adapter.ErrStatsDisconnected {
+ state = Disconnected
+ }
+ if err == adapter.ErrStatsAccessFailed {
+ state = Failed
+ }
+ if state == lastState {
+ continue
+ }
+ lastState = state
+ c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: state, Error: err})
+ case <-c.done:
+ log.Debugf("health check watcher closed")
+ c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: nil})
+ break
}
}
}
@@ -498,3 +561,11 @@ func (c *StatsConnection) GetMemoryStats(memStats *api.MemoryStats) (err error)
}
return nil
}
+
+func (c *StatsConnection) sendStatsConnEvent(event ConnectionEvent) {
+ select {
+ case c.connChan <- event:
+ default:
+ log.Warn("Stats connection state channel is full, discarding value.")
+ }
+}