aboutsummaryrefslogtreecommitdiffstats
path: root/adapter
diff options
context:
space:
mode:
authorVladimir Lavor <vlavor@cisco.com>2021-02-18 16:05:30 +0100
committerVladimir Lavor <vlavor@cisco.com>2021-02-23 13:28:47 +0100
commit4459b648e9fb53c34abbf52a00e63ad384fb9ee2 (patch)
treefbe5885b5aa2e4d3ff10255327d22ddd56a86c92 /adapter
parent4c1cccf48cd144414c7233f167087aff770ef67b (diff)
Added asynchronous connection for stats socket
The stats socket now allows an option to connect asynchronously in the same manner as the api socket connection. New method AsyncConnectStats returns a channel where notificaitons of type ConnectionEvent will be sent. Fixed the stats reconnect procedure which sometimes failed to re-eneable the connection. Change-Id: I0bdb19f0d57e3a1ea259b8b1ba0a5e5fa49a09db Signed-off-by: Vladimir Lavor <vlavor@cisco.com>
Diffstat (limited to 'adapter')
-rw-r--r--adapter/stats_api.go1
-rw-r--r--adapter/statsclient/statsclient.go103
2 files changed, 74 insertions, 30 deletions
diff --git a/adapter/stats_api.go b/adapter/stats_api.go
index 90dbeb3..15c3789 100644
--- a/adapter/stats_api.go
+++ b/adapter/stats_api.go
@@ -27,6 +27,7 @@ const (
var (
ErrStatsDataBusy = errors.New("stats data busy")
ErrStatsDirStale = errors.New("stats dir stale")
+ ErrStatsDisconnected = errors.New("stats disconnected")
ErrStatsAccessFailed = errors.New("stats access failed")
)
diff --git a/adapter/statsclient/statsclient.go b/adapter/statsclient/statsclient.go
index 9470275..e99d787 100644
--- a/adapter/statsclient/statsclient.go
+++ b/adapter/statsclient/statsclient.go
@@ -20,20 +20,28 @@ import (
"fmt"
"net"
"os"
+ "path/filepath"
"regexp"
+ "sync/atomic"
"syscall"
"time"
+ "git.fd.io/govpp.git/adapter"
"github.com/fsnotify/fsnotify"
"github.com/ftrvxmtrx/fd"
logger "github.com/sirupsen/logrus"
-
- "git.fd.io/govpp.git/adapter"
)
const (
// DefaultSocketName is default VPP stats socket file path.
DefaultSocketName = adapter.DefaultStatsSocket
+
+ // SocketRetryPeriod is the time period after the socket availability
+ // will be re-checked
+ SocketRetryPeriod = 50 * time.Millisecond
+
+ // SocketRetryTimeout is the maximum time for the stats socket
+ SocketRetryTimeout = 3 * time.Second
)
var (
@@ -64,10 +72,12 @@ var _ adapter.StatsAPI = (*StatsClient)(nil)
// StatsClient is the pure Go implementation for VPP stats API.
type StatsClient struct {
- socketPath string
+ socket string
+
+ headerData []byte
- headerData []byte
- isConnected bool
+ // defines the adapter connection state
+ connected uint32
// to quit socket monitor
done chan struct{}
@@ -82,17 +92,14 @@ func NewStatsClient(socket string) *StatsClient {
socket = DefaultSocketName
}
return &StatsClient{
- socketPath: socket,
+ socket: socket,
}
}
// Connect to validated VPP stats socket and start monitoring
// socket file changes
func (sc *StatsClient) Connect() (err error) {
- if sc.isConnected {
- return fmt.Errorf("already connected")
- }
- if err := sc.checkSocketValid(); err != nil {
+ if err := sc.waitForSocket(); err != nil {
return err
}
sc.done = make(chan struct{})
@@ -100,22 +107,29 @@ func (sc *StatsClient) Connect() (err error) {
return err
}
sc.monitorSocket()
- sc.isConnected = true
return nil
}
// Disconnect from the socket, unmap shared memory and terminate
// socket monitor
func (sc *StatsClient) Disconnect() error {
- if !sc.isConnected {
- return nil // not connected
+ if sc.headerData == nil {
+ return nil
}
- sc.isConnected = false
- close(sc.done)
- return sc.disconnect()
+ if err := syscall.Munmap(sc.headerData); err != nil {
+ Log.Debugf("unmapping shared memory failed: %v", err)
+ return fmt.Errorf("unmapping shared memory failed: %v", err)
+ }
+ sc.headerData = nil
+
+ Log.Debugf("successfully unmapped shared memory")
+ return nil
}
func (sc *StatsClient) ListStats(patterns ...string) ([]string, error) {
+ if !sc.isConnected() {
+ return nil, adapter.ErrStatsDisconnected
+ }
accessEpoch := sc.accessStart()
if accessEpoch == 0 {
return nil, adapter.ErrStatsAccessFailed
@@ -149,6 +163,9 @@ func (sc *StatsClient) ListStats(patterns ...string) ([]string, error) {
}
func (sc *StatsClient) DumpStats(patterns ...string) (entries []adapter.StatEntry, err error) {
+ if !sc.isConnected() {
+ return nil, adapter.ErrStatsDisconnected
+ }
accessEpoch := sc.accessStart()
if accessEpoch == 0 {
return nil, adapter.ErrStatsAccessFailed
@@ -192,6 +209,9 @@ func (sc *StatsClient) DumpStats(patterns ...string) (entries []adapter.StatEntr
}
func (sc *StatsClient) PrepareDir(patterns ...string) (*adapter.StatDir, error) {
+ if !sc.isConnected() {
+ return nil, adapter.ErrStatsDisconnected
+ }
dir := new(adapter.StatDir)
accessEpoch := sc.accessStart()
@@ -241,6 +261,9 @@ func (sc *StatsClient) PrepareDir(patterns ...string) (*adapter.StatDir, error)
// UpdateDir refreshes directory data for all counters
func (sc *StatsClient) UpdateDir(dir *adapter.StatDir) (err error) {
+ if !sc.isConnected() {
+ return adapter.ErrStatsDisconnected
+ }
epoch, _ := sc.GetEpoch()
if dir.Epoch != epoch {
return adapter.ErrStatsDirStale
@@ -281,11 +304,25 @@ func (sc *StatsClient) UpdateDir(dir *adapter.StatDir) (err error) {
return nil
}
-func (sc *StatsClient) checkSocketValid() error {
- if _, err := os.Stat(sc.socketPath); os.IsNotExist(err) {
- return fmt.Errorf("stats socket file %s does not exist", sc.socketPath)
- } else if err != nil {
- return fmt.Errorf("stats socket error: %v", err)
+// checks the socket existence and waits for it for the designated
+// time if it is not available immediately
+func (sc *StatsClient) waitForSocket() error {
+ if _, err := os.Stat(sc.socket); err != nil {
+ if os.IsNotExist(err) {
+ ticker := time.NewTicker(SocketRetryPeriod)
+ for {
+ select {
+ case <-ticker.C:
+ if _, err := os.Stat(sc.socket); err == nil {
+ return nil
+ }
+ case <-time.After(SocketRetryTimeout):
+ return fmt.Errorf("stats socket file %s is not ready within timeout ", sc.socket)
+ }
+ }
+ } else {
+ return fmt.Errorf("stats socket error: %v", err)
+ }
}
return nil
}
@@ -295,7 +332,7 @@ func (sc *StatsClient) checkSocketValid() error {
func (sc *StatsClient) connect() (ss statSegment, err error) {
addr := net.UnixAddr{
Net: "unixpacket",
- Name: sc.socketPath,
+ Name: sc.socket,
}
Log.Debugf("connecting to: %v", addr)
@@ -350,6 +387,9 @@ func (sc *StatsClient) connect() (ss statSegment, err error) {
version, minVersion, maxVersion)
}
+ // set connected
+ atomic.CompareAndSwapUint32(&sc.connected, 0, 1)
+
return ss, nil
}
@@ -359,8 +399,8 @@ func (sc *StatsClient) reconnect() (err error) {
if err = sc.disconnect(); err != nil {
return fmt.Errorf("error disconnecting socket: %v", err)
}
- if err = sc.checkSocketValid(); err != nil {
- return fmt.Errorf("error validating socket: %v", err)
+ if err = sc.waitForSocket(); err != nil {
+ return fmt.Errorf("error while waiting on socket: %v", err)
}
if sc.statSegment, err = sc.connect(); err != nil {
return fmt.Errorf("error connecting socket: %v", err)
@@ -370,6 +410,9 @@ func (sc *StatsClient) reconnect() (err error) {
// disconnect unmaps socket data from the memory and resets the header
func (sc *StatsClient) disconnect() error {
+ if !atomic.CompareAndSwapUint32(&sc.connected, 1, 0) {
+ return fmt.Errorf("stats client is already disconnected")
+ }
if sc.headerData == nil {
return nil
}
@@ -394,14 +437,10 @@ func (sc *StatsClient) monitorSocket() {
for {
select {
case event := <-watcher.Events:
- if event.Op == fsnotify.Remove {
+ if event.Op == fsnotify.Remove && event.Name == sc.socket {
if err := sc.reconnect(); err != nil {
Log.Errorf("error occurred during socket reconnect: %v", err)
}
- // path must be re-added to the watcher
- if err = watcher.Add(sc.socketPath); err != nil {
- Log.Errorf("failed to add socket address to the watcher: %v", err)
- }
}
case err := <-watcher.Errors:
Log.Errorf("socket monitor delivered error event: %v", err)
@@ -413,7 +452,7 @@ func (sc *StatsClient) monitorSocket() {
}
}()
- if err := watcher.Add(sc.socketPath); err != nil {
+ if err := watcher.Add(filepath.Dir(sc.socket)); err != nil {
Log.Errorf("failed to add socket address to the watcher: %v", err)
}
}
@@ -496,3 +535,7 @@ func (sc *StatsClient) listIndexesFunc(f func(name []byte) bool) (indexes []uint
return indexes, nil
}
+
+func (sc *StatsClient) isConnected() bool {
+ return atomic.LoadUint32(&sc.connected) == 1
+}