From 4459b648e9fb53c34abbf52a00e63ad384fb9ee2 Mon Sep 17 00:00:00 2001 From: Vladimir Lavor Date: Thu, 18 Feb 2021 16:05:30 +0100 Subject: Added asynchronous connection for stats socket The stats socket now allows an option to connect asynchronously in the same manner as the api socket connection. New method AsyncConnectStats returns a channel where notificaitons of type ConnectionEvent will be sent. Fixed the stats reconnect procedure which sometimes failed to re-eneable the connection. Change-Id: I0bdb19f0d57e3a1ea259b8b1ba0a5e5fa49a09db Signed-off-by: Vladimir Lavor --- core/stats.go | 115 +++++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 93 insertions(+), 22 deletions(-) (limited to 'core') diff --git a/core/stats.go b/core/stats.go index f2da494..55c287e 100644 --- a/core/stats.go +++ b/core/stats.go @@ -3,7 +3,6 @@ package core import ( "path" "strings" - "sync/atomic" "time" "git.fd.io/govpp.git/adapter" @@ -11,8 +10,9 @@ import ( ) var ( - RetryUpdateCount = 10 - RetryUpdateDelay = time.Millisecond * 10 + RetryUpdateCount = 10 + RetryUpdateDelay = time.Millisecond * 10 + HealthCheckInterval = time.Second // default health check probe interval ) const ( @@ -76,8 +76,11 @@ const ( type StatsConnection struct { statsClient adapter.StatsAPI - // connected is true if the adapter is connected to VPP - connected uint32 + maxAttempts int // interval for reconnect attempts + recInterval time.Duration // maximum number of reconnect attempts + + connChan chan ConnectionEvent // connection event channel + done chan struct{} // to terminate stats connection watcher errorStatsData *adapter.StatDir nodeStatsData *adapter.StatDir @@ -87,9 +90,20 @@ type StatsConnection struct { memStatsData *adapter.StatDir } -func newStatsConnection(stats adapter.StatsAPI) *StatsConnection { +func newStatsConnection(stats adapter.StatsAPI, attempts int, interval time.Duration) *StatsConnection { + if attempts == 0 { + attempts = DefaultMaxReconnectAttempts + } + if interval == 0 { + interval = DefaultReconnectInterval + } + return &StatsConnection{ statsClient: stats, + maxAttempts: attempts, + recInterval: interval, + connChan: make(chan ConnectionEvent, NotificationChanBufSize), + done: make(chan struct{}), } } @@ -97,28 +111,50 @@ func newStatsConnection(stats adapter.StatsAPI) *StatsConnection { // This call blocks until it is either connected, or an error occurs. // Only one connection attempt will be performed. func ConnectStats(stats adapter.StatsAPI) (*StatsConnection, error) { - c := newStatsConnection(stats) + log.Debug("Connecting to stats..") + c := newStatsConnection(stats, DefaultMaxReconnectAttempts, DefaultReconnectInterval) - if err := c.connectClient(); err != nil { + if err := c.statsClient.Connect(); err != nil { return nil, err } + log.Debugf("Connected to stats.") return c, nil } -func (c *StatsConnection) connectClient() error { - log.Debug("Connecting to stats..") +// AsyncConnectStats connects to the VPP stats socket asynchronously and returns the connection +// handle with state channel. The call is non-blocking and the caller is expected to watch ConnectionEvent +// values from the channel and wait for connect/disconnect events. Connection loop tries to reconnect the +// socket in case the session was disconnected. +func AsyncConnectStats(stats adapter.StatsAPI, attempts int, interval time.Duration) (*StatsConnection, chan ConnectionEvent, error) { + log.Debug("Connecting to stats asynchronously..") + c := newStatsConnection(stats, attempts, interval) - if err := c.statsClient.Connect(); err != nil { - return err - } + go c.connectLoop() - log.Debugf("Connected to stats.") + return c, c.connChan, nil +} - // store connected state - atomic.StoreUint32(&c.connected, 1) +func (c *StatsConnection) connectLoop() { + log.Debug("Asynchronously connecting to stats..") + var reconnectAttempts int - return nil + // loop until connected + for { + if err := c.statsClient.Connect(); err == nil { + c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Connected}) + break + } else if reconnectAttempts < c.maxAttempts { + reconnectAttempts++ + log.Warnf("connecting stats failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err) + time.Sleep(c.recInterval) + } else { + c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err}) + return + } + } + // start monitoring stats connection state + go c.monitorSocket() } // Disconnect disconnects from Stats API and releases all connection-related resources. @@ -127,14 +163,41 @@ func (c *StatsConnection) Disconnect() { return } if c.statsClient != nil { - c.disconnectClient() + if err := c.statsClient.Disconnect(); err != nil { + log.Debugf("disconnecting stats client failed: %v", err) + } } + close(c.connChan) + close(c.done) } -func (c *StatsConnection) disconnectClient() { - if atomic.CompareAndSwapUint32(&c.connected, 1, 0) { - if err := c.statsClient.Disconnect(); err != nil { - log.Debugf("disconnecting stats client failed: %v", err) +func (c *StatsConnection) monitorSocket() { + var state, lastState ConnectionState + ticker := time.NewTicker(HealthCheckInterval) + + for { + select { + case <-ticker.C: + _, err := c.statsClient.ListStats(SystemStats_Heartbeat) + state = Connected + if err == adapter.ErrStatsDataBusy { + state = NotResponding + } + if err == adapter.ErrStatsDisconnected { + state = Disconnected + } + if err == adapter.ErrStatsAccessFailed { + state = Failed + } + if state == lastState { + continue + } + lastState = state + c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: state, Error: err}) + case <-c.done: + log.Debugf("health check watcher closed") + c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: nil}) + break } } } @@ -498,3 +561,11 @@ func (c *StatsConnection) GetMemoryStats(memStats *api.MemoryStats) (err error) } return nil } + +func (c *StatsConnection) sendStatsConnEvent(event ConnectionEvent) { + select { + case c.connChan <- event: + default: + log.Warn("Stats connection state channel is full, discarding value.") + } +} -- cgit 1.2.3-korg