From 4459b648e9fb53c34abbf52a00e63ad384fb9ee2 Mon Sep 17 00:00:00 2001 From: Vladimir Lavor Date: Thu, 18 Feb 2021 16:05:30 +0100 Subject: Added asynchronous connection for stats socket The stats socket now allows an option to connect asynchronously in the same manner as the api socket connection. New method AsyncConnectStats returns a channel where notificaitons of type ConnectionEvent will be sent. Fixed the stats reconnect procedure which sometimes failed to re-eneable the connection. Change-Id: I0bdb19f0d57e3a1ea259b8b1ba0a5e5fa49a09db Signed-off-by: Vladimir Lavor --- adapter/stats_api.go | 1 + adapter/statsclient/statsclient.go | 103 +++++++++++++++++++++++---------- core/stats.go | 115 ++++++++++++++++++++++++++++++------- examples/stats-client/stats_api.go | 33 +++++++++-- 4 files changed, 195 insertions(+), 57 deletions(-) diff --git a/adapter/stats_api.go b/adapter/stats_api.go index 90dbeb3..15c3789 100644 --- a/adapter/stats_api.go +++ b/adapter/stats_api.go @@ -27,6 +27,7 @@ const ( var ( ErrStatsDataBusy = errors.New("stats data busy") ErrStatsDirStale = errors.New("stats dir stale") + ErrStatsDisconnected = errors.New("stats disconnected") ErrStatsAccessFailed = errors.New("stats access failed") ) diff --git a/adapter/statsclient/statsclient.go b/adapter/statsclient/statsclient.go index 9470275..e99d787 100644 --- a/adapter/statsclient/statsclient.go +++ b/adapter/statsclient/statsclient.go @@ -20,20 +20,28 @@ import ( "fmt" "net" "os" + "path/filepath" "regexp" + "sync/atomic" "syscall" "time" + "git.fd.io/govpp.git/adapter" "github.com/fsnotify/fsnotify" "github.com/ftrvxmtrx/fd" logger "github.com/sirupsen/logrus" - - "git.fd.io/govpp.git/adapter" ) const ( // DefaultSocketName is default VPP stats socket file path. DefaultSocketName = adapter.DefaultStatsSocket + + // SocketRetryPeriod is the time period after the socket availability + // will be re-checked + SocketRetryPeriod = 50 * time.Millisecond + + // SocketRetryTimeout is the maximum time for the stats socket + SocketRetryTimeout = 3 * time.Second ) var ( @@ -64,10 +72,12 @@ var _ adapter.StatsAPI = (*StatsClient)(nil) // StatsClient is the pure Go implementation for VPP stats API. type StatsClient struct { - socketPath string + socket string + + headerData []byte - headerData []byte - isConnected bool + // defines the adapter connection state + connected uint32 // to quit socket monitor done chan struct{} @@ -82,17 +92,14 @@ func NewStatsClient(socket string) *StatsClient { socket = DefaultSocketName } return &StatsClient{ - socketPath: socket, + socket: socket, } } // Connect to validated VPP stats socket and start monitoring // socket file changes func (sc *StatsClient) Connect() (err error) { - if sc.isConnected { - return fmt.Errorf("already connected") - } - if err := sc.checkSocketValid(); err != nil { + if err := sc.waitForSocket(); err != nil { return err } sc.done = make(chan struct{}) @@ -100,22 +107,29 @@ func (sc *StatsClient) Connect() (err error) { return err } sc.monitorSocket() - sc.isConnected = true return nil } // Disconnect from the socket, unmap shared memory and terminate // socket monitor func (sc *StatsClient) Disconnect() error { - if !sc.isConnected { - return nil // not connected + if sc.headerData == nil { + return nil } - sc.isConnected = false - close(sc.done) - return sc.disconnect() + if err := syscall.Munmap(sc.headerData); err != nil { + Log.Debugf("unmapping shared memory failed: %v", err) + return fmt.Errorf("unmapping shared memory failed: %v", err) + } + sc.headerData = nil + + Log.Debugf("successfully unmapped shared memory") + return nil } func (sc *StatsClient) ListStats(patterns ...string) ([]string, error) { + if !sc.isConnected() { + return nil, adapter.ErrStatsDisconnected + } accessEpoch := sc.accessStart() if accessEpoch == 0 { return nil, adapter.ErrStatsAccessFailed @@ -149,6 +163,9 @@ func (sc *StatsClient) ListStats(patterns ...string) ([]string, error) { } func (sc *StatsClient) DumpStats(patterns ...string) (entries []adapter.StatEntry, err error) { + if !sc.isConnected() { + return nil, adapter.ErrStatsDisconnected + } accessEpoch := sc.accessStart() if accessEpoch == 0 { return nil, adapter.ErrStatsAccessFailed @@ -192,6 +209,9 @@ func (sc *StatsClient) DumpStats(patterns ...string) (entries []adapter.StatEntr } func (sc *StatsClient) PrepareDir(patterns ...string) (*adapter.StatDir, error) { + if !sc.isConnected() { + return nil, adapter.ErrStatsDisconnected + } dir := new(adapter.StatDir) accessEpoch := sc.accessStart() @@ -241,6 +261,9 @@ func (sc *StatsClient) PrepareDir(patterns ...string) (*adapter.StatDir, error) // UpdateDir refreshes directory data for all counters func (sc *StatsClient) UpdateDir(dir *adapter.StatDir) (err error) { + if !sc.isConnected() { + return adapter.ErrStatsDisconnected + } epoch, _ := sc.GetEpoch() if dir.Epoch != epoch { return adapter.ErrStatsDirStale @@ -281,11 +304,25 @@ func (sc *StatsClient) UpdateDir(dir *adapter.StatDir) (err error) { return nil } -func (sc *StatsClient) checkSocketValid() error { - if _, err := os.Stat(sc.socketPath); os.IsNotExist(err) { - return fmt.Errorf("stats socket file %s does not exist", sc.socketPath) - } else if err != nil { - return fmt.Errorf("stats socket error: %v", err) +// checks the socket existence and waits for it for the designated +// time if it is not available immediately +func (sc *StatsClient) waitForSocket() error { + if _, err := os.Stat(sc.socket); err != nil { + if os.IsNotExist(err) { + ticker := time.NewTicker(SocketRetryPeriod) + for { + select { + case <-ticker.C: + if _, err := os.Stat(sc.socket); err == nil { + return nil + } + case <-time.After(SocketRetryTimeout): + return fmt.Errorf("stats socket file %s is not ready within timeout ", sc.socket) + } + } + } else { + return fmt.Errorf("stats socket error: %v", err) + } } return nil } @@ -295,7 +332,7 @@ func (sc *StatsClient) checkSocketValid() error { func (sc *StatsClient) connect() (ss statSegment, err error) { addr := net.UnixAddr{ Net: "unixpacket", - Name: sc.socketPath, + Name: sc.socket, } Log.Debugf("connecting to: %v", addr) @@ -350,6 +387,9 @@ func (sc *StatsClient) connect() (ss statSegment, err error) { version, minVersion, maxVersion) } + // set connected + atomic.CompareAndSwapUint32(&sc.connected, 0, 1) + return ss, nil } @@ -359,8 +399,8 @@ func (sc *StatsClient) reconnect() (err error) { if err = sc.disconnect(); err != nil { return fmt.Errorf("error disconnecting socket: %v", err) } - if err = sc.checkSocketValid(); err != nil { - return fmt.Errorf("error validating socket: %v", err) + if err = sc.waitForSocket(); err != nil { + return fmt.Errorf("error while waiting on socket: %v", err) } if sc.statSegment, err = sc.connect(); err != nil { return fmt.Errorf("error connecting socket: %v", err) @@ -370,6 +410,9 @@ func (sc *StatsClient) reconnect() (err error) { // disconnect unmaps socket data from the memory and resets the header func (sc *StatsClient) disconnect() error { + if !atomic.CompareAndSwapUint32(&sc.connected, 1, 0) { + return fmt.Errorf("stats client is already disconnected") + } if sc.headerData == nil { return nil } @@ -394,14 +437,10 @@ func (sc *StatsClient) monitorSocket() { for { select { case event := <-watcher.Events: - if event.Op == fsnotify.Remove { + if event.Op == fsnotify.Remove && event.Name == sc.socket { if err := sc.reconnect(); err != nil { Log.Errorf("error occurred during socket reconnect: %v", err) } - // path must be re-added to the watcher - if err = watcher.Add(sc.socketPath); err != nil { - Log.Errorf("failed to add socket address to the watcher: %v", err) - } } case err := <-watcher.Errors: Log.Errorf("socket monitor delivered error event: %v", err) @@ -413,7 +452,7 @@ func (sc *StatsClient) monitorSocket() { } }() - if err := watcher.Add(sc.socketPath); err != nil { + if err := watcher.Add(filepath.Dir(sc.socket)); err != nil { Log.Errorf("failed to add socket address to the watcher: %v", err) } } @@ -496,3 +535,7 @@ func (sc *StatsClient) listIndexesFunc(f func(name []byte) bool) (indexes []uint return indexes, nil } + +func (sc *StatsClient) isConnected() bool { + return atomic.LoadUint32(&sc.connected) == 1 +} diff --git a/core/stats.go b/core/stats.go index f2da494..55c287e 100644 --- a/core/stats.go +++ b/core/stats.go @@ -3,7 +3,6 @@ package core import ( "path" "strings" - "sync/atomic" "time" "git.fd.io/govpp.git/adapter" @@ -11,8 +10,9 @@ import ( ) var ( - RetryUpdateCount = 10 - RetryUpdateDelay = time.Millisecond * 10 + RetryUpdateCount = 10 + RetryUpdateDelay = time.Millisecond * 10 + HealthCheckInterval = time.Second // default health check probe interval ) const ( @@ -76,8 +76,11 @@ const ( type StatsConnection struct { statsClient adapter.StatsAPI - // connected is true if the adapter is connected to VPP - connected uint32 + maxAttempts int // interval for reconnect attempts + recInterval time.Duration // maximum number of reconnect attempts + + connChan chan ConnectionEvent // connection event channel + done chan struct{} // to terminate stats connection watcher errorStatsData *adapter.StatDir nodeStatsData *adapter.StatDir @@ -87,9 +90,20 @@ type StatsConnection struct { memStatsData *adapter.StatDir } -func newStatsConnection(stats adapter.StatsAPI) *StatsConnection { +func newStatsConnection(stats adapter.StatsAPI, attempts int, interval time.Duration) *StatsConnection { + if attempts == 0 { + attempts = DefaultMaxReconnectAttempts + } + if interval == 0 { + interval = DefaultReconnectInterval + } + return &StatsConnection{ statsClient: stats, + maxAttempts: attempts, + recInterval: interval, + connChan: make(chan ConnectionEvent, NotificationChanBufSize), + done: make(chan struct{}), } } @@ -97,28 +111,50 @@ func newStatsConnection(stats adapter.StatsAPI) *StatsConnection { // This call blocks until it is either connected, or an error occurs. // Only one connection attempt will be performed. func ConnectStats(stats adapter.StatsAPI) (*StatsConnection, error) { - c := newStatsConnection(stats) + log.Debug("Connecting to stats..") + c := newStatsConnection(stats, DefaultMaxReconnectAttempts, DefaultReconnectInterval) - if err := c.connectClient(); err != nil { + if err := c.statsClient.Connect(); err != nil { return nil, err } + log.Debugf("Connected to stats.") return c, nil } -func (c *StatsConnection) connectClient() error { - log.Debug("Connecting to stats..") +// AsyncConnectStats connects to the VPP stats socket asynchronously and returns the connection +// handle with state channel. The call is non-blocking and the caller is expected to watch ConnectionEvent +// values from the channel and wait for connect/disconnect events. Connection loop tries to reconnect the +// socket in case the session was disconnected. +func AsyncConnectStats(stats adapter.StatsAPI, attempts int, interval time.Duration) (*StatsConnection, chan ConnectionEvent, error) { + log.Debug("Connecting to stats asynchronously..") + c := newStatsConnection(stats, attempts, interval) - if err := c.statsClient.Connect(); err != nil { - return err - } + go c.connectLoop() - log.Debugf("Connected to stats.") + return c, c.connChan, nil +} - // store connected state - atomic.StoreUint32(&c.connected, 1) +func (c *StatsConnection) connectLoop() { + log.Debug("Asynchronously connecting to stats..") + var reconnectAttempts int - return nil + // loop until connected + for { + if err := c.statsClient.Connect(); err == nil { + c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Connected}) + break + } else if reconnectAttempts < c.maxAttempts { + reconnectAttempts++ + log.Warnf("connecting stats failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err) + time.Sleep(c.recInterval) + } else { + c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err}) + return + } + } + // start monitoring stats connection state + go c.monitorSocket() } // Disconnect disconnects from Stats API and releases all connection-related resources. @@ -127,14 +163,41 @@ func (c *StatsConnection) Disconnect() { return } if c.statsClient != nil { - c.disconnectClient() + if err := c.statsClient.Disconnect(); err != nil { + log.Debugf("disconnecting stats client failed: %v", err) + } } + close(c.connChan) + close(c.done) } -func (c *StatsConnection) disconnectClient() { - if atomic.CompareAndSwapUint32(&c.connected, 1, 0) { - if err := c.statsClient.Disconnect(); err != nil { - log.Debugf("disconnecting stats client failed: %v", err) +func (c *StatsConnection) monitorSocket() { + var state, lastState ConnectionState + ticker := time.NewTicker(HealthCheckInterval) + + for { + select { + case <-ticker.C: + _, err := c.statsClient.ListStats(SystemStats_Heartbeat) + state = Connected + if err == adapter.ErrStatsDataBusy { + state = NotResponding + } + if err == adapter.ErrStatsDisconnected { + state = Disconnected + } + if err == adapter.ErrStatsAccessFailed { + state = Failed + } + if state == lastState { + continue + } + lastState = state + c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: state, Error: err}) + case <-c.done: + log.Debugf("health check watcher closed") + c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: nil}) + break } } } @@ -498,3 +561,11 @@ func (c *StatsConnection) GetMemoryStats(memStats *api.MemoryStats) (err error) } return nil } + +func (c *StatsConnection) sendStatsConnEvent(event ConnectionEvent) { + select { + case c.connChan <- event: + default: + log.Warn("Stats connection state channel is full, discarding value.") + } +} diff --git a/examples/stats-client/stats_api.go b/examples/stats-client/stats_api.go index b1846a6..66dd451 100644 --- a/examples/stats-client/stats_api.go +++ b/examples/stats-client/stats_api.go @@ -39,6 +39,7 @@ var ( statsSocket = flag.String("socket", statsclient.DefaultSocketName, "Path to VPP stats socket") dumpAll = flag.Bool("all", false, "Dump all stats including ones with zero values") pollPeriod = flag.Duration("period", time.Second*5, "Polling interval period") + async = flag.Bool("async", false, "Use asynchronous connection") ) func init() { @@ -58,11 +59,33 @@ func main() { patterns = flag.Args()[1:] } - client := statsclient.NewStatsClient(*statsSocket) - - c, err := core.ConnectStats(client) - if err != nil { - log.Fatalln("Connecting failed:", err) + var ( + client *statsclient.StatsClient + c *core.StatsConnection + err error + ) + + if *async { + var statsChan chan core.ConnectionEvent + client = statsclient.NewStatsClient(*statsSocket) + c, statsChan, err = core.AsyncConnectStats(client, core.DefaultMaxReconnectAttempts, core.DefaultReconnectInterval) + if err != nil { + log.Fatalln("Asynchronous connecting failed:", err) + } + select { + case e := <-statsChan: + if e.State == core.Connected { + // OK + } else { + log.Fatalf("VPP stats asynchronous connection failed: %s\n", e.State.String()) + } + } + } else { + client = statsclient.NewStatsClient(*statsSocket) + c, err = core.ConnectStats(client) + if err != nil { + log.Fatalln("Connecting failed:", err) + } } defer c.Disconnect() -- cgit 1.2.3-korg