aboutsummaryrefslogtreecommitdiffstats
path: root/core/stats.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/stats.go')
-rw-r--r--core/stats.go198
1 files changed, 169 insertions, 29 deletions
diff --git a/core/stats.go b/core/stats.go
index 0717be6..897a475 100644
--- a/core/stats.go
+++ b/core/stats.go
@@ -3,16 +3,16 @@ package core
import (
"path"
"strings"
- "sync/atomic"
"time"
- "git.fd.io/govpp.git/adapter"
- "git.fd.io/govpp.git/api"
+ "go.fd.io/govpp/adapter"
+ "go.fd.io/govpp/api"
)
var (
- RetryUpdateCount = 10
- RetryUpdateDelay = time.Millisecond * 10
+ RetryUpdateCount = 10
+ RetryUpdateDelay = time.Millisecond * 10
+ HealthCheckInterval = time.Second // default health check probe interval
)
const (
@@ -39,6 +39,12 @@ const (
CounterStatsPrefix = "/err/"
+ MemoryStatSegPrefix = "/mem/statseg"
+ MemoryStatSegment = "/mem/stat segment"
+ MemoryMainHeap = "/mem/main heap"
+ MemoryStats_Total = "total"
+ MemoryStats_Used = "used"
+
InterfaceStatsPrefix = "/if/"
InterfaceStats_Names = InterfaceStatsPrefix + "names"
InterfaceStats_Drops = InterfaceStatsPrefix + "drops"
@@ -72,19 +78,34 @@ const (
type StatsConnection struct {
statsClient adapter.StatsAPI
- // connected is true if the adapter is connected to VPP
- connected uint32
+ maxAttempts int // interval for reconnect attempts
+ recInterval time.Duration // maximum number of reconnect attempts
+
+ connChan chan ConnectionEvent // connection event channel
+ done chan struct{} // to terminate stats connection watcher
errorStatsData *adapter.StatDir
nodeStatsData *adapter.StatDir
ifaceStatsData *adapter.StatDir
sysStatsData *adapter.StatDir
bufStatsData *adapter.StatDir
+ memStatsData *adapter.StatDir
}
-func newStatsConnection(stats adapter.StatsAPI) *StatsConnection {
+func newStatsConnection(stats adapter.StatsAPI, attempts int, interval time.Duration) *StatsConnection {
+ if attempts == 0 {
+ attempts = DefaultMaxReconnectAttempts
+ }
+ if interval == 0 {
+ interval = DefaultReconnectInterval
+ }
+
return &StatsConnection{
statsClient: stats,
+ maxAttempts: attempts,
+ recInterval: interval,
+ connChan: make(chan ConnectionEvent, NotificationChanBufSize),
+ done: make(chan struct{}),
}
}
@@ -92,28 +113,50 @@ func newStatsConnection(stats adapter.StatsAPI) *StatsConnection {
// This call blocks until it is either connected, or an error occurs.
// Only one connection attempt will be performed.
func ConnectStats(stats adapter.StatsAPI) (*StatsConnection, error) {
- c := newStatsConnection(stats)
+ log.Debug("Connecting to stats..")
+ c := newStatsConnection(stats, DefaultMaxReconnectAttempts, DefaultReconnectInterval)
- if err := c.connectClient(); err != nil {
+ if err := c.statsClient.Connect(); err != nil {
return nil, err
}
+ log.Debugf("Connected to stats.")
return c, nil
}
-func (c *StatsConnection) connectClient() error {
- log.Debug("Connecting to stats..")
+// AsyncConnectStats connects to the VPP stats socket asynchronously and returns the connection
+// handle with state channel. The call is non-blocking and the caller is expected to watch ConnectionEvent
+// values from the channel and wait for connect/disconnect events. Connection loop tries to reconnect the
+// socket in case the session was disconnected.
+func AsyncConnectStats(stats adapter.StatsAPI, attempts int, interval time.Duration) (*StatsConnection, chan ConnectionEvent, error) {
+ log.Debug("Connecting to stats asynchronously..")
+ c := newStatsConnection(stats, attempts, interval)
- if err := c.statsClient.Connect(); err != nil {
- return err
- }
+ go c.connectLoop()
- log.Debugf("Connected to stats.")
+ return c, c.connChan, nil
+}
- // store connected state
- atomic.StoreUint32(&c.connected, 1)
+func (c *StatsConnection) connectLoop() {
+ log.Debug("Asynchronously connecting to stats..")
+ var reconnectAttempts int
- return nil
+ // loop until connected
+ for {
+ if err := c.statsClient.Connect(); err == nil {
+ c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Connected})
+ break
+ } else if reconnectAttempts < c.maxAttempts {
+ reconnectAttempts++
+ log.Warnf("connecting stats failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
+ time.Sleep(c.recInterval)
+ } else {
+ c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err})
+ return
+ }
+ }
+ // start monitoring stats connection state
+ go c.monitorSocket()
}
// Disconnect disconnects from Stats API and releases all connection-related resources.
@@ -122,14 +165,41 @@ func (c *StatsConnection) Disconnect() {
return
}
if c.statsClient != nil {
- c.disconnectClient()
+ if err := c.statsClient.Disconnect(); err != nil {
+ log.Debugf("disconnecting stats client failed: %v", err)
+ }
}
+ close(c.connChan)
+ close(c.done)
}
-func (c *StatsConnection) disconnectClient() {
- if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
- if err := c.statsClient.Disconnect(); err != nil {
- log.Debugf("disconnecting stats client failed: %v", err)
+func (c *StatsConnection) monitorSocket() {
+ var state, lastState ConnectionState
+ ticker := time.NewTicker(HealthCheckInterval)
+
+ for {
+ select {
+ case <-ticker.C:
+ _, err := c.statsClient.ListStats(SystemStats_Heartbeat)
+ state = Connected
+ if err == adapter.ErrStatsDataBusy {
+ state = NotResponding
+ }
+ if err == adapter.ErrStatsDisconnected {
+ state = Disconnected
+ }
+ if err == adapter.ErrStatsAccessFailed {
+ state = Failed
+ }
+ if state == lastState {
+ continue
+ }
+ lastState = state
+ c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: state, Error: err})
+ case <-c.done:
+ log.Debugf("health check watcher closed")
+ c.sendStatsConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: nil})
+ break
}
}
}
@@ -198,6 +268,9 @@ func (c *StatsConnection) GetSystemStats(sysStats *api.SystemStats) (err error)
if ss, ok := stat.Data.(adapter.SimpleCounterStat); ok {
vals = make([]uint64, len(ss))
for w := range ss {
+ if ss[w] == nil {
+ continue
+ }
vals[w] = uint64(ss[w][0])
}
}
@@ -230,14 +303,23 @@ func (c *StatsConnection) GetErrorStats(errorStats *api.ErrorStats) (err error)
}
for i, stat := range c.errorStatsData.Entries {
- if stat.Type != adapter.ErrorIndex {
- continue
- }
if errStat, ok := stat.Data.(adapter.ErrorStat); ok {
- errorStats.Errors[i].Value = uint64(errStat)
+ values := make([]uint64, len(errStat))
+ for j, errStatW := range errStat {
+ values[j] = uint64(errStatW)
+ }
+ errorStats.Errors[i].Values = values
+ }
+ if errStat, ok := stat.Data.(adapter.SimpleCounterStat); ok {
+ values := make([]uint64, len(errStat))
+ for j, errStatW := range errStat {
+ for _, val := range errStatW {
+ values[j] += uint64(val)
+ }
+ }
+ errorStats.Errors[i].Values = values
}
}
-
return nil
}
@@ -468,3 +550,61 @@ func (c *StatsConnection) GetBufferStats(bufStats *api.BufferStats) (err error)
return nil
}
+
+func (c *StatsConnection) GetMemoryStats(memStats *api.MemoryStats) (err error) {
+ if err := c.updateStats(&c.memStatsData, MemoryStatSegPrefix, MemoryStatSegment, MemoryMainHeap); err != nil {
+ return err
+ }
+ convertStats := func(stats []adapter.Counter) api.MemoryCounters {
+ memUsg := make([]adapter.Counter, 7)
+ copy(memUsg, stats)
+ return api.MemoryCounters{
+ Total: uint64(memUsg[0]), Used: uint64(memUsg[1]), Free: uint64(memUsg[2]), UsedMMap: uint64(memUsg[3]),
+ TotalAlloc: uint64(memUsg[4]), FreeChunks: uint64(memUsg[5]), Releasable: uint64(memUsg[6]),
+ }
+ }
+
+ for _, stat := range c.memStatsData.Entries {
+ if strings.Contains(string(stat.Name), MemoryStatSegPrefix) {
+ _, f := path.Split(string(stat.Name))
+ var val float64
+ m, ok := stat.Data.(adapter.ScalarStat)
+ if ok {
+ val = float64(m)
+ }
+ switch f {
+ case MemoryStats_Total:
+ memStats.Total = val
+ case MemoryStats_Used:
+ memStats.Used = val
+ }
+ } else if string(stat.Name) == MemoryStatSegment {
+ if perHeapStats, ok := stat.Data.(adapter.SimpleCounterStat); ok {
+ if memStats.Stat == nil {
+ memStats.Stat = make(map[int]api.MemoryCounters)
+ }
+ for heap, stats := range perHeapStats {
+ memStats.Stat[heap] = convertStats(stats)
+ }
+ }
+ } else if string(stat.Name) == MemoryMainHeap {
+ if perHeapStats, ok := stat.Data.(adapter.SimpleCounterStat); ok {
+ if memStats.Main == nil {
+ memStats.Main = make(map[int]api.MemoryCounters)
+ }
+ for heap, stats := range perHeapStats {
+ memStats.Main[heap] = convertStats(stats)
+ }
+ }
+ }
+ }
+ return nil
+}
+
+func (c *StatsConnection) sendStatsConnEvent(event ConnectionEvent) {
+ select {
+ case c.connChan <- event:
+ default:
+ log.Warn("Stats connection state channel is full, discarding value.")
+ }
+}