aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVladimir Lavor <vlavor@cisco.com>2021-02-18 16:05:30 +0100
committerVladimir Lavor <vlavor@cisco.com>2021-02-23 13:28:47 +0100
commit4459b648e9fb53c34abbf52a00e63ad384fb9ee2 (patch)
treefbe5885b5aa2e4d3ff10255327d22ddd56a86c92
parent4c1cccf48cd144414c7233f167087aff770ef67b (diff)
Added asynchronous connection for stats socket
The stats socket now offers an option to connect asynchronously, in the same manner as the API socket connection. The new method AsyncConnectStats returns a channel on which notifications of type ConnectionEvent are sent. Fixed the stats reconnect procedure, which sometimes failed to re-enable the connection. Change-Id: I0bdb19f0d57e3a1ea259b8b1ba0a5e5fa49a09db Signed-off-by: Vladimir Lavor <vlavor@cisco.com>
-rw-r--r--adapter/stats_api.go1
-rw-r--r--adapter/statsclient/statsclient.go103
-rw-r--r--core/stats.go115
-rw-r--r--examples/stats-client/stats_api.go33
4 files changed, 195 insertions, 57 deletions
diff --git a/adapter/stats_api.go b/adapter/stats_api.go
index 90dbeb3..15c3789 100644
--- a/adapter/stats_api.go
+++ b/adapter/stats_api.go
@@ -27,6 +27,7 @@ const (
var (
ErrStatsDataBusy = errors.New("stats data busy")
ErrStatsDirStale = errors.New("stats dir stale")
+ ErrStatsDisconnected = errors.New("stats disconnected")
ErrStatsAccessFailed = errors.New("stats access failed")
)
diff --git a/adapter/statsclient/statsclient.go b/adapter/statsclient/statsclient.go
index 9470275..e99d787 100644
--- a/adapter/statsclient/statsclient.go
+++ b/adapter/statsclient/statsclient.go
@@ -20,20 +20,28 @@ import (
"fmt"
"net"
"os"
+ "path/filepath"
"regexp"
+ "sync/atomic"
"syscall"
"time"
+ "git.fd.io/govpp.git/adapter"
"github.com/fsnotify/fsnotify"
"github.com/ftrvxmtrx/fd"
logger "github.com/sirupsen/logrus"
-
- "git.fd.io/govpp.git/adapter"
)
const (
// DefaultSocketName is default VPP stats socket file path.
DefaultSocketName = adapter.DefaultStatsSocket
+
+ // SocketRetryPeriod is the time period after which the socket availability
+ // will be re-checked
+ SocketRetryPeriod = 50 * time.Millisecond
+
+ // SocketRetryTimeout is the maximum time to wait for the stats socket to become available
+ SocketRetryTimeout = 3 * time.Second
)
var (
@@ -64,10 +72,12 @@ var _ adapter.StatsAPI = (*StatsClient)(nil)
// StatsClient is the pure Go implementation for VPP stats API.
type StatsClient struct {
- socketPath string
+ socket string
+
+ headerData []byte
- headerData []byte
- isConnected bool
+ // defines the adapter connection state
+ connected uint32
// to quit socket monitor
done chan struct{}
@@ -82,17 +92,14 @@ func NewStatsClient(socket string) *StatsClient {
socket = DefaultSocketName
}
return &StatsClient{
- socketPath: socket,
+ socket: socket,
}
}
// Connect to validated VPP stats socket and start monitoring
// socket file changes
func (sc *StatsClient) Connect() (err error) {
- if sc.isConnected {
- return fmt.Errorf("already connected")
- }
- if err := sc.checkSocketValid(); err != nil {
+ if err := sc.waitForSocket(); err != nil {
return err
}
sc.done = make(chan struct{})
@@ -100,22 +107,29 @@ func (sc *StatsClient) Connect() (err error) {
return err
}
sc.monitorSocket()
- sc.isConnected = true
return nil
}
// Disconnect from the socket, unmap shared memory and terminate
// socket monitor
func (sc *StatsClient) Disconnect() error {
- if !sc.isConnected {
- return nil // not connected
+ if sc.headerData == nil {
+ return nil
}
- sc.isConnected = false
- close(sc.done)
- return sc.disconnect()
+ if err := syscall.Munmap(sc.headerData); err != nil {
+ Log.Debugf("unmapping shared memory failed: %v", err)
+ return fmt.Errorf("unmapping shared memory failed: %v", err)
+ }
+ sc.headerData = nil
+
+ Log.Debugf("successfully unmapped shared memory")
+ return nil
}
func (sc *StatsClient) ListStats(patterns ...string) ([]string, error) {
+ if !sc.isConnected() {
+ return nil, adapter.ErrStatsDisconnected
+ }
accessEpoch := sc.accessStart()
if accessEpoch == 0 {
return nil, adapter.ErrStatsAccessFailed
@@ -149,6 +163,9 @@ func (sc *StatsClient) ListStats(patterns ...string) ([]string, error) {
}
func (sc *StatsClient) DumpStats(patterns ...string) (entries []adapter.StatEntry, err error) {
+ if !sc.isConnected() {
+ return nil, adapter.ErrStatsDisconnected
+ }
accessEpoch := sc.accessStart()
if accessEpoch == 0 {
return nil, adapter.ErrStatsAccessFailed
@@ -192,6 +209,9 @@ func (sc *StatsClient) DumpStats(patterns ...string) (entries []adapter.StatEntr
}
func (sc *StatsClient) PrepareDir(patterns ...string) (*adapter.StatDir, error) {
+ if !sc.isConnected() {
+ return nil, adapter.ErrStatsDisconnected
+ }
dir := new(adapter.StatDir)
accessEpoch := sc.accessStart()
@@ -241,6 +261,9 @@ func (sc *StatsClient) PrepareDir(patterns ...string) (*adapter.StatDir, error)
// UpdateDir refreshes directory data for all counters
func (sc *StatsClient) UpdateDir(dir *adapter.StatDir) (err error) {
+ if !sc.isConnected() {
+ return adapter.ErrStatsDisconnected
+ }
epoch, _ := sc.GetEpoch()
if dir.Epoch != epoch {
return adapter.ErrStatsDirStale
@@ -281,11 +304,25 @@ func (sc *StatsClient) UpdateDir(dir *adapter.StatDir) (err error) {
return nil
}
-func (sc *StatsClient) checkSocketValid() error {
- if _, err := os.Stat(sc.socketPath); os.IsNotExist(err) {
- return fmt.Errorf("stats socket file %s does not exist", sc.socketPath)
- } else if err != nil {
- return fmt.Errorf("stats socket error: %v", err)
+// waitForSocket checks that the stats socket file exists. If the file is
+// missing, it polls for it every SocketRetryPeriod until it appears or until
+// SocketRetryTimeout elapses, in which case an error is returned. Any os.Stat
+// failure other than "does not exist" on the initial check is returned
+// immediately.
+func (sc *StatsClient) waitForSocket() error {
+	if _, err := os.Stat(sc.socket); err != nil {
+		if os.IsNotExist(err) {
+			ticker := time.NewTicker(SocketRetryPeriod)
+			// release the ticker on every return path
+			defer ticker.Stop()
+			// The deadline is created once, outside the loop. Calling
+			// time.After inside the select would start a fresh timer on
+			// every tick, so the timeout could never fire.
+			timeout := time.After(SocketRetryTimeout)
+			for {
+				select {
+				case <-ticker.C:
+					if _, err := os.Stat(sc.socket); err == nil {
+						return nil
+					}
+				case <-timeout:
+					return fmt.Errorf("stats socket file %s is not ready within timeout ", sc.socket)
+				}
+			}
+		} else {
+			return fmt.Errorf("stats socket error: %v", err)
+		}
 	}
 	return nil
 }
@@ -295,7 +332,7 @@ func (sc *StatsClient) checkSocketValid() error {
func (sc *StatsClient) connect() (ss statSegment, err error) {
addr := net.UnixAddr{
Net: "unixpacket",
- Name: sc.socketPath,
+ Name: sc.socket,
}
Log.Debugf("connecting to: %v", addr)
@@ -350,6 +387,9 @@ func (sc *StatsClient) connect() (ss statSegment, err error) {
version, minVersion, maxVersion)
}
+ // set connected
+ atomic.CompareAndSwapUint32(&sc.connected, 0, 1)
+
return ss, nil
}
@@ -359,8 +399,8 @@ func (sc *StatsClient) reconnect() (err error) {
if err = sc.disconnect(); err != nil {
return fmt.Errorf("error disconnecting socket: %v", err)
}
- if err = sc.checkSocketValid(); err != nil {
- return fmt.Errorf("error validating socket: %v", err)
+ if err = sc.waitForSocket(); err != nil {
+ return fmt.Errorf("error while waiting on socket: %v", err)
}
if sc.statSegment, err = sc.connect(); err != nil {
return fmt.Errorf("error connecting socket: %v", err)
@@ -370,6 +410,9 @@ func (sc *StatsClient) reconnect() (err error) {
// disconnect unmaps socket data from the memory and resets the header
func (sc *StatsClient) disconnect() error {
+ if !atomic.CompareAndSwapUint32(&sc.connected, 1, 0) {
+ return fmt.Errorf("stats client is already disconnected")
+ }
if sc.headerData == nil {
return nil
}
@@ -394,14 +437,10 @@ func (sc *StatsClient) monitorSocket() {
for {
select {
case event := <-watcher.Events:
- if event.Op == fsnotify.Remove {
+ if event.Op == fsnotify.Remove && event.Name == sc.socket {
if err := sc.reconnect(); err != nil {
Log.Errorf("error occurred during socket reconnect: %v", err)
}
- // path must be re-added to the watcher
- if err = watcher.Add(sc.socketPath); err != nil {
- Log.Errorf("failed to add socket address to the watcher: %v", err)
- }
}
case err := <-watcher.Errors:
Log.Errorf("socket monitor delivered error event: %v", err)
@@ -413,7 +452,7 @@ func (sc *StatsClient) monitorSocket() {
}
}()
- if err := watcher.Add(sc.socketPath); err != nil {
+ if err := watcher.Add(filepath.Dir(sc.socket)); err != nil {
Log.Errorf("failed to add socket address to the watcher: %v", err)
}
}
@@ -496,3 +535,7 @@ func (sc *StatsClient) listIndexesFunc(f func(name []byte) bool) (indexes []uint
return indexes, nil
}
+
+// isConnected reports whether the client's connection flag is currently set.
+// The flag is read atomically, so this may be called from any goroutine.
+func (sc *StatsClient) isConnected() bool {
+	state := atomic.LoadUint32(&sc.connected)
+	return state == 1
+}
diff --git a/core/stats.go b/core/stats.go
index f2da494..55c287e 100644
--- a/core/stats.go
+++ b/core/stats.go
@@ -3,7 +3,6 @@ package core
import (
"path"
"strings"
- "sync/atomic"
"time"
"git.fd.io/govpp.git/adapter"
@@ -11,8 +10,9 @@ import (
)
var (
- RetryUpdateCount = 10
- RetryUpdateDelay = time.Millisecond * 10
+ RetryUpdateCount = 10
+ RetryUpdateDelay = time.Millisecond * 10
+ HealthCheckInterval = time.Second // default health check probe interval
)
const (
@@ -76,8 +76,11 @@ const (
type StatsConnection struct {
statsClient adapter.StatsAPI
- // connected is true if the adapter is connected to VPP
- connected uint32
+ maxAttempts int // maximum number of reconnect attempts
+ recInterval time.Duration // interval for reconnect attempts
+
+ connChan chan ConnectionEvent // connection event channel
+ done chan struct{} // to terminate stats connection watcher
errorStatsData *adapter.StatDir
nodeStatsData *adapter.StatDir
@@ -87,9 +90,20 @@ type StatsConnection struct {
memStatsData *adapter.StatDir
}
-func newStatsConnection(stats adapter.StatsAPI) *StatsConnection {
+func newStatsConnection(stats adapter.StatsAPI, attempts int, interval time.Duration) *StatsConnection {
+ if attempts == 0 {
+ attempts = DefaultMaxReconnectAttempts
+ }
+ if interval == 0 {
+ interval = DefaultReconnectInterval
+ }
+
return &StatsConnection{
statsClient: stats,
+ maxAttempts: attempts,
+ recInterval: interval,
+ connChan: make(chan ConnectionEvent, NotificationChanBufSize),
+ done: make(chan struct{}),
}
}
@@ -97,28 +111,50 @@ func newStatsConnection(stats adapter.StatsAPI) *StatsConnection {
// This call blocks until it is either connected, or an error occurs.
// Only one connection attempt will be performed.
func ConnectStats(stats adapter.StatsAPI) (*StatsConnection, error) {
- c := newStatsConnection(stats)
+ log.Debug("Connecting to stats..")
+ c := newStatsConnection(stats, DefaultMaxReconnectAttempts, DefaultReconnectInterval)
- if err := c.connectClient(); err != nil {
+ if err := c.statsClient.Connect(); err != nil {
return nil, err
}
+ log.Debugf("Connected to stats.")
return c, nil
}
-func (c *StatsConnection) connectClient() error {
- log.Debug("Connecting to stats..")
+// AsyncConnectStats connects to the VPP stats socket asynchronously and returns the connection
+// handle with state channel. The call is non-blocking and the caller is expected to watch ConnectionEvent
+// values from the channel and wait for connect/disconnect events. Connection loop tries to reconnect the
+// socket in case the session was disconnected.
+func AsyncConnectStats(stats adapter.StatsAPI, attempts int, interval time.Duration) (*StatsConnection, chan ConnectionEvent, error) {
+ log.Debug("Connecting to stats asynchronously..")
+ c := newStatsConnection(stats, attempts, interval)
- if err := c.statsClient.Connect(); err != nil {
- return err
- }
+ go c.connectLoop()
- log.Debugf("Connected to stats.")
+ return c, c.connChan, nil
+}
- // store connected state
- atomic.StoreUint32(&c.connected, 1)
+func (c *StatsConnection) connectLoop() {
+ log.Debug("Asynchronously connecting to stats..")
+ var reconnectAttempts int
- return nil
+ // loop until connected
+ for {
+ if err := c.statsClient.Connect(); err == nil {
+ c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Connected})
+ break
+ } else if reconnectAttempts < c.maxAttempts {
+ reconnectAttempts++
+ log.Warnf("connecting stats failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
+ time.Sleep(c.recInterval)
+ } else {
+ c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err})
+ return
+ }
+ }
+ // start monitoring stats connection state
+ go c.monitorSocket()
}
// Disconnect disconnects from Stats API and releases all connection-related resources.
@@ -127,14 +163,41 @@ func (c *StatsConnection) Disconnect() {
return
}
if c.statsClient != nil {
- c.disconnectClient()
+ if err := c.statsClient.Disconnect(); err != nil {
+ log.Debugf("disconnecting stats client failed: %v", err)
+ }
}
+ close(c.connChan)
+ close(c.done)
}
-func (c *StatsConnection) disconnectClient() {
- if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
- if err := c.statsClient.Disconnect(); err != nil {
- log.Debugf("disconnecting stats client failed: %v", err)
+// monitorSocket periodically probes the stats connection and reports state
+// changes on the connection event channel. Every HealthCheckInterval it calls
+// ListStats with SystemStats_Heartbeat and maps the returned error to a
+// ConnectionState: ErrStatsDataBusy -> NotResponding, ErrStatsDisconnected ->
+// Disconnected, ErrStatsAccessFailed -> Failed, anything else -> Connected.
+// An event is sent only when the state differs from the previously observed
+// one. The loop terminates once the done channel is closed.
+func (c *StatsConnection) monitorSocket() {
+	var state, lastState ConnectionState
+	ticker := time.NewTicker(HealthCheckInterval)
+	// release the ticker when the watcher exits
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			_, err := c.statsClient.ListStats(SystemStats_Heartbeat)
+			switch err {
+			case adapter.ErrStatsDataBusy:
+				state = NotResponding
+			case adapter.ErrStatsDisconnected:
+				state = Disconnected
+			case adapter.ErrStatsAccessFailed:
+				state = Failed
+			default:
+				state = Connected
+			}
+			if state == lastState {
+				continue
+			}
+			lastState = state
+			c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: state, Error: err})
+		case <-c.done:
+			log.Debugf("health check watcher closed")
+			// Return instead of break: a bare break only leaves the select,
+			// and because done stays closed the loop would spin forever.
+			// No final event is sent here: Disconnect closes connChan before
+			// done, and a send on a closed channel panics. Receivers observe
+			// the disconnect through the closed event channel.
+			return
 		}
 	}
 }
@@ -498,3 +561,11 @@ func (c *StatsConnection) GetMemoryStats(memStats *api.MemoryStats) (err error)
}
return nil
}
+
+// sendStatsConnEvent delivers the event to the connection event channel
+// without blocking. If the channel cannot accept the value right away, the
+// event is dropped and a warning is logged.
+func (c *StatsConnection) sendStatsConnEvent(event ConnectionEvent) {
+	select {
+	case c.connChan <- event:
+		// delivered
+		return
+	default:
+	}
+	log.Warn("Stats connection state channel is full, discarding value.")
+}
diff --git a/examples/stats-client/stats_api.go b/examples/stats-client/stats_api.go
index b1846a6..66dd451 100644
--- a/examples/stats-client/stats_api.go
+++ b/examples/stats-client/stats_api.go
@@ -39,6 +39,7 @@ var (
statsSocket = flag.String("socket", statsclient.DefaultSocketName, "Path to VPP stats socket")
dumpAll = flag.Bool("all", false, "Dump all stats including ones with zero values")
pollPeriod = flag.Duration("period", time.Second*5, "Polling interval period")
+ async = flag.Bool("async", false, "Use asynchronous connection")
)
func init() {
@@ -58,11 +59,33 @@ func main() {
patterns = flag.Args()[1:]
}
- client := statsclient.NewStatsClient(*statsSocket)
-
- c, err := core.ConnectStats(client)
- if err != nil {
- log.Fatalln("Connecting failed:", err)
+ var (
+ client *statsclient.StatsClient
+ c *core.StatsConnection
+ err error
+ )
+
+ if *async {
+ var statsChan chan core.ConnectionEvent
+ client = statsclient.NewStatsClient(*statsSocket)
+ c, statsChan, err = core.AsyncConnectStats(client, core.DefaultMaxReconnectAttempts, core.DefaultReconnectInterval)
+ if err != nil {
+ log.Fatalln("Asynchronous connecting failed:", err)
+ }
+ select {
+ case e := <-statsChan:
+ if e.State == core.Connected {
+ // OK
+ } else {
+ log.Fatalf("VPP stats asynchronous connection failed: %s\n", e.State.String())
+ }
+ }
+ } else {
+ client = statsclient.NewStatsClient(*statsSocket)
+ c, err = core.ConnectStats(client)
+ if err != nil {
+ log.Fatalln("Connecting failed:", err)
+ }
}
defer c.Disconnect()