summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--adapter/stats_api.go1
-rw-r--r--adapter/statsclient/statsclient.go103
-rw-r--r--core/stats.go115
-rw-r--r--examples/stats-client/stats_api.go33
4 files changed, 195 insertions, 57 deletions
diff --git a/adapter/stats_api.go b/adapter/stats_api.go
index 90dbeb3..15c3789 100644
--- a/adapter/stats_api.go
+++ b/adapter/stats_api.go
@@ -27,6 +27,7 @@ const (
var (
ErrStatsDataBusy = errors.New("stats data busy")
ErrStatsDirStale = errors.New("stats dir stale")
+ ErrStatsDisconnected = errors.New("stats disconnected")
ErrStatsAccessFailed = errors.New("stats access failed")
)
diff --git a/adapter/statsclient/statsclient.go b/adapter/statsclient/statsclient.go
index 9470275..e99d787 100644
--- a/adapter/statsclient/statsclient.go
+++ b/adapter/statsclient/statsclient.go
@@ -20,20 +20,28 @@ import (
"fmt"
"net"
"os"
+ "path/filepath"
"regexp"
+ "sync/atomic"
"syscall"
"time"
+ "git.fd.io/govpp.git/adapter"
"github.com/fsnotify/fsnotify"
"github.com/ftrvxmtrx/fd"
logger "github.com/sirupsen/logrus"
-
- "git.fd.io/govpp.git/adapter"
)
const (
// DefaultSocketName is default VPP stats socket file path.
DefaultSocketName = adapter.DefaultStatsSocket
+
+ // SocketRetryPeriod is the time period after the socket availability
+ // will be re-checked
+ SocketRetryPeriod = 50 * time.Millisecond
+
+ // SocketRetryTimeout is the maximum time for the stats socket
+ SocketRetryTimeout = 3 * time.Second
)
var (
@@ -64,10 +72,12 @@ var _ adapter.StatsAPI = (*StatsClient)(nil)
// StatsClient is the pure Go implementation for VPP stats API.
type StatsClient struct {
- socketPath string
+ socket string
+
+ headerData []byte
- headerData []byte
- isConnected bool
+ // defines the adapter connection state
+ connected uint32
// to quit socket monitor
done chan struct{}
@@ -82,17 +92,14 @@ func NewStatsClient(socket string) *StatsClient {
socket = DefaultSocketName
}
return &StatsClient{
- socketPath: socket,
+ socket: socket,
}
}
// Connect to validated VPP stats socket and start monitoring
// socket file changes
func (sc *StatsClient) Connect() (err error) {
- if sc.isConnected {
- return fmt.Errorf("already connected")
- }
- if err := sc.checkSocketValid(); err != nil {
+ if err := sc.waitForSocket(); err != nil {
return err
}
sc.done = make(chan struct{})
@@ -100,22 +107,29 @@ func (sc *StatsClient) Connect() (err error) {
return err
}
sc.monitorSocket()
- sc.isConnected = true
return nil
}
// Disconnect from the socket, unmap shared memory and terminate
// socket monitor
func (sc *StatsClient) Disconnect() error {
- if !sc.isConnected {
- return nil // not connected
+ if sc.headerData == nil {
+ return nil
}
- sc.isConnected = false
- close(sc.done)
- return sc.disconnect()
+ if err := syscall.Munmap(sc.headerData); err != nil {
+ Log.Debugf("unmapping shared memory failed: %v", err)
+ return fmt.Errorf("unmapping shared memory failed: %v", err)
+ }
+ sc.headerData = nil
+
+ Log.Debugf("successfully unmapped shared memory")
+ return nil
}
func (sc *StatsClient) ListStats(patterns ...string) ([]string, error) {
+ if !sc.isConnected() {
+ return nil, adapter.ErrStatsDisconnected
+ }
accessEpoch := sc.accessStart()
if accessEpoch == 0 {
return nil, adapter.ErrStatsAccessFailed
@@ -149,6 +163,9 @@ func (sc *StatsClient) ListStats(patterns ...string) ([]string, error) {
}
func (sc *StatsClient) DumpStats(patterns ...string) (entries []adapter.StatEntry, err error) {
+ if !sc.isConnected() {
+ return nil, adapter.ErrStatsDisconnected
+ }
accessEpoch := sc.accessStart()
if accessEpoch == 0 {
return nil, adapter.ErrStatsAccessFailed
@@ -192,6 +209,9 @@ func (sc *StatsClient) DumpStats(patterns ...string) (entries []adapter.StatEntr
}
func (sc *StatsClient) PrepareDir(patterns ...string) (*adapter.StatDir, error) {
+ if !sc.isConnected() {
+ return nil, adapter.ErrStatsDisconnected
+ }
dir := new(adapter.StatDir)
accessEpoch := sc.accessStart()
@@ -241,6 +261,9 @@ func (sc *StatsClient) PrepareDir(patterns ...string) (*adapter.StatDir, error)
// UpdateDir refreshes directory data for all counters
func (sc *StatsClient) UpdateDir(dir *adapter.StatDir) (err error) {
+ if !sc.isConnected() {
+ return adapter.ErrStatsDisconnected
+ }
epoch, _ := sc.GetEpoch()
if dir.Epoch != epoch {
return adapter.ErrStatsDirStale
@@ -281,11 +304,25 @@ func (sc *StatsClient) UpdateDir(dir *adapter.StatDir) (err error) {
return nil
}
-func (sc *StatsClient) checkSocketValid() error {
- if _, err := os.Stat(sc.socketPath); os.IsNotExist(err) {
- return fmt.Errorf("stats socket file %s does not exist", sc.socketPath)
- } else if err != nil {
- return fmt.Errorf("stats socket error: %v", err)
+// checks the socket existence and waits for it for the designated
+// time if it is not available immediately
+func (sc *StatsClient) waitForSocket() error {
+ if _, err := os.Stat(sc.socket); err != nil {
+ if os.IsNotExist(err) {
+ ticker := time.NewTicker(SocketRetryPeriod)
+ for {
+ select {
+ case <-ticker.C:
+ if _, err := os.Stat(sc.socket); err == nil {
+ return nil
+ }
+ case <-time.After(SocketRetryTimeout):
+ return fmt.Errorf("stats socket file %s is not ready within timeout ", sc.socket)
+ }
+ }
+ } else {
+ return fmt.Errorf("stats socket error: %v", err)
+ }
}
return nil
}
@@ -295,7 +332,7 @@ func (sc *StatsClient) checkSocketValid() error {
func (sc *StatsClient) connect() (ss statSegment, err error) {
addr := net.UnixAddr{
Net: "unixpacket",
- Name: sc.socketPath,
+ Name: sc.socket,
}
Log.Debugf("connecting to: %v", addr)
@@ -350,6 +387,9 @@ func (sc *StatsClient) connect() (ss statSegment, err error) {
version, minVersion, maxVersion)
}
+ // set connected
+ atomic.CompareAndSwapUint32(&sc.connected, 0, 1)
+
return ss, nil
}
@@ -359,8 +399,8 @@ func (sc *StatsClient) reconnect() (err error) {
if err = sc.disconnect(); err != nil {
return fmt.Errorf("error disconnecting socket: %v", err)
}
- if err = sc.checkSocketValid(); err != nil {
- return fmt.Errorf("error validating socket: %v", err)
+ if err = sc.waitForSocket(); err != nil {
+ return fmt.Errorf("error while waiting on socket: %v", err)
}
if sc.statSegment, err = sc.connect(); err != nil {
return fmt.Errorf("error connecting socket: %v", err)
@@ -370,6 +410,9 @@ func (sc *StatsClient) reconnect() (err error) {
// disconnect unmaps socket data from the memory and resets the header
func (sc *StatsClient) disconnect() error {
+ if !atomic.CompareAndSwapUint32(&sc.connected, 1, 0) {
+ return fmt.Errorf("stats client is already disconnected")
+ }
if sc.headerData == nil {
return nil
}
@@ -394,14 +437,10 @@ func (sc *StatsClient) monitorSocket() {
for {
select {
case event := <-watcher.Events:
- if event.Op == fsnotify.Remove {
+ if event.Op == fsnotify.Remove && event.Name == sc.socket {
if err := sc.reconnect(); err != nil {
Log.Errorf("error occurred during socket reconnect: %v", err)
}
- // path must be re-added to the watcher
- if err = watcher.Add(sc.socketPath); err != nil {
- Log.Errorf("failed to add socket address to the watcher: %v", err)
- }
}
case err := <-watcher.Errors:
Log.Errorf("socket monitor delivered error event: %v", err)
@@ -413,7 +452,7 @@ func (sc *StatsClient) monitorSocket() {
}
}()
- if err := watcher.Add(sc.socketPath); err != nil {
+ if err := watcher.Add(filepath.Dir(sc.socket)); err != nil {
Log.Errorf("failed to add socket address to the watcher: %v", err)
}
}
@@ -496,3 +535,7 @@ func (sc *StatsClient) listIndexesFunc(f func(name []byte) bool) (indexes []uint
return indexes, nil
}
+
+func (sc *StatsClient) isConnected() bool {
+ return atomic.LoadUint32(&sc.connected) == 1
+}
diff --git a/core/stats.go b/core/stats.go
index f2da494..55c287e 100644
--- a/core/stats.go
+++ b/core/stats.go
@@ -3,7 +3,6 @@ package core
import (
"path"
"strings"
- "sync/atomic"
"time"
"git.fd.io/govpp.git/adapter"
@@ -11,8 +10,9 @@ import (
)
var (
- RetryUpdateCount = 10
- RetryUpdateDelay = time.Millisecond * 10
+ RetryUpdateCount = 10
+ RetryUpdateDelay = time.Millisecond * 10
+ HealthCheckInterval = time.Second // default health check probe interval
)
const (
@@ -76,8 +76,11 @@ const (
type StatsConnection struct {
statsClient adapter.StatsAPI
- // connected is true if the adapter is connected to VPP
- connected uint32
+ maxAttempts int // interval for reconnect attempts
+ recInterval time.Duration // maximum number of reconnect attempts
+
+ connChan chan ConnectionEvent // connection event channel
+ done chan struct{} // to terminate stats connection watcher
errorStatsData *adapter.StatDir
nodeStatsData *adapter.StatDir
@@ -87,9 +90,20 @@ type StatsConnection struct {
memStatsData *adapter.StatDir
}
-func newStatsConnection(stats adapter.StatsAPI) *StatsConnection {
+func newStatsConnection(stats adapter.StatsAPI, attempts int, interval time.Duration) *StatsConnection {
+ if attempts == 0 {
+ attempts = DefaultMaxReconnectAttempts
+ }
+ if interval == 0 {
+ interval = DefaultReconnectInterval
+ }
+
return &StatsConnection{
statsClient: stats,
+ maxAttempts: attempts,
+ recInterval: interval,
+ connChan: make(chan ConnectionEvent, NotificationChanBufSize),
+ done: make(chan struct{}),
}
}
@@ -97,28 +111,50 @@ func newStatsConnection(stats adapter.StatsAPI) *StatsConnection {
// This call blocks until it is either connected, or an error occurs.
// Only one connection attempt will be performed.
func ConnectStats(stats adapter.StatsAPI) (*StatsConnection, error) {
- c := newStatsConnection(stats)
+ log.Debug("Connecting to stats..")
+ c := newStatsConnection(stats, DefaultMaxReconnectAttempts, DefaultReconnectInterval)
- if err := c.connectClient(); err != nil {
+ if err := c.statsClient.Connect(); err != nil {
return nil, err
}
+ log.Debugf("Connected to stats.")
return c, nil
}
-func (c *StatsConnection) connectClient() error {
- log.Debug("Connecting to stats..")
+// AsyncConnectStats connects to the VPP stats socket asynchronously and returns the connection
+// handle with state channel. The call is non-blocking and the caller is expected to watch ConnectionEvent
+// values from the channel and wait for connect/disconnect events. Connection loop tries to reconnect the
+// socket in case the session was disconnected.
+func AsyncConnectStats(stats adapter.StatsAPI, attempts int, interval time.Duration) (*StatsConnection, chan ConnectionEvent, error) {
+ log.Debug("Connecting to stats asynchronously..")
+ c := newStatsConnection(stats, attempts, interval)
- if err := c.statsClient.Connect(); err != nil {
- return err
- }
+ go c.connectLoop()
- log.Debugf("Connected to stats.")
+ return c, c.connChan, nil
+}
- // store connected state
- atomic.StoreUint32(&c.connected, 1)
+func (c *StatsConnection) connectLoop() {
+ log.Debug("Asynchronously connecting to stats..")
+ var reconnectAttempts int
- return nil
+ // loop until connected
+ for {
+ if err := c.statsClient.Connect(); err == nil {
+ c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Connected})
+ break
+ } else if reconnectAttempts < c.maxAttempts {
+ reconnectAttempts++
+ log.Warnf("connecting stats failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
+ time.Sleep(c.recInterval)
+ } else {
+ c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err})
+ return
+ }
+ }
+ // start monitoring stats connection state
+ go c.monitorSocket()
}
// Disconnect disconnects from Stats API and releases all connection-related resources.
@@ -127,14 +163,41 @@ func (c *StatsConnection) Disconnect() {
return
}
if c.statsClient != nil {
- c.disconnectClient()
+ if err := c.statsClient.Disconnect(); err != nil {
+ log.Debugf("disconnecting stats client failed: %v", err)
+ }
}
+ close(c.connChan)
+ close(c.done)
}
-func (c *StatsConnection) disconnectClient() {
- if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
- if err := c.statsClient.Disconnect(); err != nil {
- log.Debugf("disconnecting stats client failed: %v", err)
+func (c *StatsConnection) monitorSocket() {
+ var state, lastState ConnectionState
+ ticker := time.NewTicker(HealthCheckInterval)
+
+ for {
+ select {
+ case <-ticker.C:
+ _, err := c.statsClient.ListStats(SystemStats_Heartbeat)
+ state = Connected
+ if err == adapter.ErrStatsDataBusy {
+ state = NotResponding
+ }
+ if err == adapter.ErrStatsDisconnected {
+ state = Disconnected
+ }
+ if err == adapter.ErrStatsAccessFailed {
+ state = Failed
+ }
+ if state == lastState {
+ continue
+ }
+ lastState = state
+ c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: state, Error: err})
+ case <-c.done:
+ log.Debugf("health check watcher closed")
+ c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: nil})
+ break
}
}
}
@@ -498,3 +561,11 @@ func (c *StatsConnection) GetMemoryStats(memStats *api.MemoryStats) (err error)
}
return nil
}
+
+func (c *StatsConnection) sendStatsConnEvent(event ConnectionEvent) {
+ select {
+ case c.connChan <- event:
+ default:
+ log.Warn("Stats connection state channel is full, discarding value.")
+ }
+}
diff --git a/examples/stats-client/stats_api.go b/examples/stats-client/stats_api.go
index b1846a6..66dd451 100644
--- a/examples/stats-client/stats_api.go
+++ b/examples/stats-client/stats_api.go
@@ -39,6 +39,7 @@ var (
statsSocket = flag.String("socket", statsclient.DefaultSocketName, "Path to VPP stats socket")
dumpAll = flag.Bool("all", false, "Dump all stats including ones with zero values")
pollPeriod = flag.Duration("period", time.Second*5, "Polling interval period")
+ async = flag.Bool("async", false, "Use asynchronous connection")
)
func init() {
@@ -58,11 +59,33 @@ func main() {
patterns = flag.Args()[1:]
}
- client := statsclient.NewStatsClient(*statsSocket)
-
- c, err := core.ConnectStats(client)
- if err != nil {
- log.Fatalln("Connecting failed:", err)
+ var (
+ client *statsclient.StatsClient
+ c *core.StatsConnection
+ err error
+ )
+
+ if *async {
+ var statsChan chan core.ConnectionEvent
+ client = statsclient.NewStatsClient(*statsSocket)
+ c, statsChan, err = core.AsyncConnectStats(client, core.DefaultMaxReconnectAttempts, core.DefaultReconnectInterval)
+ if err != nil {
+ log.Fatalln("Asynchronous connecting failed:", err)
+ }
+ select {
+ case e := <-statsChan:
+ if e.State == core.Connected {
+ // OK
+ } else {
+ log.Fatalf("VPP stats asynchronous connection failed: %s\n", e.State.String())
+ }
+ }
+ } else {
+ client = statsclient.NewStatsClient(*statsSocket)
+ c, err = core.ConnectStats(client)
+ if err != nil {
+ log.Fatalln("Connecting failed:", err)
+ }
}
defer c.Disconnect()