aboutsummaryrefslogtreecommitdiffstats
path: root/adapter/statsclient/statsclient.go
diff options
context:
space:
mode:
Diffstat (limited to 'adapter/statsclient/statsclient.go')
-rw-r--r--adapter/statsclient/statsclient.go103
1 files changed, 73 insertions, 30 deletions
diff --git a/adapter/statsclient/statsclient.go b/adapter/statsclient/statsclient.go
index 9470275..e99d787 100644
--- a/adapter/statsclient/statsclient.go
+++ b/adapter/statsclient/statsclient.go
@@ -20,20 +20,28 @@ import (
"fmt"
"net"
"os"
+ "path/filepath"
"regexp"
+ "sync/atomic"
"syscall"
"time"
+ "git.fd.io/govpp.git/adapter"
"github.com/fsnotify/fsnotify"
"github.com/ftrvxmtrx/fd"
logger "github.com/sirupsen/logrus"
-
- "git.fd.io/govpp.git/adapter"
)
const (
// DefaultSocketName is default VPP stats socket file path.
DefaultSocketName = adapter.DefaultStatsSocket
+
+ // SocketRetryPeriod is the time period after the socket availability
+ // will be re-checked
+ SocketRetryPeriod = 50 * time.Millisecond
+
+ // SocketRetryTimeout is the maximum time for the stats socket
+ SocketRetryTimeout = 3 * time.Second
)
var (
@@ -64,10 +72,12 @@ var _ adapter.StatsAPI = (*StatsClient)(nil)
// StatsClient is the pure Go implementation for VPP stats API.
type StatsClient struct {
- socketPath string
+ socket string
+
+ headerData []byte
- headerData []byte
- isConnected bool
+ // defines the adapter connection state
+ connected uint32
// to quit socket monitor
done chan struct{}
@@ -82,17 +92,14 @@ func NewStatsClient(socket string) *StatsClient {
socket = DefaultSocketName
}
return &StatsClient{
- socketPath: socket,
+ socket: socket,
}
}
// Connect to validated VPP stats socket and start monitoring
// socket file changes
func (sc *StatsClient) Connect() (err error) {
- if sc.isConnected {
- return fmt.Errorf("already connected")
- }
- if err := sc.checkSocketValid(); err != nil {
+ if err := sc.waitForSocket(); err != nil {
return err
}
sc.done = make(chan struct{})
@@ -100,22 +107,29 @@ func (sc *StatsClient) Connect() (err error) {
return err
}
sc.monitorSocket()
- sc.isConnected = true
return nil
}
// Disconnect from the socket, unmap shared memory and terminate
// socket monitor
func (sc *StatsClient) Disconnect() error {
- if !sc.isConnected {
- return nil // not connected
+ if sc.headerData == nil {
+ return nil
}
- sc.isConnected = false
- close(sc.done)
- return sc.disconnect()
+ if err := syscall.Munmap(sc.headerData); err != nil {
+ Log.Debugf("unmapping shared memory failed: %v", err)
+ return fmt.Errorf("unmapping shared memory failed: %v", err)
+ }
+ sc.headerData = nil
+
+ Log.Debugf("successfully unmapped shared memory")
+ return nil
}
func (sc *StatsClient) ListStats(patterns ...string) ([]string, error) {
+ if !sc.isConnected() {
+ return nil, adapter.ErrStatsDisconnected
+ }
accessEpoch := sc.accessStart()
if accessEpoch == 0 {
return nil, adapter.ErrStatsAccessFailed
@@ -149,6 +163,9 @@ func (sc *StatsClient) ListStats(patterns ...string) ([]string, error) {
}
func (sc *StatsClient) DumpStats(patterns ...string) (entries []adapter.StatEntry, err error) {
+ if !sc.isConnected() {
+ return nil, adapter.ErrStatsDisconnected
+ }
accessEpoch := sc.accessStart()
if accessEpoch == 0 {
return nil, adapter.ErrStatsAccessFailed
@@ -192,6 +209,9 @@ func (sc *StatsClient) DumpStats(patterns ...string) (entries []adapter.StatEntr
}
func (sc *StatsClient) PrepareDir(patterns ...string) (*adapter.StatDir, error) {
+ if !sc.isConnected() {
+ return nil, adapter.ErrStatsDisconnected
+ }
dir := new(adapter.StatDir)
accessEpoch := sc.accessStart()
@@ -241,6 +261,9 @@ func (sc *StatsClient) PrepareDir(patterns ...string) (*adapter.StatDir, error)
// UpdateDir refreshes directory data for all counters
func (sc *StatsClient) UpdateDir(dir *adapter.StatDir) (err error) {
+ if !sc.isConnected() {
+ return adapter.ErrStatsDisconnected
+ }
epoch, _ := sc.GetEpoch()
if dir.Epoch != epoch {
return adapter.ErrStatsDirStale
@@ -281,11 +304,25 @@ func (sc *StatsClient) UpdateDir(dir *adapter.StatDir) (err error) {
return nil
}
-func (sc *StatsClient) checkSocketValid() error {
- if _, err := os.Stat(sc.socketPath); os.IsNotExist(err) {
- return fmt.Errorf("stats socket file %s does not exist", sc.socketPath)
- } else if err != nil {
- return fmt.Errorf("stats socket error: %v", err)
+// checks the socket existence and waits for it for the designated
+// time if it is not available immediately
+func (sc *StatsClient) waitForSocket() error {
+ if _, err := os.Stat(sc.socket); err != nil {
+ if os.IsNotExist(err) {
+ ticker := time.NewTicker(SocketRetryPeriod)
+ for {
+ select {
+ case <-ticker.C:
+ if _, err := os.Stat(sc.socket); err == nil {
+ return nil
+ }
+ case <-time.After(SocketRetryTimeout):
+ return fmt.Errorf("stats socket file %s is not ready within timeout ", sc.socket)
+ }
+ }
+ } else {
+ return fmt.Errorf("stats socket error: %v", err)
+ }
}
return nil
}
@@ -295,7 +332,7 @@ func (sc *StatsClient) checkSocketValid() error {
func (sc *StatsClient) connect() (ss statSegment, err error) {
addr := net.UnixAddr{
Net: "unixpacket",
- Name: sc.socketPath,
+ Name: sc.socket,
}
Log.Debugf("connecting to: %v", addr)
@@ -350,6 +387,9 @@ func (sc *StatsClient) connect() (ss statSegment, err error) {
version, minVersion, maxVersion)
}
+ // set connected
+ atomic.CompareAndSwapUint32(&sc.connected, 0, 1)
+
return ss, nil
}
@@ -359,8 +399,8 @@ func (sc *StatsClient) reconnect() (err error) {
if err = sc.disconnect(); err != nil {
return fmt.Errorf("error disconnecting socket: %v", err)
}
- if err = sc.checkSocketValid(); err != nil {
- return fmt.Errorf("error validating socket: %v", err)
+ if err = sc.waitForSocket(); err != nil {
+ return fmt.Errorf("error while waiting on socket: %v", err)
}
if sc.statSegment, err = sc.connect(); err != nil {
return fmt.Errorf("error connecting socket: %v", err)
@@ -370,6 +410,9 @@ func (sc *StatsClient) reconnect() (err error) {
// disconnect unmaps socket data from the memory and resets the header
func (sc *StatsClient) disconnect() error {
+ if !atomic.CompareAndSwapUint32(&sc.connected, 1, 0) {
+ return fmt.Errorf("stats client is already disconnected")
+ }
if sc.headerData == nil {
return nil
}
@@ -394,14 +437,10 @@ func (sc *StatsClient) monitorSocket() {
for {
select {
case event := <-watcher.Events:
- if event.Op == fsnotify.Remove {
+ if event.Op == fsnotify.Remove && event.Name == sc.socket {
if err := sc.reconnect(); err != nil {
Log.Errorf("error occurred during socket reconnect: %v", err)
}
- // path must be re-added to the watcher
- if err = watcher.Add(sc.socketPath); err != nil {
- Log.Errorf("failed to add socket address to the watcher: %v", err)
- }
}
case err := <-watcher.Errors:
Log.Errorf("socket monitor delivered error event: %v", err)
@@ -413,7 +452,7 @@ func (sc *StatsClient) monitorSocket() {
}
}()
- if err := watcher.Add(sc.socketPath); err != nil {
+ if err := watcher.Add(filepath.Dir(sc.socket)); err != nil {
Log.Errorf("failed to add socket address to the watcher: %v", err)
}
}
@@ -496,3 +535,7 @@ func (sc *StatsClient) listIndexesFunc(f func(name []byte) bool) (indexes []uint
return indexes, nil
}
+
+func (sc *StatsClient) isConnected() bool {
+ return atomic.LoadUint32(&sc.connected) == 1
+}