aboutsummaryrefslogtreecommitdiffstats
path: root/adapter
diff options
context:
space:
mode:
Diffstat (limited to 'adapter')
-rw-r--r--adapter/mock/mock_stats_adapter.go15
-rw-r--r--adapter/mock/mock_vpp_adapter.go41
-rw-r--r--adapter/socketclient/socketclient.go125
-rw-r--r--adapter/stats_api.go119
-rw-r--r--adapter/statsclient/stat_segment.go470
-rw-r--r--adapter/statsclient/stat_segment_api.go137
-rw-r--r--adapter/statsclient/statsclient.go598
-rw-r--r--adapter/statsclient/statseg.go104
-rw-r--r--adapter/statsclient/statseg_v1.go365
-rw-r--r--adapter/statsclient/statseg_v2.go411
-rw-r--r--adapter/vppapiclient/stat_client.go30
-rw-r--r--adapter/vppapiclient/stat_client_stub.go8
-rw-r--r--adapter/vppapiclient/vppapiclient.go21
-rw-r--r--adapter/vppapiclient/vppapiclient_stub.go2
14 files changed, 1537 insertions, 909 deletions
diff --git a/adapter/mock/mock_stats_adapter.go b/adapter/mock/mock_stats_adapter.go
index 55b1831..f2378f3 100644
--- a/adapter/mock/mock_stats_adapter.go
+++ b/adapter/mock/mock_stats_adapter.go
@@ -18,7 +18,7 @@
package mock
import (
- "git.fd.io/govpp.git/adapter"
+ "go.fd.io/govpp/adapter"
)
// implements StatsAPI
@@ -46,10 +46,13 @@ func (a *StatsAdapter) Disconnect() error {
}
// ListStats mocks name listing for all stats.
-func (a *StatsAdapter) ListStats(patterns ...string) ([]string, error) {
- var statNames []string
+func (a *StatsAdapter) ListStats(patterns ...string) ([]adapter.StatIdentifier, error) {
+ var statNames []adapter.StatIdentifier
for _, stat := range a.entries {
- statNames = append(statNames, string(stat.Name))
+ statNames = append(statNames, adapter.StatIdentifier{
+ Name: stat.Name,
+ Index: stat.Index,
+ })
}
return statNames, nil
}
@@ -63,6 +66,10 @@ func (a *StatsAdapter) PrepareDir(prefixes ...string) (*adapter.StatDir, error)
return a.dir, nil
}
+func (a *StatsAdapter) PrepareDirOnIndex(indexes ...uint32) (*adapter.StatDir, error) {
+ return a.dir, nil
+}
+
func (a *StatsAdapter) UpdateDir(dir *adapter.StatDir) error {
*dir = *a.dir
return nil
diff --git a/adapter/mock/mock_vpp_adapter.go b/adapter/mock/mock_vpp_adapter.go
index b7fa002..cb37dd2 100644
--- a/adapter/mock/mock_vpp_adapter.go
+++ b/adapter/mock/mock_vpp_adapter.go
@@ -22,10 +22,10 @@ import (
"reflect"
"sync"
- "git.fd.io/govpp.git/adapter"
- "git.fd.io/govpp.git/adapter/mock/binapi"
- "git.fd.io/govpp.git/api"
- "git.fd.io/govpp.git/codec"
+ "go.fd.io/govpp/adapter"
+ "go.fd.io/govpp/adapter/mock/binapi"
+ "go.fd.io/govpp/api"
+ "go.fd.io/govpp/codec"
)
type replyMode int
@@ -44,7 +44,7 @@ type VppAdapter struct {
access sync.RWMutex
msgNameToIds map[string]uint16
msgIDsToName map[uint16]string
- binAPITypes map[string]reflect.Type
+ binAPITypes map[string]map[string]reflect.Type
repliesLock sync.Mutex // mutex for the queue
replies []reply // FIFO queue of messages
@@ -126,7 +126,7 @@ func NewVppAdapter() *VppAdapter {
msgIDSeq: 1000,
msgIDsToName: make(map[uint16]string),
msgNameToIds: make(map[string]uint16),
- binAPITypes: make(map[string]reflect.Type),
+ binAPITypes: make(map[string]map[string]reflect.Type),
}
a.registerBinAPITypes()
return a
@@ -165,19 +165,25 @@ func (a *VppAdapter) GetMsgNameByID(msgID uint16) (string, bool) {
func (a *VppAdapter) registerBinAPITypes() {
a.access.Lock()
defer a.access.Unlock()
- for _, msg := range api.GetRegisteredMessages() {
- a.binAPITypes[msg.GetMessageName()] = reflect.TypeOf(msg).Elem()
+ for pkg, msgs := range api.GetRegisteredMessages() {
+ msgMap := make(map[string]reflect.Type)
+ for _, msg := range msgs {
+ msgMap[msg.GetMessageName()] = reflect.TypeOf(msg).Elem()
+ }
+ a.binAPITypes[pkg] = msgMap
}
}
// ReplyTypeFor returns reply message type for given request message name.
-func (a *VppAdapter) ReplyTypeFor(requestMsgName string) (reflect.Type, uint16, bool) {
+func (a *VppAdapter) ReplyTypeFor(pkg, requestMsgName string) (reflect.Type, uint16, bool) {
replyName, foundName := binapi.ReplyNameFor(requestMsgName)
if foundName {
- if reply, found := a.binAPITypes[replyName]; found {
- msgID, err := a.GetMsgID(replyName, "")
- if err == nil {
- return reply, msgID, found
+ if messages, found := a.binAPITypes[pkg]; found {
+ if reply, found := messages[replyName]; found {
+ msgID, err := a.GetMsgID(replyName, "")
+ if err == nil {
+ return reply, msgID, found
+ }
}
}
}
@@ -186,8 +192,8 @@ func (a *VppAdapter) ReplyTypeFor(requestMsgName string) (reflect.Type, uint16,
}
// ReplyFor returns reply message for given request message name.
-func (a *VppAdapter) ReplyFor(requestMsgName string) (api.Message, uint16, bool) {
- replType, msgID, foundReplType := a.ReplyTypeFor(requestMsgName)
+func (a *VppAdapter) ReplyFor(pkg, requestMsgName string) (api.Message, uint16, bool) {
+ replType, msgID, foundReplType := a.ReplyTypeFor(pkg, requestMsgName)
if foundReplType {
msgVal := reflect.New(replType)
if msg, ok := msgVal.Interface().(api.Message); ok {
@@ -252,7 +258,10 @@ func (a *VppAdapter) GetMsgID(msgName string, msgCrc string) (uint16, error) {
// SendMsg emulates sending a binary-encoded message to VPP.
func (a *VppAdapter) SendMsg(clientID uint32, data []byte) error {
- switch a.mode {
+ a.repliesLock.Lock()
+ mode := a.mode
+ a.repliesLock.Unlock()
+ switch mode {
case useReplyHandlers:
for i := len(a.replyHandlers) - 1; i >= 0; i-- {
replyHandler := a.replyHandlers[i]
diff --git a/adapter/socketclient/socketclient.go b/adapter/socketclient/socketclient.go
index 637bd69..c9aa2b4 100644
--- a/adapter/socketclient/socketclient.go
+++ b/adapter/socketclient/socketclient.go
@@ -30,9 +30,9 @@ import (
"github.com/fsnotify/fsnotify"
"github.com/sirupsen/logrus"
- "git.fd.io/govpp.git/adapter"
- "git.fd.io/govpp.git/binapi/memclnt"
- "git.fd.io/govpp.git/codec"
+ "go.fd.io/govpp/adapter"
+ "go.fd.io/govpp/binapi/memclnt"
+ "go.fd.io/govpp/codec"
)
const (
@@ -73,29 +73,8 @@ func init() {
log = logger.WithField("logger", "govpp/socketclient")
}
-const socketMissing = `
-------------------------------------------------------------
- No socket file found at: %s
- VPP binary API socket file is missing!
-
- - is VPP running with socket for binapi enabled?
- - is the correct socket name configured?
-
- To enable it add following section to your VPP config:
- socksvr {
- default
- }
-------------------------------------------------------------
-`
-
-var warnOnce sync.Once
-
-func (c *socketClient) printMissingSocketMsg() {
- fmt.Fprintf(os.Stderr, socketMissing, c.sockAddr)
-}
-
-type socketClient struct {
- sockAddr string
+type Client struct {
+ socketPath string
clientName string
conn *net.UnixConn
@@ -117,12 +96,14 @@ type socketClient struct {
wg sync.WaitGroup
}
-func NewVppClient(sockAddr string) *socketClient {
- if sockAddr == "" {
- sockAddr = DefaultSocketName
+// NewVppClient returns a new Client using socket.
+// If socket is empty string DefaultSocketName is used.
+func NewVppClient(socket string) *Client {
+ if socket == "" {
+ socket = DefaultSocketName
}
- return &socketClient{
- sockAddr: sockAddr,
+ return &Client{
+ socketPath: socket,
clientName: DefaultClientName,
connectTimeout: DefaultConnectTimeout,
disconnectTimeout: DefaultDisconnectTimeout,
@@ -136,61 +117,34 @@ func NewVppClient(sockAddr string) *socketClient {
}
// SetClientName sets a client name used for identification.
-func (c *socketClient) SetClientName(name string) {
+func (c *Client) SetClientName(name string) {
c.clientName = name
}
// SetConnectTimeout sets timeout used during connecting.
-func (c *socketClient) SetConnectTimeout(t time.Duration) {
+func (c *Client) SetConnectTimeout(t time.Duration) {
c.connectTimeout = t
}
// SetDisconnectTimeout sets timeout used during disconnecting.
-func (c *socketClient) SetDisconnectTimeout(t time.Duration) {
+func (c *Client) SetDisconnectTimeout(t time.Duration) {
c.disconnectTimeout = t
}
-func (c *socketClient) SetMsgCallback(cb adapter.MsgCallback) {
+func (c *Client) SetMsgCallback(cb adapter.MsgCallback) {
log.Debug("SetMsgCallback")
c.msgCallback = cb
}
-const legacySocketName = "/run/vpp-api.sock"
-
-func (c *socketClient) checkLegacySocket() bool {
- if c.sockAddr == legacySocketName {
- return false
- }
- log.Debugf("checking legacy socket: %s", legacySocketName)
- // check if socket exists
- if _, err := os.Stat(c.sockAddr); err == nil {
- return false // socket exists
- } else if !os.IsNotExist(err) {
- return false // some other error occurred
- }
- // check if legacy socket exists
- if _, err := os.Stat(legacySocketName); err == nil {
- // legacy socket exists, update sockAddr
- c.sockAddr = legacySocketName
- return true
- }
- // no socket socket found
- return false
-}
-
// WaitReady checks socket file existence and waits for it if necessary
-func (c *socketClient) WaitReady() error {
+func (c *Client) WaitReady() error {
// check if socket already exists
- if _, err := os.Stat(c.sockAddr); err == nil {
+ if _, err := os.Stat(c.socketPath); err == nil {
return nil // socket exists, we are ready
} else if !os.IsNotExist(err) {
return err // some other error occurred
}
- if c.checkLegacySocket() {
- return nil
- }
-
// socket does not exist, watch for it
watcher, err := fsnotify.NewWatcher()
if err != nil {
@@ -203,7 +157,7 @@ func (c *socketClient) WaitReady() error {
}()
// start directory watcher
- if err := watcher.Add(filepath.Dir(c.sockAddr)); err != nil {
+ if err := watcher.Add(filepath.Dir(c.socketPath)); err != nil {
return err
}
@@ -211,17 +165,14 @@ func (c *socketClient) WaitReady() error {
for {
select {
case <-timeout.C:
- if c.checkLegacySocket() {
- return nil
- }
- return fmt.Errorf("timeout waiting (%s) for socket file: %s", MaxWaitReady, c.sockAddr)
+ return fmt.Errorf("timeout waiting (%s) for socket file: %s", MaxWaitReady, c.socketPath)
case e := <-watcher.Errors:
return e
case ev := <-watcher.Events:
log.Debugf("watcher event: %+v", ev)
- if ev.Name == c.sockAddr && (ev.Op&fsnotify.Create) == fsnotify.Create {
+ if ev.Name == c.socketPath && (ev.Op&fsnotify.Create) == fsnotify.Create {
// socket created, we are ready
return nil
}
@@ -229,18 +180,15 @@ func (c *socketClient) WaitReady() error {
}
}
-func (c *socketClient) Connect() error {
- c.checkLegacySocket()
-
+func (c *Client) Connect() error {
// check if socket exists
- if _, err := os.Stat(c.sockAddr); os.IsNotExist(err) {
- warnOnce.Do(c.printMissingSocketMsg)
- return fmt.Errorf("VPP API socket file %s does not exist", c.sockAddr)
+ if _, err := os.Stat(c.socketPath); os.IsNotExist(err) {
+ return fmt.Errorf("VPP API socket file %s does not exist", c.socketPath)
} else if err != nil {
return fmt.Errorf("VPP API socket error: %v", err)
}
- if err := c.connect(c.sockAddr); err != nil {
+ if err := c.connect(c.socketPath); err != nil {
return err
}
@@ -256,7 +204,7 @@ func (c *socketClient) Connect() error {
return nil
}
-func (c *socketClient) Disconnect() error {
+func (c *Client) Disconnect() error {
if c.conn == nil {
return nil
}
@@ -273,7 +221,6 @@ func (c *socketClient) Disconnect() error {
// Don't bother sending a vl_api_sockclnt_delete_t message,
// just close the socket.
-
if err := c.disconnect(); err != nil {
return err
}
@@ -283,10 +230,10 @@ func (c *socketClient) Disconnect() error {
const defaultBufferSize = 4096
-func (c *socketClient) connect(sockAddr string) error {
+func (c *Client) connect(sockAddr string) error {
addr := &net.UnixAddr{Name: sockAddr, Net: "unix"}
- log.Debugf("Connecting to: %v", c.sockAddr)
+ log.Debugf("Connecting to: %v", c.socketPath)
conn, err := net.DialUnix("unix", nil, addr)
if err != nil {
@@ -311,7 +258,7 @@ func (c *socketClient) connect(sockAddr string) error {
return nil
}
-func (c *socketClient) disconnect() error {
+func (c *Client) disconnect() error {
log.Debugf("Closing socket")
if err := c.conn.Close(); err != nil {
log.Debugln("Closing socket failed:", err)
@@ -326,7 +273,7 @@ const (
deleteMsgContext = byte(124)
)
-func (c *socketClient) open() error {
+func (c *Client) open() error {
var msgCodec = codec.DefaultCodec
// Request socket client create
@@ -379,7 +326,7 @@ func (c *socketClient) open() error {
return nil
}
-func (c *socketClient) GetMsgID(msgName string, msgCrc string) (uint16, error) {
+func (c *Client) GetMsgID(msgName string, msgCrc string) (uint16, error) {
if msgID, ok := c.msgTable[msgName+"_"+msgCrc]; ok {
return msgID, nil
}
@@ -389,7 +336,7 @@ func (c *socketClient) GetMsgID(msgName string, msgCrc string) (uint16, error) {
}
}
-func (c *socketClient) SendMsg(context uint32, data []byte) error {
+func (c *Client) SendMsg(context uint32, data []byte) error {
if len(data) < 10 {
return fmt.Errorf("invalid message data, length must be at least 10 bytes")
}
@@ -423,7 +370,7 @@ func setMsgRequestHeader(data []byte, clientIndex, context uint32) {
binary.BigEndian.PutUint32(data[6:10], context)
}
-func (c *socketClient) writeMsg(msg []byte) error {
+func (c *Client) writeMsg(msg []byte) error {
// we lock to prevent mixing multiple message writes
c.writeMu.Lock()
defer c.writeMu.Unlock()
@@ -482,7 +429,7 @@ func writeMsgData(w io.Writer, msg []byte, writerSize int) error {
return nil
}
-func (c *socketClient) readerLoop() {
+func (c *Client) readerLoop() {
defer c.wg.Done()
defer log.Debugf("reader loop done")
@@ -528,7 +475,7 @@ func getMsgReplyHeader(msg []byte) (msgID uint16, context uint32) {
return
}
-func (c *socketClient) readMsgTimeout(buf []byte, timeout time.Duration) ([]byte, error) {
+func (c *Client) readMsgTimeout(buf []byte, timeout time.Duration) ([]byte, error) {
// set read deadline
readDeadline := time.Now().Add(timeout)
if err := c.conn.SetReadDeadline(readDeadline); err != nil {
@@ -549,7 +496,7 @@ func (c *socketClient) readMsgTimeout(buf []byte, timeout time.Duration) ([]byte
return msgReply, nil
}
-func (c *socketClient) readMsg(buf []byte) ([]byte, error) {
+func (c *Client) readMsg(buf []byte) ([]byte, error) {
log.Debug("reading msg..")
header := c.headerPool.Get().([]byte)
diff --git a/adapter/stats_api.go b/adapter/stats_api.go
index 90dbeb3..7dc7dc3 100644
--- a/adapter/stats_api.go
+++ b/adapter/stats_api.go
@@ -16,7 +16,6 @@ package adapter
import (
"errors"
- "fmt"
)
const (
@@ -27,6 +26,7 @@ const (
var (
ErrStatsDataBusy = errors.New("stats data busy")
ErrStatsDirStale = errors.New("stats dir stale")
+ ErrStatsDisconnected = errors.New("stats disconnected")
ErrStatsAccessFailed = errors.New("stats access failed")
)
@@ -37,59 +37,53 @@ type StatsAPI interface {
// Disconnect terminates client connection.
Disconnect() error
- // ListStats lists names for stats matching patterns.
- ListStats(patterns ...string) (names []string, err error)
+ // ListStats lists indexed names for stats matching patterns.
+ ListStats(patterns ...string) (indexes []StatIdentifier, err error)
// DumpStats dumps all stat entries.
DumpStats(patterns ...string) (entries []StatEntry, err error)
// PrepareDir prepares new stat dir for entries that match any of prefixes.
PrepareDir(patterns ...string) (*StatDir, error)
+ // PrepareDirOnIndex prepares new stat dir for entries that match any of indexes.
+ PrepareDirOnIndex(indexes ...uint32) (*StatDir, error)
// UpdateDir updates stat dir and all of their entries.
UpdateDir(dir *StatDir) error
}
// StatType represents type of stat directory and simply
// defines what type of stat data is stored in the stat entry.
-type StatType int
+type StatType string
const (
- _ StatType = 0
- ScalarIndex StatType = 1
- SimpleCounterVector StatType = 2
- CombinedCounterVector StatType = 3
- ErrorIndex StatType = 4
- NameVector StatType = 5
+ Unknown StatType = "UnknownStatType"
+ ScalarIndex StatType = "ScalarIndex"
+ SimpleCounterVector StatType = "SimpleCounterVector"
+ CombinedCounterVector StatType = "CombinedCounterVector"
+ ErrorIndex StatType = "ErrorIndex"
+ NameVector StatType = "NameVector"
+ Empty StatType = "Empty"
+ Symlink StatType = "Symlink"
)
-func (d StatType) String() string {
- switch d {
- case ScalarIndex:
- return "ScalarIndex"
- case SimpleCounterVector:
- return "SimpleCounterVector"
- case CombinedCounterVector:
- return "CombinedCounterVector"
- case ErrorIndex:
- return "ErrorIndex"
- case NameVector:
- return "NameVector"
- }
- return fmt.Sprintf("UnknownStatType(%d)", d)
-}
-
// StatDir defines directory of stats entries created by PrepareDir.
type StatDir struct {
Epoch int64
- Indexes []uint32
Entries []StatEntry
}
+// StatIdentifier holds a stat entry name and index
+type StatIdentifier struct {
+ Index uint32
+ Name []byte
+}
+
// StatEntry represents single stat entry. The type of stat stored in Data
// is defined by Type.
type StatEntry struct {
- Name []byte
- Type StatType
- Data Stat
+ StatIdentifier
+ Type StatType
+ Data Stat
+ Symlink bool
}
// Counter represents simple counter with single value, which is usually packet count.
@@ -99,11 +93,11 @@ type Counter uint64
type CombinedCounter [2]uint64
func (s CombinedCounter) Packets() uint64 {
- return uint64(s[0])
+ return s[0]
}
func (s CombinedCounter) Bytes() uint64 {
- return uint64(s[1])
+ return s[1]
}
// Name represents string value stored under name vector.
@@ -118,6 +112,9 @@ type Stat interface {
// IsZero returns true if all of its values equal to zero.
IsZero() bool
+ // Type returns underlying type of a stat
+ Type() StatType
+
// isStat is intentionally unexported to limit implementations of interface to this package,
isStat()
}
@@ -125,16 +122,16 @@ type Stat interface {
// ScalarStat represents stat for ScalarIndex.
type ScalarStat float64
-// ErrorStat represents stat for ErrorIndex.
-type ErrorStat Counter
+// ErrorStat represents stat for ErrorIndex. The array represents workers.
+type ErrorStat []Counter
-// SimpleCounterStat represents stat for SimpleCounterVector.
+// SimpleCounterStat represents indexed stat for SimpleCounterVector.
// The outer array represents workers and the inner array represents interface/node/.. indexes.
// Values should be aggregated per interface/node for every worker.
// ReduceSimpleCounterStatIndex can be used to reduce specific index.
type SimpleCounterStat [][]Counter
-// CombinedCounterStat represents stat for CombinedCounterVector.
+// CombinedCounterStat represents indexed stat for CombinedCounterVector.
// The outer array represents workers and the inner array represents interface/node/.. indexes.
// Values should be aggregated per interface/node for every worker.
// ReduceCombinedCounterStatIndex can be used to reduce specific index.
@@ -143,18 +140,40 @@ type CombinedCounterStat [][]CombinedCounter
// NameStat represents stat for NameVector.
type NameStat []Name
+// EmptyStat represents removed counter directory
+type EmptyStat string
+
func (ScalarStat) isStat() {}
func (ErrorStat) isStat() {}
func (SimpleCounterStat) isStat() {}
func (CombinedCounterStat) isStat() {}
func (NameStat) isStat() {}
+func (EmptyStat) isStat() {}
func (s ScalarStat) IsZero() bool {
return s == 0
}
+
+func (s ScalarStat) Type() StatType {
+ return ScalarIndex
+}
+
func (s ErrorStat) IsZero() bool {
- return s == 0
+ if s == nil {
+ return true
+ }
+ for _, ss := range s {
+ if ss != 0 {
+ return false
+ }
+ }
+ return true
+}
+
+func (s ErrorStat) Type() StatType {
+ return ErrorIndex
}
+
func (s SimpleCounterStat) IsZero() bool {
if s == nil {
return true
@@ -168,6 +187,11 @@ func (s SimpleCounterStat) IsZero() bool {
}
return true
}
+
+func (s SimpleCounterStat) Type() StatType {
+ return SimpleCounterVector
+}
+
func (s CombinedCounterStat) IsZero() bool {
if s == nil {
return true
@@ -184,6 +208,11 @@ func (s CombinedCounterStat) IsZero() bool {
}
return true
}
+
+func (s CombinedCounterStat) Type() StatType {
+ return CombinedCounterVector
+}
+
func (s NameStat) IsZero() bool {
if s == nil {
return true
@@ -196,6 +225,18 @@ func (s NameStat) IsZero() bool {
return true
}
+func (s NameStat) Type() StatType {
+ return NameVector
+}
+
+func (s EmptyStat) IsZero() bool {
+ return true
+}
+
+func (s EmptyStat) Type() StatType {
+ return Empty
+}
+
// ReduceSimpleCounterStatIndex returns reduced SimpleCounterStat s for index i.
func ReduceSimpleCounterStatIndex(s SimpleCounterStat, i int) uint64 {
var val uint64
@@ -209,8 +250,8 @@ func ReduceSimpleCounterStatIndex(s SimpleCounterStat, i int) uint64 {
func ReduceCombinedCounterStatIndex(s CombinedCounterStat, i int) [2]uint64 {
var val [2]uint64
for _, w := range s {
- val[0] += uint64(w[i][0])
- val[1] += uint64(w[i][1])
+ val[0] += w[i][0]
+ val[1] += w[i][1]
}
return val
}
diff --git a/adapter/statsclient/stat_segment.go b/adapter/statsclient/stat_segment.go
deleted file mode 100644
index 2a97fd4..0000000
--- a/adapter/statsclient/stat_segment.go
+++ /dev/null
@@ -1,470 +0,0 @@
-// Copyright (c) 2019 Cisco and/or its affiliates.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at:
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package statsclient
-
-import (
- "fmt"
- "net"
- "syscall"
- "unsafe"
-
- "github.com/ftrvxmtrx/fd"
-
- "git.fd.io/govpp.git/adapter"
-)
-
-var (
- ErrStatDataLenIncorrect = fmt.Errorf("stat data length incorrect")
-)
-
-const (
- minVersion = 0
- maxVersion = 1
-)
-
-func checkVersion(ver uint64) error {
- if ver < minVersion {
- return fmt.Errorf("stat segment version is too old: %v (minimal version: %v)", ver, minVersion)
- } else if ver > maxVersion {
- return fmt.Errorf("stat segment version is not supported: %v (minimal version: %v)", ver, maxVersion)
- }
- return nil
-}
-
-type statSegment struct {
- sharedHeader []byte
- memorySize int64
-
- // legacyVersion represents stat segment version 0
- // and is used as fallback for VPP 19.04
- legacyVersion bool
-}
-
-func (c *statSegment) getHeader() (header sharedHeader) {
- if c.legacyVersion {
- return loadSharedHeaderLegacy(c.sharedHeader)
- }
- return loadSharedHeader(c.sharedHeader)
-}
-
-func (c *statSegment) getEpoch() (int64, bool) {
- h := c.getHeader()
- return h.epoch, h.inProgress != 0
-}
-
-func (c *statSegment) getOffsets() (dir, err, stat int64) {
- h := c.getHeader()
- return h.directoryOffset, h.errorOffset, h.statsOffset
-}
-
-func (c *statSegment) connect(sockName string) error {
- if c.sharedHeader != nil {
- return fmt.Errorf("already connected")
- }
-
- addr := net.UnixAddr{
- Net: "unixpacket",
- Name: sockName,
- }
- Log.Debugf("connecting to: %v", addr)
-
- conn, err := net.DialUnix(addr.Net, nil, &addr)
- if err != nil {
- Log.Warnf("connecting to socket %s failed: %s", addr, err)
- return err
- }
- defer func() {
- if err := conn.Close(); err != nil {
- Log.Warnf("closing socket failed: %v", err)
- }
- }()
-
- Log.Debugf("connected to socket")
-
- files, err := fd.Get(conn, 1, nil)
- if err != nil {
- return fmt.Errorf("getting file descriptor over socket failed: %v", err)
- }
- if len(files) == 0 {
- return fmt.Errorf("no files received over socket")
- }
-
- file := files[0]
- defer func() {
- if err := file.Close(); err != nil {
- Log.Warnf("closing file failed: %v", err)
- }
- }()
-
- info, err := file.Stat()
- if err != nil {
- return err
- }
- size := info.Size()
-
- data, err := syscall.Mmap(int(file.Fd()), 0, int(size), syscall.PROT_READ, syscall.MAP_SHARED)
- if err != nil {
- Log.Debugf("mapping shared memory failed: %v", err)
- return fmt.Errorf("mapping shared memory failed: %v", err)
- }
-
- Log.Debugf("successfuly mmapped shared memory segment (size: %v) %v", size, len(data))
-
- c.sharedHeader = data
- c.memorySize = size
-
- hdr := loadSharedHeader(c.sharedHeader)
- Log.Debugf("stat segment header: %+v", hdr)
-
- if hdr.legacyVersion() {
- c.legacyVersion = true
- hdr = loadSharedHeaderLegacy(c.sharedHeader)
- Log.Debugf("falling back to legacy version (VPP <=19.04) of stat segment (header: %+v)", hdr)
- }
-
- if err := checkVersion(hdr.version); err != nil {
- return err
- }
-
- return nil
-}
-
-func (c *statSegment) disconnect() error {
- if c.sharedHeader == nil {
- return nil
- }
-
- if err := syscall.Munmap(c.sharedHeader); err != nil {
- Log.Debugf("unmapping shared memory failed: %v", err)
- return fmt.Errorf("unmapping shared memory failed: %v", err)
- }
- c.sharedHeader = nil
-
- Log.Debugf("successfuly unmapped shared memory")
- return nil
-}
-
-type statDirectoryType int32
-
-const (
- statDirIllegal = 0
- statDirScalarIndex = 1
- statDirCounterVectorSimple = 2
- statDirCounterVectorCombined = 3
- statDirErrorIndex = 4
- statDirNameVector = 5
- statDirEmpty = 6
-)
-
-func (t statDirectoryType) String() string {
- return adapter.StatType(t).String()
-}
-
-type statSegDirectoryEntry struct {
- directoryType statDirectoryType
- // unionData can represent:
- // - offset
- // - index
- // - value
- unionData uint64
- offsetVector uint64
- name [128]byte
-}
-
-func (c *statSegment) getStatDirVector() unsafe.Pointer {
- dirOffset, _, _ := c.getOffsets()
- return unsafe.Pointer(&c.sharedHeader[dirOffset])
-}
-
-func (c *statSegment) getStatDirIndex(p unsafe.Pointer, index uint32) *statSegDirectoryEntry {
- return (*statSegDirectoryEntry)(unsafe.Pointer(uintptr(p) + uintptr(index)*unsafe.Sizeof(statSegDirectoryEntry{})))
-}
-
-func (c *statSegment) copyEntryData(dirEntry *statSegDirectoryEntry) adapter.Stat {
- dirType := adapter.StatType(dirEntry.directoryType)
-
- switch dirType {
- case statDirScalarIndex:
- return adapter.ScalarStat(dirEntry.unionData)
-
- case statDirErrorIndex:
- if dirEntry.unionData == 0 {
- debugf("offset invalid for %s", dirEntry.name)
- break
- } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) {
- debugf("offset out of range for %s", dirEntry.name)
- break
- }
-
- _, errOffset, _ := c.getOffsets()
- offsetVector := unsafe.Pointer(&c.sharedHeader[errOffset])
-
- var errData adapter.Counter
- if c.legacyVersion {
- // error were not vector (per-worker) in VPP 19.04
- offset := uintptr(dirEntry.unionData) * unsafe.Sizeof(uint64(0))
- val := *(*adapter.Counter)(statSegPointer(offsetVector, offset))
- errData = val
- } else {
- vecLen := uint32(vectorLen(offsetVector))
-
- for i := uint32(0); i < vecLen; i++ {
- cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0))))
- offset := uintptr(cb) + uintptr(dirEntry.unionData)*unsafe.Sizeof(adapter.Counter(0))
- debugf("error index, cb: %d, offset: %d", cb, offset)
- val := *(*adapter.Counter)(statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), offset))
- errData += val
- }
- }
- return adapter.ErrorStat(errData)
-
- case statDirCounterVectorSimple:
- if dirEntry.unionData == 0 {
- debugf("offset invalid for %s", dirEntry.name)
- break
- } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) {
- debugf("offset out of range for %s", dirEntry.name)
- break
- }
-
- vecLen := uint32(vectorLen(unsafe.Pointer(&c.sharedHeader[dirEntry.unionData])))
- offsetVector := statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector))
-
- data := make([][]adapter.Counter, vecLen)
- for i := uint32(0); i < vecLen; i++ {
- cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0))))
- counterVec := unsafe.Pointer(&c.sharedHeader[uintptr(cb)])
- vecLen2 := uint32(vectorLen(counterVec))
- data[i] = make([]adapter.Counter, vecLen2)
- for j := uint32(0); j < vecLen2; j++ {
- offset := uintptr(j) * unsafe.Sizeof(adapter.Counter(0))
- val := *(*adapter.Counter)(statSegPointer(counterVec, offset))
- data[i][j] = val
- }
- }
- return adapter.SimpleCounterStat(data)
-
- case statDirCounterVectorCombined:
- if dirEntry.unionData == 0 {
- debugf("offset invalid for %s", dirEntry.name)
- break
- } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) {
- debugf("offset out of range for %s", dirEntry.name)
- break
- }
-
- vecLen := uint32(vectorLen(unsafe.Pointer(&c.sharedHeader[dirEntry.unionData])))
- offsetVector := statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector))
-
- data := make([][]adapter.CombinedCounter, vecLen)
- for i := uint32(0); i < vecLen; i++ {
- cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0))))
- counterVec := unsafe.Pointer(&c.sharedHeader[uintptr(cb)])
- vecLen2 := uint32(vectorLen(counterVec))
- data[i] = make([]adapter.CombinedCounter, vecLen2)
- for j := uint32(0); j < vecLen2; j++ {
- offset := uintptr(j) * unsafe.Sizeof(adapter.CombinedCounter{})
- val := *(*adapter.CombinedCounter)(statSegPointer(counterVec, offset))
- data[i][j] = val
- }
- }
- return adapter.CombinedCounterStat(data)
-
- case statDirNameVector:
- if dirEntry.unionData == 0 {
- debugf("offset invalid for %s", dirEntry.name)
- break
- } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) {
- debugf("offset out of range for %s", dirEntry.name)
- break
- }
-
- vecLen := uint32(vectorLen(unsafe.Pointer(&c.sharedHeader[dirEntry.unionData])))
- offsetVector := statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector))
-
- data := make([]adapter.Name, vecLen)
- for i := uint32(0); i < vecLen; i++ {
- cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0))))
- if cb == 0 {
- debugf("name vector out of range for %s (%v)", dirEntry.name, i)
- continue
- }
- nameVec := unsafe.Pointer(&c.sharedHeader[cb])
- vecLen2 := uint32(vectorLen(nameVec))
-
- nameStr := make([]byte, 0, vecLen2)
- for j := uint32(0); j < vecLen2; j++ {
- offset := uintptr(j) * unsafe.Sizeof(byte(0))
- val := *(*byte)(statSegPointer(nameVec, offset))
- if val > 0 {
- nameStr = append(nameStr, val)
- }
- }
- data[i] = adapter.Name(nameStr)
- }
- return adapter.NameStat(data)
-
- case statDirEmpty:
- // no-op
-
- default:
- // TODO: monitor occurrences with metrics
- debugf("Unknown type %d for stat entry: %q", dirEntry.directoryType, dirEntry.name)
- }
- return nil
-}
-
-func (c *statSegment) updateEntryData(dirEntry *statSegDirectoryEntry, stat *adapter.Stat) error {
- switch (*stat).(type) {
- case adapter.ScalarStat:
- *stat = adapter.ScalarStat(dirEntry.unionData)
-
- case adapter.ErrorStat:
- if dirEntry.unionData == 0 {
- debugf("offset invalid for %s", dirEntry.name)
- break
- } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) {
- debugf("offset out of range for %s", dirEntry.name)
- break
- }
-
- _, errOffset, _ := c.getOffsets()
- offsetVector := unsafe.Pointer(&c.sharedHeader[errOffset])
-
- var errData adapter.Counter
- if c.legacyVersion {
- // error were not vector (per-worker) in VPP 19.04
- offset := uintptr(dirEntry.unionData) * unsafe.Sizeof(uint64(0))
- val := *(*adapter.Counter)(statSegPointer(offsetVector, offset))
- errData = val
- } else {
- vecLen := uint32(vectorLen(unsafe.Pointer(&c.sharedHeader[errOffset])))
-
- for i := uint32(0); i < vecLen; i++ {
- cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0))))
- offset := uintptr(cb) + uintptr(dirEntry.unionData)*unsafe.Sizeof(adapter.Counter(0))
- val := *(*adapter.Counter)(statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), offset))
- errData += val
- }
- }
- *stat = adapter.ErrorStat(errData)
-
- case adapter.SimpleCounterStat:
- if dirEntry.unionData == 0 {
- debugf("offset invalid for %s", dirEntry.name)
- break
- } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) {
- debugf("offset out of range for %s", dirEntry.name)
- break
- }
-
- vecLen := uint32(vectorLen(unsafe.Pointer(&c.sharedHeader[dirEntry.unionData])))
- offsetVector := statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector))
-
- data := (*stat).(adapter.SimpleCounterStat)
- if uint32(len(data)) != vecLen {
- return ErrStatDataLenIncorrect
- }
- for i := uint32(0); i < vecLen; i++ {
- cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0))))
- counterVec := unsafe.Pointer(&c.sharedHeader[uintptr(cb)])
- vecLen2 := uint32(vectorLen(counterVec))
- simpData := data[i]
- if uint32(len(simpData)) != vecLen2 {
- return ErrStatDataLenIncorrect
- }
- for j := uint32(0); j < vecLen2; j++ {
- offset := uintptr(j) * unsafe.Sizeof(adapter.Counter(0))
- val := *(*adapter.Counter)(statSegPointer(counterVec, offset))
- simpData[j] = val
- }
- }
-
- case adapter.CombinedCounterStat:
- if dirEntry.unionData == 0 {
- debugf("offset invalid for %s", dirEntry.name)
- break
- } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) {
- debugf("offset out of range for %s", dirEntry.name)
- break
- }
-
- vecLen := uint32(vectorLen(unsafe.Pointer(&c.sharedHeader[dirEntry.unionData])))
- offsetVector := statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector))
-
- data := (*stat).(adapter.CombinedCounterStat)
- if uint32(len(data)) != vecLen {
- return ErrStatDataLenIncorrect
- }
- for i := uint32(0); i < vecLen; i++ {
- cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0))))
- counterVec := unsafe.Pointer(&c.sharedHeader[uintptr(cb)])
- vecLen2 := uint32(vectorLen(counterVec))
- combData := data[i]
- if uint32(len(combData)) != vecLen2 {
- return ErrStatDataLenIncorrect
- }
- for j := uint32(0); j < vecLen2; j++ {
- offset := uintptr(j) * unsafe.Sizeof(adapter.CombinedCounter{})
- val := *(*adapter.CombinedCounter)(statSegPointer(counterVec, offset))
- combData[j] = val
- }
- }
-
- case adapter.NameStat:
- if dirEntry.unionData == 0 {
- debugf("offset invalid for %s", dirEntry.name)
- break
- } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) {
- debugf("offset out of range for %s", dirEntry.name)
- break
- }
-
- vecLen := uint32(vectorLen(unsafe.Pointer(&c.sharedHeader[dirEntry.unionData])))
- offsetVector := statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector))
-
- data := (*stat).(adapter.NameStat)
- if uint32(len(data)) != vecLen {
- return ErrStatDataLenIncorrect
- }
- for i := uint32(0); i < vecLen; i++ {
- cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0))))
- if cb == 0 {
- continue
- }
- nameVec := unsafe.Pointer(&c.sharedHeader[cb])
- vecLen2 := uint32(vectorLen(nameVec))
-
- nameData := data[i]
- if uint32(len(nameData))+1 != vecLen2 {
- return ErrStatDataLenIncorrect
- }
- for j := uint32(0); j < vecLen2; j++ {
- offset := uintptr(j) * unsafe.Sizeof(byte(0))
- val := *(*byte)(statSegPointer(nameVec, offset))
- if val == 0 {
- break
- }
- nameData[j] = val
- }
- }
-
- default:
- if Debug {
- Log.Debugf("Unrecognized stat type %T for stat entry: %v", stat, dirEntry.name)
- }
- }
- return nil
-}
diff --git a/adapter/statsclient/stat_segment_api.go b/adapter/statsclient/stat_segment_api.go
new file mode 100644
index 0000000..2c2950f
--- /dev/null
+++ b/adapter/statsclient/stat_segment_api.go
@@ -0,0 +1,137 @@
+// Copyright (c) 2020 Cisco and/or its affiliates.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statsclient
+
+import (
+ "fmt"
+ "sync/atomic"
+ "time"
+ "unsafe"
+
+ "go.fd.io/govpp/adapter"
+)
+
+var (
+ // ErrStatDataLenIncorrect is returned when stat data does not match vector
+ // length of a respective data directory
+ ErrStatDataLenIncorrect = fmt.Errorf("stat data length incorrect")
+)
+
+var (
+ MaxWaitInProgress = time.Millisecond * 100
+ CheckDelayInProgress = time.Microsecond * 10
+)
+
+const (
+ minVersion = 1
+ maxVersion = 2
+)
+
+var dirTypeMapping = map[dirType]adapter.StatType{
+ 1: adapter.ScalarIndex,
+ 2: adapter.SimpleCounterVector,
+ 3: adapter.CombinedCounterVector,
+ 4: adapter.NameVector,
+ 5: adapter.Empty,
+ 6: adapter.Symlink,
+}
+
+var dirTypeMappingLegacy = map[dirType]adapter.StatType{
+ 1: adapter.ScalarIndex,
+ 2: adapter.SimpleCounterVector,
+ 3: adapter.CombinedCounterVector,
+ 4: adapter.ErrorIndex,
+ 5: adapter.NameVector,
+ 6: adapter.Empty,
+ 7: adapter.Symlink,
+}
+
+type (
+ dirVector unsafe.Pointer
+ dirSegment unsafe.Pointer
+ dirName []byte
+ dirType int32
+)
+
+// statSegment represents common API for every stats API version
+type statSegment interface {
+ // GetDirectoryVector returns pointer to memory where the beginning
+ // of the data directory is located.
+ GetDirectoryVector() dirVector
+
+ // GetStatDirOnIndex accepts directory vector and particular index.
+ // Returns pointer to the beginning of the segment. Also the directory
+ // name as [128]byte and the directory type is returned for easy use
+ // without needing to know the exact segment version.
+ //
+ // Note that if the index is equal to 0, the result pointer points to
+ // the same memory address as the argument.
+ GetStatDirOnIndex(v dirVector, index uint32) (dirSegment, dirName, adapter.StatType)
+
+ // GetEpoch re-loads stats header and returns current epoch
+ //and 'inProgress' value
+ GetEpoch() (int64, bool)
+
+ // CopyEntryData accepts pointer to a directory segment and returns adapter.Stat
+ // based on directory type populated with data. The index is an optional parameter
+ // (used by symlinks) returning stats for item on the given index only.
+ // Use ^uint32(0) as an empty index (since 0 is a valid value).
+ CopyEntryData(segment dirSegment, index uint32) adapter.Stat
+
+ // UpdateEntryData accepts pointer to a directory segment with data, and stat
+ // segment to update
+ UpdateEntryData(segment dirSegment, s *adapter.Stat) error
+}
+
+// vecHeader represents a vector header
+type vecHeader struct {
+ length uint64
+ vectorData [0]uint8
+}
+
+func getVersion(data []byte) uint64 {
+ type apiVersion struct {
+ value uint64
+ }
+ header := (*apiVersion)(unsafe.Pointer(&data[0]))
+ version := &apiVersion{
+ value: atomic.LoadUint64(&header.value),
+ }
+ debugf("stats API version loaded: %d", version.value)
+ return version.value
+}
+
+func vectorLen(v dirVector) dirVector {
+ vec := *(*vecHeader)(unsafe.Pointer(uintptr(v) - unsafe.Sizeof(uint64(0))))
+ return dirVector(&vec.length)
+}
+
+func getStatType(dirTypeNum dirType, useLegacyMapping bool) (dirTyp adapter.StatType) {
+ var exists bool
+ if useLegacyMapping {
+ dirTyp, exists = dirTypeMappingLegacy[dirTypeNum]
+ } else {
+ dirTyp, exists = dirTypeMapping[dirTypeNum]
+ }
+ if exists {
+ return dirTyp
+ }
+ return adapter.Unknown
+}
+
+//go:nosplit
+func statSegPointer(v dirVector, offset uintptr) dirVector {
+ return dirVector(uintptr(v) + offset)
+}
diff --git a/adapter/statsclient/statsclient.go b/adapter/statsclient/statsclient.go
index 9110275..dd84897 100644
--- a/adapter/statsclient/statsclient.go
+++ b/adapter/statsclient/statsclient.go
@@ -18,32 +18,31 @@ package statsclient
import (
"bytes"
"fmt"
+ "net"
"os"
+ "path/filepath"
"regexp"
+ "sync/atomic"
+ "syscall"
+ "time"
+ "go.fd.io/govpp/adapter"
+ "github.com/fsnotify/fsnotify"
+ "github.com/ftrvxmtrx/fd"
logger "github.com/sirupsen/logrus"
-
- "git.fd.io/govpp.git/adapter"
)
const (
// DefaultSocketName is default VPP stats socket file path.
DefaultSocketName = adapter.DefaultStatsSocket
-)
-
-const socketMissing = `
-------------------------------------------------------------
- VPP stats socket file %s is missing!
- - is VPP running with stats segment enabled?
- - is the correct socket name configured?
+ // DefaultSocketRetryPeriod is the time period after the socket availability
+ // will be re-checked
+ DefaultSocketRetryPeriod = 50 * time.Millisecond
- To enable it add following section to your VPP config:
- statseg {
- default
- }
-------------------------------------------------------------
-`
+ // DefaultSocketRetryTimeout is the maximum time for the stats socket
+ DefaultSocketRetryTimeout = 3 * time.Second
+)
var (
// Debug is global variable that determines debug mode
@@ -73,184 +72,481 @@ var _ adapter.StatsAPI = (*StatsClient)(nil)
// StatsClient is the pure Go implementation for VPP stats API.
type StatsClient struct {
- sockAddr string
+ socket string
+ retryPeriod time.Duration
+ retryTimeout time.Duration
+
+ headerData []byte
+
+ // defines the adapter connection state
+ connected uint32
+
+ // to quit socket monitor
+ done chan struct{}
statSegment
}
-// NewStatsClient returns new VPP stats API client.
-func NewStatsClient(sockAddr string) *StatsClient {
- if sockAddr == "" {
- sockAddr = DefaultSocketName
+// Option is a StatsClient option
+type Option func(*StatsClient)
+
+// SetSocketRetryPeriod is and optional parameter to define a custom
+// retry period while waiting for the VPP socket
+func SetSocketRetryPeriod(t time.Duration) Option {
+ return func(c *StatsClient) {
+ c.retryPeriod = t
}
- return &StatsClient{
- sockAddr: sockAddr,
+}
+
+// SetSocketRetryTimeout is and optional parameter to define a custom
+// timeout while waiting for the VPP socket
+func SetSocketRetryTimeout(t time.Duration) Option {
+ return func(c *StatsClient) {
+ c.retryTimeout = t
}
}
-func (c *StatsClient) Connect() error {
- // check if socket exists
- if _, err := os.Stat(c.sockAddr); os.IsNotExist(err) {
- fmt.Fprintf(os.Stderr, socketMissing, c.sockAddr)
- return fmt.Errorf("stats socket file %s does not exist", c.sockAddr)
- } else if err != nil {
- return fmt.Errorf("stats socket error: %v", err)
+// NewStatsClient returns a new StatsClient using socket.
+// If socket is empty string DefaultSocketName is used.
+func NewStatsClient(socket string, options ...Option) *StatsClient {
+ if socket == "" {
+ socket = DefaultSocketName
+ }
+ s := &StatsClient{
+ socket: socket,
}
+ for _, option := range options {
+ option(s)
+ }
+ if s.retryPeriod == 0 {
+ s.retryPeriod = DefaultSocketRetryPeriod
+ }
+ if s.retryTimeout == 0 {
+ s.retryTimeout = DefaultSocketRetryTimeout
+ }
+ return s
+}
- if err := c.statSegment.connect(c.sockAddr); err != nil {
+// Connect to validated VPP stats socket and start monitoring
+// socket file changes
+func (sc *StatsClient) Connect() (err error) {
+ if err := sc.waitForSocket(); err != nil {
return err
}
-
+ sc.done = make(chan struct{})
+ if sc.statSegment, err = sc.connect(); err != nil {
+ return err
+ }
+ sc.monitorSocket()
return nil
}
-func (c *StatsClient) Disconnect() error {
- if err := c.statSegment.disconnect(); err != nil {
- return err
+// Disconnect from the socket, unmap shared memory and terminate
+// socket monitor
+func (sc *StatsClient) Disconnect() error {
+ if sc.headerData == nil {
+ return nil
+ }
+ if err := syscall.Munmap(sc.headerData); err != nil {
+ Log.Debugf("unmapping shared memory failed: %v", err)
+ return fmt.Errorf("unmapping shared memory failed: %v", err)
}
+ sc.headerData = nil
+ sc.done <- struct{}{}
+
+ Log.Debugf("successfully unmapped shared memory")
return nil
}
-func (c *StatsClient) ListStats(patterns ...string) (names []string, err error) {
- sa := c.accessStart()
- if sa.epoch == 0 {
+func (sc *StatsClient) ListStats(patterns ...string) (entries []adapter.StatIdentifier, err error) {
+ if !sc.isConnected() {
+ return nil, adapter.ErrStatsDisconnected
+ }
+ accessEpoch := sc.accessStart()
+ if accessEpoch == 0 {
return nil, adapter.ErrStatsAccessFailed
}
- indexes, err := c.listIndexes(patterns...)
+ entries, err = sc.getIdentifierEntries(patterns...)
if err != nil {
return nil, err
}
- dirVector := c.getStatDirVector()
- vecLen := uint32(vectorLen(dirVector))
-
- for _, index := range indexes {
- if index >= vecLen {
- return nil, fmt.Errorf("stat entry index %d out of dir vector len (%d)", index, vecLen)
- }
-
- dirEntry := c.getStatDirIndex(dirVector, index)
- var name []byte
- for n := 0; n < len(dirEntry.name); n++ {
- if dirEntry.name[n] == 0 {
- name = dirEntry.name[:n]
- break
- }
- }
- names = append(names, string(name))
- }
-
- if !c.accessEnd(&sa) {
+ if !sc.accessEnd(accessEpoch) {
return nil, adapter.ErrStatsDataBusy
}
-
- return names, nil
+ return entries, nil
}
-func (c *StatsClient) DumpStats(patterns ...string) (entries []adapter.StatEntry, err error) {
- sa := c.accessStart()
- if sa.epoch == 0 {
+func (sc *StatsClient) DumpStats(patterns ...string) (entries []adapter.StatEntry, err error) {
+ if !sc.isConnected() {
+ return nil, adapter.ErrStatsDisconnected
+ }
+
+ accessEpoch := sc.accessStart()
+ if accessEpoch == 0 {
return nil, adapter.ErrStatsAccessFailed
}
- indexes, err := c.listIndexes(patterns...)
+ entries, err = sc.getStatEntries(patterns...)
if err != nil {
return nil, err
}
- if entries, err = c.dumpEntries(indexes); err != nil {
- return nil, err
- }
- if !c.accessEnd(&sa) {
+ if !sc.accessEnd(accessEpoch) {
return nil, adapter.ErrStatsDataBusy
}
-
return entries, nil
}
-func (c *StatsClient) PrepareDir(patterns ...string) (*adapter.StatDir, error) {
- dir := new(adapter.StatDir)
+func (sc *StatsClient) PrepareDir(patterns ...string) (*adapter.StatDir, error) {
+ if !sc.isConnected() {
+ return nil, adapter.ErrStatsDisconnected
+ }
- sa := c.accessStart()
- if sa.epoch == 0 {
+ accessEpoch := sc.accessStart()
+ if accessEpoch == 0 {
return nil, adapter.ErrStatsAccessFailed
}
- indexes, err := c.listIndexes(patterns...)
+ entries, err := sc.getStatEntries(patterns...)
if err != nil {
return nil, err
}
- dir.Indexes = indexes
- entries, err := c.dumpEntries(indexes)
+ if !sc.accessEnd(accessEpoch) {
+ return nil, adapter.ErrStatsDataBusy
+ }
+
+ dir := &adapter.StatDir{
+ Epoch: accessEpoch,
+ Entries: entries,
+ }
+
+ return dir, nil
+}
+
+func (sc *StatsClient) PrepareDirOnIndex(indexes ...uint32) (*adapter.StatDir, error) {
+ if !sc.isConnected() {
+ return nil, adapter.ErrStatsDisconnected
+ }
+
+ accessEpoch := sc.accessStart()
+ if accessEpoch == 0 {
+ return nil, adapter.ErrStatsAccessFailed
+ }
+ vector := sc.GetDirectoryVector()
+ if vector == nil {
+ return nil, fmt.Errorf("failed to prepare dir on index: directory vector is nil")
+ }
+ entries, err := sc.getStatEntriesOnIndex(vector, indexes...)
if err != nil {
return nil, err
}
- dir.Entries = entries
- if !c.accessEnd(&sa) {
+ if !sc.accessEnd(accessEpoch) {
return nil, adapter.ErrStatsDataBusy
}
- dir.Epoch = sa.epoch
+
+ dir := &adapter.StatDir{
+ Epoch: accessEpoch,
+ Entries: entries,
+ }
return dir, nil
}
-func (c *StatsClient) UpdateDir(dir *adapter.StatDir) (err error) {
- epoch, _ := c.getEpoch()
+// UpdateDir refreshes directory data for all counters
+func (sc *StatsClient) UpdateDir(dir *adapter.StatDir) (err error) {
+ if !sc.isConnected() {
+ return adapter.ErrStatsDisconnected
+ }
+ epoch, _ := sc.GetEpoch()
if dir.Epoch != epoch {
return adapter.ErrStatsDirStale
}
- sa := c.accessStart()
- if sa.epoch == 0 {
+ accessEpoch := sc.accessStart()
+ if accessEpoch == 0 {
return adapter.ErrStatsAccessFailed
}
+ dirVector := sc.GetDirectoryVector()
+ if dirVector == nil {
+ return err
+ }
+ for i := 0; i < len(dir.Entries); i++ {
+ if err := sc.updateStatOnIndex(&dir.Entries[i], dirVector); err != nil {
+ return err
+ }
+ }
+ if !sc.accessEnd(accessEpoch) {
+ return adapter.ErrStatsDataBusy
+ }
+ return nil
+}
- dirVector := c.getStatDirVector()
-
- for i, index := range dir.Indexes {
- dirEntry := c.getStatDirIndex(dirVector, index)
-
- var name []byte
- for n := 0; n < len(dirEntry.name); n++ {
- if dirEntry.name[n] == 0 {
- name = dirEntry.name[:n]
- break
+// checks the socket existence and waits for it for the designated
+// time if it is not available immediately
+func (sc *StatsClient) waitForSocket() error {
+ if _, err := os.Stat(sc.socket); err != nil {
+ if os.IsNotExist(err) {
+ n := time.Now()
+ ticker := time.NewTicker(sc.retryPeriod)
+ timeout := time.After(sc.retryTimeout)
+ for {
+ select {
+ case <-ticker.C:
+ if _, err := os.Stat(sc.socket); err == nil {
+ return nil
+ }
+ case <-timeout:
+ return fmt.Errorf("stats socket file %s is not ready within timeout (after %.2f s) ",
+ sc.socket, time.Since(n).Seconds())
+ }
}
+ } else {
+ return fmt.Errorf("stats socket error: %v", err)
}
- if len(name) == 0 {
- continue
- }
+ }
+ return nil
+}
+
+// connect to the socket and map it into the memory. According to the
+// header version info, an appropriate segment handler is returned
+func (sc *StatsClient) connect() (ss statSegment, err error) {
+ addr := net.UnixAddr{
+ Net: "unixpacket",
+ Name: sc.socket,
+ }
+ Log.Debugf("connecting to: %v", addr)
- entry := &dir.Entries[i]
- if !bytes.Equal(name, entry.Name) {
- continue
+ conn, err := net.DialUnix(addr.Net, nil, &addr)
+ if err != nil {
+ Log.Warnf("connecting to socket %s failed: %s", addr, err)
+ return nil, err
+ }
+ defer func() {
+ if err := conn.Close(); err != nil {
+ Log.Warnf("closing socket failed: %v", err)
}
- if adapter.StatType(dirEntry.directoryType) != entry.Type {
- continue
+ }()
+ Log.Debugf("connected to socket")
+
+ files, err := fd.Get(conn, 1, nil)
+ if err != nil {
+ return nil, fmt.Errorf("getting file descriptor over socket failed: %v", err)
+ }
+ if len(files) == 0 {
+ return nil, fmt.Errorf("no files received over socket")
+ }
+
+ file := files[0]
+ defer func() {
+ if err := file.Close(); err != nil {
+ Log.Warnf("closing file failed: %v", err)
}
- if entry.Data == nil {
- continue
+ }()
+
+ info, err := file.Stat()
+ if err != nil {
+ return nil, err
+ }
+ size := info.Size()
+
+ sc.headerData, err = syscall.Mmap(int(file.Fd()), 0, int(size), syscall.PROT_READ, syscall.MAP_SHARED)
+ if err != nil {
+ Log.Debugf("mapping shared memory failed: %v", err)
+ return nil, fmt.Errorf("mapping shared memory failed: %v", err)
+ }
+ Log.Debugf("successfully mmapped shared memory segment (size: %v) %v", size, len(sc.headerData))
+
+ version := getVersion(sc.headerData)
+ switch version {
+ case 1:
+ ss = newStatSegmentV1(sc.headerData, size)
+ case 2:
+ ss = newStatSegmentV2(sc.headerData, size)
+ default:
+ return nil, fmt.Errorf("stat segment version is not supported: %v (min: %v, max: %v)",
+ version, minVersion, maxVersion)
+ }
+
+ // set connected
+ atomic.CompareAndSwapUint32(&sc.connected, 0, 1)
+
+ return ss, nil
+}
+
+// reconnect disconnects from the socket, re-validates it and
+// connects again
+func (sc *StatsClient) reconnect() (err error) {
+ if err = sc.disconnect(); err != nil {
+ return fmt.Errorf("error disconnecting socket: %v", err)
+ }
+ if err = sc.waitForSocket(); err != nil {
+ return fmt.Errorf("error while waiting on socket: %v", err)
+ }
+ if sc.statSegment, err = sc.connect(); err != nil {
+ return fmt.Errorf("error connecting socket: %v", err)
+ }
+ return nil
+}
+
+// disconnect unmaps socket data from the memory and resets the header
+func (sc *StatsClient) disconnect() error {
+ if !atomic.CompareAndSwapUint32(&sc.connected, 1, 0) {
+ return fmt.Errorf("stats client is already disconnected")
+ }
+ if sc.headerData == nil {
+ return nil
+ }
+ if err := syscall.Munmap(sc.headerData); err != nil {
+ Log.Debugf("unmapping shared memory failed: %v", err)
+ return fmt.Errorf("unmapping shared memory failed: %v", err)
+ }
+ sc.headerData = nil
+
+ Log.Debugf("successfully unmapped shared memory")
+ return nil
+}
+
+func (sc *StatsClient) monitorSocket() {
+ watcher, err := fsnotify.NewWatcher()
+ if err != nil {
+ Log.Errorf("error starting socket monitor: %v", err)
+ return
+ }
+
+ go func() {
+ for {
+ select {
+ case event := <-watcher.Events:
+ if event.Op == fsnotify.Remove && event.Name == sc.socket {
+ if err := sc.reconnect(); err != nil {
+ Log.Errorf("error occurred during socket reconnect: %v", err)
+ }
+ }
+ case err := <-watcher.Errors:
+ Log.Errorf("socket monitor delivered error event: %v", err)
+ case <-sc.done:
+ err := watcher.Close()
+ Log.Debugf("socket monitor closed (error: %v)", err)
+ return
+ }
}
- if err := c.updateEntryData(dirEntry, &entry.Data); err != nil {
- return fmt.Errorf("updating stat data for entry %s failed: %v", name, err)
+ }()
+
+ if err := watcher.Add(filepath.Dir(sc.socket)); err != nil {
+ Log.Errorf("failed to add socket address to the watcher: %v", err)
+ }
+}
+
+// Starts monitoring 'inProgress' field. Returns stats segment
+// access epoch when completed, or zero value if not finished
+// within MaxWaitInProgress
+func (sc *StatsClient) accessStart() (epoch int64) {
+ t := time.Now()
+
+ epoch, inProg := sc.GetEpoch()
+ for inProg {
+ if time.Since(t) > MaxWaitInProgress {
+ return int64(0)
}
+ time.Sleep(CheckDelayInProgress)
+ epoch, inProg = sc.GetEpoch()
+ }
+ return epoch
+}
+// AccessEnd returns true if stats data reading was finished, false
+// otherwise
+func (sc *StatsClient) accessEnd(accessEpoch int64) bool {
+ epoch, inProgress := sc.GetEpoch()
+ if accessEpoch != epoch || inProgress {
+ return false
}
+ return true
+}
- if !c.accessEnd(&sa) {
- return adapter.ErrStatsDataBusy
+// getStatEntries retrieves all stats matching desired patterns, or all stats if no pattern is provided.
+func (sc *StatsClient) getStatEntries(patterns ...string) (entries []adapter.StatEntry, err error) {
+ vector := sc.GetDirectoryVector()
+ if vector == nil {
+ return nil, fmt.Errorf("failed to get stat entries: directory vector is nil")
}
+ indexes, err := sc.listIndexes(vector, patterns...)
+ if err != nil {
+ return nil, err
+ }
+ return sc.getStatEntriesOnIndex(vector, indexes...)
+}
- return nil
+// getIdentifierEntries retrieves all identifiers matching desired patterns, or all identifiers
+// if no pattern is provided.
+func (sc *StatsClient) getIdentifierEntries(patterns ...string) (identifiers []adapter.StatIdentifier, err error) {
+ vector := sc.GetDirectoryVector()
+ if vector == nil {
+ return nil, fmt.Errorf("failed to get identifier entries: directory vector is nil")
+ }
+ indexes, err := sc.listIndexes(vector, patterns...)
+ if err != nil {
+ return nil, err
+ }
+ return sc.getIdentifierEntriesOnIndex(vector, indexes...)
+}
+
+// getStatEntriesOnIndex retrieves stats on indexes, or all stats if indexes are not defined.
+func (sc *StatsClient) getStatEntriesOnIndex(vector dirVector, indexes ...uint32) (entries []adapter.StatEntry, err error) {
+ dirLen := *(*uint32)(vectorLen(vector))
+ for _, index := range indexes {
+ if index >= dirLen {
+ return nil, fmt.Errorf("stat entry index %d out of dir vector length (%d)", index, dirLen)
+ }
+ dirPtr, dirName, dirType := sc.GetStatDirOnIndex(vector, index)
+ if len(dirName) == 0 {
+ return
+ }
+ var t adapter.StatType
+ d := sc.CopyEntryData(dirPtr, ^uint32(0))
+ if d != nil {
+ t = d.Type()
+ }
+ entries = append(entries, adapter.StatEntry{
+ StatIdentifier: adapter.StatIdentifier{
+ Index: index,
+ Name: dirName,
+ },
+ Type: t,
+ Data: d,
+ Symlink: dirType == adapter.Symlink,
+ })
+ }
+ return entries, nil
+}
+
+// getIdentifierEntriesOnIndex retrieves identifiers on indexes, or all identifiers if indexes are not defined.
+func (sc *StatsClient) getIdentifierEntriesOnIndex(vector dirVector, indexes ...uint32) (identifiers []adapter.StatIdentifier, err error) {
+ dirLen := *(*uint32)(vectorLen(vector))
+ for _, index := range indexes {
+ if index >= dirLen {
+ return nil, fmt.Errorf("stat entry index %d out of dir vector length (%d)", index, dirLen)
+ }
+ _, dirName, _ := sc.GetStatDirOnIndex(vector, index)
+ if len(dirName) == 0 {
+ return
+ }
+ identifiers = append(identifiers, adapter.StatIdentifier{
+ Index: index,
+ Name: dirName,
+ })
+ }
+ return identifiers, nil
}
// listIndexes lists indexes for all stat entries that match any of the regex patterns.
-func (c *StatsClient) listIndexes(patterns ...string) (indexes []uint32, err error) {
+func (sc *StatsClient) listIndexes(vector dirVector, patterns ...string) (indexes []uint32, err error) {
if len(patterns) == 0 {
- return c.listIndexesFunc(nil)
+ return sc.listIndexesFunc(vector, nil)
}
var regexes = make([]*regexp.Regexp, len(patterns))
for i, pattern := range patterns {
@@ -260,7 +556,7 @@ func (c *StatsClient) listIndexes(patterns ...string) (indexes []uint32, err err
}
regexes[i] = r
}
- nameMatches := func(name []byte) bool {
+ nameMatches := func(name dirName) bool {
for _, r := range regexes {
if r.Match(name) {
return true
@@ -268,31 +564,22 @@ func (c *StatsClient) listIndexes(patterns ...string) (indexes []uint32, err err
}
return false
}
- return c.listIndexesFunc(nameMatches)
+ return sc.listIndexesFunc(vector, nameMatches)
}
-func (c *StatsClient) listIndexesFunc(f func(name []byte) bool) (indexes []uint32, err error) {
+// listIndexesFunc lists stats indexes. The optional function
+// argument filters returned values or returns all if empty
+func (sc *StatsClient) listIndexesFunc(vector dirVector, f func(name dirName) bool) (indexes []uint32, err error) {
if f == nil {
// there is around ~3157 stats, so to avoid too many allocations
// we set capacity to 3200 when listing all stats
indexes = make([]uint32, 0, 3200)
}
-
- dirVector := c.getStatDirVector()
- vecLen := uint32(vectorLen(dirVector))
-
+ vecLen := *(*uint32)(vectorLen(vector))
for i := uint32(0); i < vecLen; i++ {
- dirEntry := c.getStatDirIndex(dirVector, i)
-
+ _, dirName, _ := sc.GetStatDirOnIndex(vector, i)
if f != nil {
- var name []byte
- for n := 0; n < len(dirEntry.name); n++ {
- if dirEntry.name[n] == 0 {
- name = dirEntry.name[:n]
- break
- }
- }
- if len(name) == 0 || !f(name) {
+ if len(dirName) == 0 || !f(dirName) {
continue
}
}
@@ -302,44 +589,25 @@ func (c *StatsClient) listIndexesFunc(f func(name []byte) bool) (indexes []uint3
return indexes, nil
}
-func (c *StatsClient) dumpEntries(indexes []uint32) (entries []adapter.StatEntry, err error) {
- dirVector := c.getStatDirVector()
- dirLen := uint32(vectorLen(dirVector))
-
- debugf("dumping entres for %d indexes", len(indexes))
-
- entries = make([]adapter.StatEntry, 0, len(indexes))
- for _, index := range indexes {
- if index >= dirLen {
- return nil, fmt.Errorf("stat entry index %d out of dir vector length (%d)", index, dirLen)
- }
-
- dirEntry := c.getStatDirIndex(dirVector, index)
-
- var name []byte
- for n := 0; n < len(dirEntry.name); n++ {
- if dirEntry.name[n] == 0 {
- name = dirEntry.name[:n]
- break
- }
- }
-
- if Debug {
- debugf(" - %3d. dir: %q type: %v offset: %d union: %d", index, name,
- adapter.StatType(dirEntry.directoryType), dirEntry.offsetVector, dirEntry.unionData)
- }
-
- if len(name) == 0 {
- continue
- }
+func (sc *StatsClient) isConnected() bool {
+ return atomic.LoadUint32(&sc.connected) == 1
+}
- entry := adapter.StatEntry{
- Name: append([]byte(nil), name...),
- Type: adapter.StatType(dirEntry.directoryType),
- Data: c.copyEntryData(dirEntry),
- }
- entries = append(entries, entry)
+// updateStatOnIndex refreshes the entry data.
+func (sc *StatsClient) updateStatOnIndex(entry *adapter.StatEntry, vector dirVector) (err error) {
+ dirLen := *(*uint32)(vectorLen(vector))
+ if entry.Index >= dirLen {
+ return fmt.Errorf("stat entry index %d out of dir vector length (%d)", entry.Index, dirLen)
}
-
- return entries, nil
+ dirPtr, dirName, dirType := sc.GetStatDirOnIndex(vector, entry.Index)
+ if len(dirName) == 0 ||
+ !bytes.Equal(dirName, entry.Name) ||
+ dirType != entry.Type ||
+ entry.Data == nil {
+ return nil
+ }
+ if err := sc.UpdateEntryData(dirPtr, &entry.Data); err != nil {
+ err = fmt.Errorf("updating stat data for entry %s failed: %v", dirName, err)
+ }
+ return
}
diff --git a/adapter/statsclient/statseg.go b/adapter/statsclient/statseg.go
deleted file mode 100644
index 42ab3de..0000000
--- a/adapter/statsclient/statseg.go
+++ /dev/null
@@ -1,104 +0,0 @@
-package statsclient
-
-import (
- "sync/atomic"
- "time"
- "unsafe"
-)
-
-var (
- MaxWaitInProgress = time.Millisecond * 100
- CheckDelayInProgress = time.Microsecond * 10
-)
-
-type sharedHeaderBase struct {
- epoch int64
- inProgress int64
- directoryOffset int64
- errorOffset int64
- statsOffset int64
-}
-
-type sharedHeaderV0 struct {
- sharedHeaderBase
-}
-
-type sharedHeader struct {
- version uint64
- sharedHeaderBase
-}
-
-func (h *sharedHeader) legacyVersion() bool {
- // older VPP (<=19.04) did not have version in stat segment header
- // we try to provide fallback support by skipping it in header
- if h.version > maxVersion && h.inProgress > 1 && h.epoch == 0 {
- return true
- }
- return false
-}
-
-func loadSharedHeader(b []byte) (header sharedHeader) {
- h := (*sharedHeader)(unsafe.Pointer(&b[0]))
- header.version = atomic.LoadUint64(&h.version)
- header.epoch = atomic.LoadInt64(&h.epoch)
- header.inProgress = atomic.LoadInt64(&h.inProgress)
- header.directoryOffset = atomic.LoadInt64(&h.directoryOffset)
- header.errorOffset = atomic.LoadInt64(&h.errorOffset)
- header.statsOffset = atomic.LoadInt64(&h.statsOffset)
- return
-}
-
-func loadSharedHeaderLegacy(b []byte) (header sharedHeader) {
- h := (*sharedHeaderV0)(unsafe.Pointer(&b[0]))
- header.version = 0
- header.epoch = atomic.LoadInt64(&h.epoch)
- header.inProgress = atomic.LoadInt64(&h.inProgress)
- header.directoryOffset = atomic.LoadInt64(&h.directoryOffset)
- header.errorOffset = atomic.LoadInt64(&h.errorOffset)
- header.statsOffset = atomic.LoadInt64(&h.statsOffset)
- return
-}
-
-type statSegAccess struct {
- epoch int64
-}
-
-func (c *statSegment) accessStart() statSegAccess {
- t := time.Now()
-
- epoch, inprog := c.getEpoch()
- for inprog {
- if time.Since(t) > MaxWaitInProgress {
- return statSegAccess{}
- } else {
- time.Sleep(CheckDelayInProgress)
- }
- epoch, inprog = c.getEpoch()
- }
- return statSegAccess{
- epoch: epoch,
- }
-}
-
-func (c *statSegment) accessEnd(acc *statSegAccess) bool {
- epoch, inprog := c.getEpoch()
- if acc.epoch != epoch || inprog {
- return false
- }
- return true
-}
-
-type vecHeader struct {
- length uint64
- vectorData [0]uint8
-}
-
-func vectorLen(v unsafe.Pointer) uint64 {
- vec := *(*vecHeader)(unsafe.Pointer(uintptr(v) - unsafe.Sizeof(uint64(0))))
- return vec.length
-}
-
-//go:nosplit
-func statSegPointer(p unsafe.Pointer, offset uintptr) unsafe.Pointer {
- return unsafe.Pointer(uintptr(p) + offset)
-}
diff --git a/adapter/statsclient/statseg_v1.go b/adapter/statsclient/statseg_v1.go
new file mode 100644
index 0000000..9e0e469
--- /dev/null
+++ b/adapter/statsclient/statseg_v1.go
@@ -0,0 +1,365 @@
+// Copyright (c) 2019 Cisco and/or its affiliates.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statsclient
+
+import (
+ "fmt"
+ "sync/atomic"
+ "unsafe"
+
+ "go.fd.io/govpp/adapter"
+)
+
+type statSegmentV1 struct {
+ sharedHeader []byte
+ memorySize int64
+}
+
+type sharedHeaderV1 struct {
+ version uint64
+ epoch int64
+ inProgress int64
+ directoryOffset int64
+ errorOffset int64
+ statsOffset int64
+}
+
+type statSegDirectoryEntryV1 struct {
+ directoryType dirType
+ // unionData can represent:
+ // - offset
+ // - index
+ // - value
+ unionData uint64
+ offsetVector uint64
+ name [128]byte
+}
+
+func newStatSegmentV1(data []byte, size int64) *statSegmentV1 {
+ return &statSegmentV1{
+ sharedHeader: data,
+ memorySize: size,
+ }
+}
+
+func (ss *statSegmentV1) loadSharedHeader(b []byte) (header sharedHeaderV1) {
+ h := (*sharedHeaderV1)(unsafe.Pointer(&b[0]))
+ return sharedHeaderV1{
+ version: atomic.LoadUint64(&h.version),
+ epoch: atomic.LoadInt64(&h.epoch),
+ inProgress: atomic.LoadInt64(&h.inProgress),
+ directoryOffset: atomic.LoadInt64(&h.directoryOffset),
+ errorOffset: atomic.LoadInt64(&h.errorOffset),
+ statsOffset: atomic.LoadInt64(&h.statsOffset),
+ }
+}
+
+func (ss *statSegmentV1) GetDirectoryVector() dirVector {
+ dirOffset, _, _ := ss.getOffsets()
+ return dirVector(&ss.sharedHeader[dirOffset])
+}
+
+func (ss *statSegmentV1) getErrorVector() (unsafe.Pointer, error) {
+ return nil, fmt.Errorf("error vector is not defined for stats API v1")
+}
+
+func (ss *statSegmentV1) GetStatDirOnIndex(v dirVector, index uint32) (dirSegment, dirName, adapter.StatType) {
+ statSegDir := dirSegment(uintptr(v) + uintptr(index)*unsafe.Sizeof(statSegDirectoryEntryV1{}))
+ dir := (*statSegDirectoryEntryV1)(statSegDir)
+ var name []byte
+ for n := 0; n < len(dir.name); n++ {
+ if dir.name[n] == 0 {
+ name = dir.name[:n]
+ break
+ }
+ }
+ return statSegDir, name, getStatType(dir.directoryType, true)
+}
+
+func (ss *statSegmentV1) GetEpoch() (int64, bool) {
+ sh := ss.loadSharedHeader(ss.sharedHeader)
+ return sh.epoch, sh.inProgress != 0
+}
+
+func (ss *statSegmentV1) CopyEntryData(segment dirSegment, _ uint32) adapter.Stat {
+ dirEntry := (*statSegDirectoryEntryV1)(segment)
+ typ := getStatType(dirEntry.directoryType, true)
+
+ switch typ {
+ case adapter.ScalarIndex:
+ return adapter.ScalarStat(dirEntry.unionData)
+
+ case adapter.ErrorIndex:
+ if dirEntry.unionData >= uint64(len(ss.sharedHeader)) {
+ debugf("offset out of range for %s", dirEntry.name)
+ break
+ }
+
+ _, errOffset, _ := ss.getOffsets()
+ offsetVector := dirVector(&ss.sharedHeader[errOffset])
+
+ var errData []adapter.Counter
+
+ vecLen := *(*uint32)(vectorLen(offsetVector))
+ for i := uint32(0); i < vecLen; i++ {
+ cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0))))
+ offset := uintptr(cb) + uintptr(dirEntry.unionData)*unsafe.Sizeof(adapter.Counter(0))
+ debugf("error index, cb: %d, offset: %d", cb, offset)
+ val := *(*adapter.Counter)(statSegPointer(dirVector(&ss.sharedHeader[0]), offset))
+ errData = append(errData, val)
+ }
+ return adapter.ErrorStat(errData)
+
+ case adapter.SimpleCounterVector:
+ if dirEntry.unionData == 0 {
+ debugf("offset invalid for %s", dirEntry.name)
+ break
+ } else if dirEntry.unionData >= uint64(len(ss.sharedHeader)) {
+ debugf("offset out of range for %s", dirEntry.name)
+ break
+ }
+
+ vecLen := *(*uint32)(vectorLen(dirVector(&ss.sharedHeader[dirEntry.unionData])))
+ offsetVector := statSegPointer(dirVector(&ss.sharedHeader[0]), uintptr(dirEntry.offsetVector))
+
+ data := make([][]adapter.Counter, vecLen)
+ for i := uint32(0); i < vecLen; i++ {
+ cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0))))
+ counterVec := dirVector(&ss.sharedHeader[uintptr(cb)])
+ vecLen2 := *(*uint32)(vectorLen(counterVec))
+ data[i] = make([]adapter.Counter, vecLen2)
+ for j := uint32(0); j < vecLen2; j++ {
+ offset := uintptr(j) * unsafe.Sizeof(adapter.Counter(0))
+ val := *(*adapter.Counter)(statSegPointer(counterVec, offset))
+ data[i][j] = val
+ }
+ }
+ return adapter.SimpleCounterStat(data)
+
+ case adapter.CombinedCounterVector:
+ if dirEntry.unionData == 0 {
+ debugf("offset invalid for %s", dirEntry.name)
+ break
+ } else if dirEntry.unionData >= uint64(len(ss.sharedHeader)) {
+ debugf("offset out of range for %s", dirEntry.name)
+ break
+ }
+
+ vecLen := *(*uint32)(vectorLen(dirVector(&ss.sharedHeader[dirEntry.unionData])))
+ offsetVector := statSegPointer(dirVector(&ss.sharedHeader[0]), uintptr(dirEntry.offsetVector))
+
+ data := make([][]adapter.CombinedCounter, vecLen)
+ for i := uint32(0); i < vecLen; i++ {
+ cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0))))
+ counterVec := dirVector(&ss.sharedHeader[uintptr(cb)])
+ vecLen2 := *(*uint32)(vectorLen(counterVec))
+ data[i] = make([]adapter.CombinedCounter, vecLen2)
+ for j := uint32(0); j < vecLen2; j++ {
+ offset := uintptr(j) * unsafe.Sizeof(adapter.CombinedCounter{})
+ val := *(*adapter.CombinedCounter)(statSegPointer(counterVec, offset))
+ data[i][j] = val
+ }
+ }
+ return adapter.CombinedCounterStat(data)
+
+ case adapter.NameVector:
+ if dirEntry.unionData == 0 {
+ debugf("offset invalid for %s", dirEntry.name)
+ break
+ } else if dirEntry.unionData >= uint64(len(ss.sharedHeader)) {
+ debugf("offset out of range for %s", dirEntry.name)
+ break
+ }
+
+ vecLen := *(*uint32)(vectorLen(dirVector(&ss.sharedHeader[dirEntry.unionData])))
+ offsetVector := statSegPointer(dirVector(&ss.sharedHeader[0]), uintptr(dirEntry.offsetVector))
+
+ data := make([]adapter.Name, vecLen)
+ for i := uint32(0); i < vecLen; i++ {
+ cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0))))
+ if cb == 0 {
+ debugf("name vector out of range for %s (%v)", dirEntry.name, i)
+ continue
+ }
+ nameVec := dirVector(&ss.sharedHeader[cb])
+ vecLen2 := *(*uint32)(vectorLen(nameVec))
+
+ nameStr := make([]byte, 0, vecLen2)
+ for j := uint32(0); j < vecLen2; j++ {
+ offset := uintptr(j) * unsafe.Sizeof(byte(0))
+ val := *(*byte)(statSegPointer(nameVec, offset))
+ if val > 0 {
+ nameStr = append(nameStr, val)
+ }
+ }
+ data[i] = nameStr
+ }
+ return adapter.NameStat(data)
+
+ case adapter.Empty:
+ // no-op
+
+ case adapter.Symlink:
+ debugf("Symlinks are not supported for stats v1")
+
+ default:
+ // TODO: monitor occurrences with metrics
+ debugf("Unknown type %d for stat entry: %q", dirEntry.directoryType, dirEntry.name)
+ }
+ return nil
+}
+
+func (ss *statSegmentV1) UpdateEntryData(segment dirSegment, stat *adapter.Stat) error {
+ dirEntry := (*statSegDirectoryEntryV1)(segment)
+ switch (*stat).(type) {
+ case adapter.ScalarStat:
+ *stat = adapter.ScalarStat(dirEntry.unionData)
+
+ case adapter.ErrorStat:
+ if dirEntry.unionData == 0 {
+ debugf("offset invalid for %s", dirEntry.name)
+ break
+ } else if dirEntry.unionData >= uint64(len(ss.sharedHeader)) {
+ debugf("offset out of range for %s", dirEntry.name)
+ break
+ }
+
+ _, errOffset, _ := ss.getOffsets()
+ offsetVector := dirVector(&ss.sharedHeader[errOffset])
+
+ var errData []adapter.Counter
+
+ vecLen := *(*uint32)(vectorLen(dirVector(&ss.sharedHeader[errOffset])))
+ for i := uint32(0); i < vecLen; i++ {
+ cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0))))
+ offset := uintptr(cb) + uintptr(dirEntry.unionData)*unsafe.Sizeof(adapter.Counter(0))
+ val := *(*adapter.Counter)(statSegPointer(dirVector(&ss.sharedHeader[0]), offset))
+ errData = append(errData, val)
+ }
+ *stat = adapter.ErrorStat(errData)
+
+ case adapter.SimpleCounterStat:
+ if dirEntry.unionData == 0 {
+ debugf("offset invalid for %s", dirEntry.name)
+ break
+ } else if dirEntry.unionData >= uint64(len(ss.sharedHeader)) {
+ debugf("offset out of range for %s", dirEntry.name)
+ break
+ }
+
+ vecLen := *(*uint32)(vectorLen(dirVector(&ss.sharedHeader[dirEntry.unionData])))
+ offsetVector := statSegPointer(dirVector(&ss.sharedHeader[0]), uintptr(dirEntry.offsetVector))
+
+ data := (*stat).(adapter.SimpleCounterStat)
+ if uint32(len(data)) != vecLen {
+ return ErrStatDataLenIncorrect
+ }
+ for i := uint32(0); i < vecLen; i++ {
+ cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0))))
+ counterVec := dirVector(&ss.sharedHeader[uintptr(cb)])
+ vecLen2 := *(*uint32)(vectorLen(counterVec))
+ simpleData := data[i]
+ if uint32(len(simpleData)) != vecLen2 {
+ return ErrStatDataLenIncorrect
+ }
+ for j := uint32(0); j < vecLen2; j++ {
+ offset := uintptr(j) * unsafe.Sizeof(adapter.Counter(0))
+ val := *(*adapter.Counter)(statSegPointer(counterVec, offset))
+ simpleData[j] = val
+ }
+ }
+
+ case adapter.CombinedCounterStat:
+ if dirEntry.unionData == 0 {
+ debugf("offset invalid for %s", dirEntry.name)
+ break
+ } else if dirEntry.unionData >= uint64(len(ss.sharedHeader)) {
+ debugf("offset out of range for %s", dirEntry.name)
+ break
+ }
+
+ vecLen := *(*uint32)(vectorLen(dirVector(&ss.sharedHeader[dirEntry.unionData])))
+ offsetVector := statSegPointer(dirVector(&ss.sharedHeader[0]), uintptr(dirEntry.offsetVector))
+
+ data := (*stat).(adapter.CombinedCounterStat)
+ if uint32(len(data)) != vecLen {
+ return ErrStatDataLenIncorrect
+ }
+ for i := uint32(0); i < vecLen; i++ {
+ cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0))))
+ counterVec := dirVector(&ss.sharedHeader[uintptr(cb)])
+ vecLen2 := *(*uint32)(vectorLen(counterVec))
+ combData := data[i]
+ if uint32(len(combData)) != vecLen2 {
+ return ErrStatDataLenIncorrect
+ }
+ for j := uint32(0); j < vecLen2; j++ {
+ offset := uintptr(j) * unsafe.Sizeof(adapter.CombinedCounter{})
+ val := *(*adapter.CombinedCounter)(statSegPointer(counterVec, offset))
+ combData[j] = val
+ }
+ }
+
+ case adapter.NameStat:
+ if dirEntry.unionData == 0 {
+ debugf("offset invalid for %s", dirEntry.name)
+ break
+ } else if dirEntry.unionData >= uint64(len(ss.sharedHeader)) {
+ debugf("offset out of range for %s", dirEntry.name)
+ break
+ }
+
+ vecLen := *(*uint32)(vectorLen(dirVector(&ss.sharedHeader[dirEntry.unionData])))
+ offsetVector := statSegPointer(dirVector(&ss.sharedHeader[0]), uintptr(dirEntry.offsetVector))
+
+ data := (*stat).(adapter.NameStat)
+ if uint32(len(data)) != vecLen {
+ return ErrStatDataLenIncorrect
+ }
+ for i := uint32(0); i < vecLen; i++ {
+ cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0))))
+ if cb == 0 {
+ continue
+ }
+ nameVec := dirVector(&ss.sharedHeader[cb])
+ vecLen2 := *(*uint32)(vectorLen(nameVec))
+
+ nameData := data[i]
+ if uint32(len(nameData))+1 != vecLen2 {
+ return ErrStatDataLenIncorrect
+ }
+ for j := uint32(0); j < vecLen2; j++ {
+ offset := uintptr(j) * unsafe.Sizeof(byte(0))
+ val := *(*byte)(statSegPointer(nameVec, offset))
+ if val == 0 {
+ break
+ }
+ nameData[j] = val
+ }
+ }
+
+ default:
+ if Debug {
+ Log.Debugf("Unrecognized stat type %T for stat entry: %v", stat, dirEntry.name)
+ }
+ }
+ return nil
+}
+
+// Get offsets for various types of data
+func (ss *statSegmentV1) getOffsets() (dir, err, stat int64) {
+ sh := ss.loadSharedHeader(ss.sharedHeader)
+ return sh.directoryOffset, sh.errorOffset, sh.statsOffset
+}
diff --git a/adapter/statsclient/statseg_v2.go b/adapter/statsclient/statseg_v2.go
new file mode 100644
index 0000000..f808112
--- /dev/null
+++ b/adapter/statsclient/statseg_v2.go
@@ -0,0 +1,411 @@
+// Copyright (c) 2020 Cisco and/or its affiliates.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statsclient
+
+import (
+ "bytes"
+ "encoding/binary"
+ "sync/atomic"
+ "unsafe"
+
+ "go.fd.io/govpp/adapter"
+)
+
+type statSegmentV2 struct {
+ sharedHeader []byte
+ memorySize int64
+}
+
+type sharedHeaderV2 struct {
+ version uint64
+ base unsafe.Pointer
+ epoch int64
+ inProgress int64
+ dirVector unsafe.Pointer
+ errorVector unsafe.Pointer
+}
+
+type statSegDirectoryEntryV2 struct {
+ directoryType dirType
+ // unionData can represent:
+ // - symlink indexes
+ // - index
+ // - value
+ // - pointer to data
+ unionData uint64
+ name [128]byte
+}
+
+func newStatSegmentV2(data []byte, size int64) *statSegmentV2 {
+ return &statSegmentV2{
+ sharedHeader: data,
+ memorySize: size,
+ }
+}
+
+func (ss *statSegmentV2) loadSharedHeader(b []byte) (header sharedHeaderV2) {
+ h := (*sharedHeaderV2)(unsafe.Pointer(&b[0]))
+ return sharedHeaderV2{
+ version: atomic.LoadUint64(&h.version),
+ base: atomic.LoadPointer(&h.base),
+ epoch: atomic.LoadInt64(&h.epoch),
+ inProgress: atomic.LoadInt64(&h.inProgress),
+ dirVector: atomic.LoadPointer(&h.dirVector),
+ errorVector: atomic.LoadPointer(&h.errorVector),
+ }
+}
+
+func (ss *statSegmentV2) GetDirectoryVector() dirVector {
+ header := ss.loadSharedHeader(ss.sharedHeader)
+ return ss.adjust(dirVector(&header.dirVector))
+}
+
+func (ss *statSegmentV2) GetStatDirOnIndex(v dirVector, index uint32) (dirSegment, dirName, adapter.StatType) {
+ statSegDir := dirSegment(uintptr(v) + uintptr(index)*unsafe.Sizeof(statSegDirectoryEntryV2{}))
+ dir := (*statSegDirectoryEntryV2)(statSegDir)
+ var name []byte
+ for n := 0; n < len(dir.name); n++ {
+ if dir.name[n] == 0 {
+ name = dir.name[:n]
+ break
+ }
+ }
+ return statSegDir, name, getStatType(dir.directoryType, ss.getErrorVector() != nil)
+}
+
+func (ss *statSegmentV2) GetEpoch() (int64, bool) {
+ sh := ss.loadSharedHeader(ss.sharedHeader)
+ return sh.epoch, sh.inProgress != 0
+}
+
+func (ss *statSegmentV2) CopyEntryData(segment dirSegment, index uint32) adapter.Stat {
+ dirEntry := (*statSegDirectoryEntryV2)(segment)
+ typ := getStatType(dirEntry.directoryType, ss.getErrorVector() != nil)
+ // skip zero pointer value
+ if typ != adapter.ScalarIndex && typ != adapter.Empty && typ != adapter.ErrorIndex && dirEntry.unionData == 0 {
+ debugf("data pointer not defined for %s", dirEntry.name)
+ return nil
+ }
+
+ switch typ {
+ case adapter.ScalarIndex:
+ return adapter.ScalarStat(dirEntry.unionData)
+
+ case adapter.ErrorIndex:
+ dirVector := ss.getErrorVector()
+ if dirVector == nil {
+ debugf("error vector pointer is out of range for %s", dirEntry.name)
+ return nil
+ }
+ vecLen := *(*uint32)(vectorLen(dirVector))
+ var errData []adapter.Counter
+ for i := uint32(0); i < vecLen; i++ {
+ cb := statSegPointer(dirVector, uintptr(i+1)*unsafe.Sizeof(uint64(0)))
+ cbVal := ss.adjust(vectorLen(cb))
+ if cbVal == nil {
+ debugf("error counter pointer out of range")
+ continue
+ }
+ offset := uintptr(dirEntry.unionData) * unsafe.Sizeof(adapter.Counter(0))
+ val := *(*adapter.Counter)(statSegPointer(cbVal, offset))
+ errData = append(errData, val)
+ }
+ return adapter.ErrorStat(errData)
+
+ case adapter.SimpleCounterVector:
+ dirVector := ss.adjust(dirVector(&dirEntry.unionData))
+ if dirVector == nil {
+ debugf("data vector pointer is out of range for %s", dirEntry.name)
+ return nil
+ }
+ vecLen := *(*uint32)(vectorLen(dirVector))
+ data := make([][]adapter.Counter, vecLen)
+ for i := uint32(0); i < vecLen; i++ {
+ counterVectorOffset := statSegPointer(dirVector, uintptr(i+1)*unsafe.Sizeof(uint64(0)))
+ counterVector := ss.adjust(vectorLen(counterVectorOffset))
+ if counterVector == nil {
+ debugf("counter (vector simple) pointer out of range")
+ continue
+ }
+ counterVectorLength := *(*uint32)(vectorLen(counterVector))
+ if index == ^uint32(0) {
+ data[i] = make([]adapter.Counter, counterVectorLength)
+ for j := uint32(0); j < counterVectorLength; j++ {
+ offset := uintptr(j) * unsafe.Sizeof(adapter.Counter(0))
+ data[i][j] = *(*adapter.Counter)(statSegPointer(counterVector, offset))
+ }
+ } else {
+ data[i] = make([]adapter.Counter, 1) // expect single value
+ for j := uint32(0); j < counterVectorLength; j++ {
+ offset := uintptr(j) * unsafe.Sizeof(adapter.Counter(0))
+ if index == j {
+ data[i][0] = *(*adapter.Counter)(statSegPointer(counterVector, offset))
+ break
+ }
+ }
+ }
+ }
+ return adapter.SimpleCounterStat(data)
+
+ case adapter.CombinedCounterVector:
+ dirVector := ss.adjust(dirVector(&dirEntry.unionData))
+ if dirVector == nil {
+ debugf("data vector pointer is out of range for %s", dirEntry.name)
+ return nil
+ }
+ vecLen := *(*uint32)(vectorLen(dirVector))
+ data := make([][]adapter.CombinedCounter, vecLen)
+ for i := uint32(0); i < vecLen; i++ {
+ counterVectorOffset := statSegPointer(dirVector, uintptr(i+1)*unsafe.Sizeof(uint64(0)))
+ counterVector := ss.adjust(vectorLen(counterVectorOffset))
+ if counterVector == nil {
+ debugf("counter (vector combined) pointer out of range")
+ continue
+ }
+ counterVectorLength := *(*uint32)(vectorLen(counterVector))
+ if index == ^uint32(0) {
+ data[i] = make([]adapter.CombinedCounter, counterVectorLength)
+ for j := uint32(0); j < counterVectorLength; j++ {
+ offset := uintptr(j) * unsafe.Sizeof(adapter.CombinedCounter{})
+ data[i][j] = *(*adapter.CombinedCounter)(statSegPointer(counterVector, offset))
+ }
+ } else {
+ data[i] = make([]adapter.CombinedCounter, 1) // expect single value pair
+ for j := uint32(0); j < counterVectorLength; j++ {
+ offset := uintptr(j) * unsafe.Sizeof(adapter.CombinedCounter{})
+ if index == j {
+ data[i][0] = *(*adapter.CombinedCounter)(statSegPointer(counterVector, offset))
+ break
+ }
+ }
+ }
+ }
+ return adapter.CombinedCounterStat(data)
+
+ case adapter.NameVector:
+ dirVector := ss.adjust(dirVector(&dirEntry.unionData))
+ if dirVector == nil {
+ debugf("data vector pointer is out of range for %s", dirEntry.name)
+ return nil
+ }
+ vecLen := *(*uint32)(vectorLen(dirVector))
+ data := make([]adapter.Name, vecLen)
+ for i := uint32(0); i < vecLen; i++ {
+ nameVectorOffset := statSegPointer(dirVector, uintptr(i+1)*unsafe.Sizeof(uint64(0)))
+ if uintptr(nameVectorOffset) == 0 {
+ debugf("name vector out of range for %s (%v)", dirEntry.name, i)
+ continue
+ }
+ nameVector := ss.adjust(vectorLen(nameVectorOffset))
+ if nameVector == nil {
+ debugf("name data pointer out of range")
+ continue
+ }
+ nameVectorLen := *(*uint32)(vectorLen(nameVector))
+ name := make([]byte, 0, nameVectorLen)
+ for j := uint32(0); j < nameVectorLen; j++ {
+ offset := uintptr(j) * unsafe.Sizeof(byte(0))
+ value := *(*byte)(statSegPointer(nameVector, offset))
+ if value > 0 {
+ name = append(name, value)
+ }
+ }
+ data[i] = name
+ }
+ return adapter.NameStat(data)
+
+ case adapter.Empty:
+ return adapter.EmptyStat("<none>")
+ // no-op
+
+ case adapter.Symlink:
+ // prevent recursion loops
+ if index != ^uint32(0) {
+ debugf("received symlink with defined item index")
+ return nil
+ }
+ i1, i2 := ss.getSymlinkIndexes(dirEntry)
+ // use first index to get the stats directory the symlink points to
+ header := ss.loadSharedHeader(ss.sharedHeader)
+ dirVector := ss.adjust(dirVector(&header.dirVector))
+ statSegDir2 := dirSegment(uintptr(dirVector) + uintptr(i1)*unsafe.Sizeof(statSegDirectoryEntryV2{}))
+
+ // retry with actual stats segment and use second index to get
+ // stats for the required item
+ return ss.CopyEntryData(statSegDir2, i2)
+
+ default:
+ // TODO: monitor occurrences with metrics
+ debugf("Unknown type %d for stat entry: %q", dirEntry.directoryType, dirEntry.name)
+ }
+ return nil
+}
+
+func (ss *statSegmentV2) UpdateEntryData(segment dirSegment, stat *adapter.Stat) error {
+ dirEntry := (*statSegDirectoryEntryV2)(segment)
+ switch (*stat).(type) {
+ case adapter.ScalarStat:
+ *stat = adapter.ScalarStat(dirEntry.unionData)
+
+ case adapter.ErrorStat:
+ dirVector := ss.getErrorVector()
+ if dirVector == nil {
+ debugf("error vector pointer is out of range for %s", dirEntry.name)
+ return nil
+ }
+ vecLen := *(*uint32)(vectorLen(dirVector))
+ var errData []adapter.Counter
+ for i := uint32(0); i < vecLen; i++ {
+ cb := statSegPointer(dirVector, uintptr(i+1)*unsafe.Sizeof(uint64(0)))
+ cbVal := ss.adjust(vectorLen(cb))
+ if cbVal == nil {
+ debugf("error counter pointer out of range")
+ continue
+ }
+ offset := uintptr(dirEntry.unionData) * unsafe.Sizeof(adapter.Counter(0))
+ val := *(*adapter.Counter)(statSegPointer(cbVal, offset))
+ errData = append(errData, val)
+ }
+ *stat = adapter.ErrorStat(errData)
+
+ case adapter.SimpleCounterStat:
+ dirVector := ss.adjust(dirVector(&dirEntry.unionData))
+ if dirVector == nil {
+ debugf("data vector pointer is out of range for %s", dirEntry.name)
+ return nil
+ }
+ vecLen := *(*uint32)(vectorLen(dirVector))
+ data := (*stat).(adapter.SimpleCounterStat)
+ if uint32(len(data)) != vecLen {
+ return ErrStatDataLenIncorrect
+ }
+ for i := uint32(0); i < vecLen; i++ {
+ counterVectorOffset := statSegPointer(dirVector, uintptr(i+1)*unsafe.Sizeof(uint64(0)))
+ counterVector := ss.adjust(vectorLen(counterVectorOffset))
+ if counterVector == nil {
+ debugf("counter (vector simple) pointer out of range")
+ continue
+ }
+ counterVectorLength := *(*uint32)(vectorLen(counterVector))
+ data[i] = make([]adapter.Counter, counterVectorLength)
+ for j := uint32(0); j < counterVectorLength; j++ {
+ offset := uintptr(j) * unsafe.Sizeof(adapter.Counter(0))
+ val := *(*adapter.Counter)(statSegPointer(counterVector, offset))
+ data[i][j] = val
+ }
+ }
+
+ case adapter.CombinedCounterStat:
+ dirVector := ss.adjust(dirVector(&dirEntry.unionData))
+ if dirVector == nil {
+ debugf("data vector pointer is out of range for %s", dirEntry.name)
+ return nil
+ }
+ vecLen := *(*uint32)(vectorLen(dirVector))
+ data := (*stat).(adapter.CombinedCounterStat)
+ for i := uint32(0); i < vecLen; i++ {
+ counterVectorOffset := statSegPointer(dirVector, uintptr(i+1)*unsafe.Sizeof(uint64(0)))
+ counterVector := ss.adjust(vectorLen(counterVectorOffset))
+ if counterVector == nil {
+ debugf("counter (vector combined) pointer out of range")
+ continue
+ }
+ counterVectorLength := *(*uint32)(vectorLen(counterVector))
+ data[i] = make([]adapter.CombinedCounter, counterVectorLength)
+ for j := uint32(0); j < counterVectorLength; j++ {
+ offset := uintptr(j) * unsafe.Sizeof(adapter.CombinedCounter{})
+ val := *(*adapter.CombinedCounter)(statSegPointer(counterVector, offset))
+ data[i][j] = val
+ }
+ }
+
+ case adapter.NameStat:
+ dirVector := ss.adjust(dirVector(&dirEntry.unionData))
+ if dirVector == nil {
+ debugf("data vector pointer is out of range for %s", dirEntry.name)
+ return nil
+ }
+ vecLen := *(*uint32)(vectorLen(dirVector))
+ data := (*stat).(adapter.NameStat)
+ for i := uint32(0); i < vecLen; i++ {
+ nameVectorOffset := statSegPointer(dirVector, uintptr(i+1)*unsafe.Sizeof(uint64(0)))
+ if uintptr(nameVectorOffset) == 0 {
+ debugf("name vector out of range for %s (%v)", dirEntry.name, i)
+ continue
+ }
+ nameVector := ss.adjust(vectorLen(nameVectorOffset))
+ if nameVector == nil {
+ debugf("name data pointer out of range")
+ continue
+ }
+ nameVectorLen := *(*uint32)(vectorLen(nameVector))
+ nameData := data[i]
+ if uint32(len(nameData))+1 != nameVectorLen {
+ return ErrStatDataLenIncorrect
+ }
+ for j := uint32(0); j < nameVectorLen; j++ {
+ offset := uintptr(j) * unsafe.Sizeof(byte(0))
+ value := *(*byte)(statSegPointer(nameVector, offset))
+ if value == 0 {
+ break
+ }
+ nameData[j] = value
+ }
+ }
+
+ default:
+ if Debug {
+ Log.Debugf("Unrecognized stat type %T for stat entry: %v", stat, dirEntry.name)
+ }
+ }
+ return nil
+}
+
+// Adjust data pointer using shared header and base and return
+// the pointer to a data segment
+func (ss *statSegmentV2) adjust(data dirVector) dirVector {
+ header := ss.loadSharedHeader(ss.sharedHeader)
+ adjusted := dirVector(uintptr(unsafe.Pointer(&ss.sharedHeader[0])) +
+ uintptr(*(*uint64)(data)) - uintptr(*(*uint64)(unsafe.Pointer(&header.base))))
+ if uintptr(unsafe.Pointer(&ss.sharedHeader[len(ss.sharedHeader)-1])) <= uintptr(adjusted) ||
+ uintptr(unsafe.Pointer(&ss.sharedHeader[0])) >= uintptr(adjusted) {
+ return nil
+ }
+ return adjusted
+}
+
+func (ss *statSegmentV2) getErrorVector() dirVector {
+ header := ss.loadSharedHeader(ss.sharedHeader)
+ return ss.adjust(dirVector(&header.errorVector))
+}
+
+func (ss *statSegmentV2) getSymlinkIndexes(dirEntry *statSegDirectoryEntryV2) (index1, index2 uint32) {
+ var b bytes.Buffer
+ if err := binary.Write(&b, binary.LittleEndian, dirEntry.unionData); err != nil {
+ debugf("error getting symlink indexes for %s: %v", dirEntry.name, err)
+ return
+ }
+ if len(b.Bytes()) != 8 {
+ debugf("incorrect symlink union data length for %s: expected 8, got %d", dirEntry.name, len(b.Bytes()))
+ return
+ }
+ for i := range b.Bytes()[:4] {
+ index1 += uint32(b.Bytes()[i]) << (uint32(i) * 8)
+ }
+ for i := range b.Bytes()[4:] {
+ index2 += uint32(b.Bytes()[i+4]) << (uint32(i) * 8)
+ }
+ return
+}
diff --git a/adapter/vppapiclient/stat_client.go b/adapter/vppapiclient/stat_client.go
index bf19c45..4144c6c 100644
--- a/adapter/vppapiclient/stat_client.go
+++ b/adapter/vppapiclient/stat_client.go
@@ -29,7 +29,7 @@ import (
"os"
"unsafe"
- "git.fd.io/govpp.git/adapter"
+ "go.fd.io/govpp/adapter"
)
// global VPP stats API client, library vppapiclient only supports
@@ -83,7 +83,7 @@ func (c *statClient) Disconnect() error {
return nil
}
-func (c *statClient) ListStats(patterns ...string) (stats []string, err error) {
+func (c *statClient) ListStats(patterns ...string) (indexes []adapter.StatIdentifier, err error) {
dir := C.govpp_stat_segment_ls(convertStringSlice(patterns))
if dir == nil {
return nil, adapter.ErrStatsDataBusy
@@ -93,11 +93,14 @@ func (c *statClient) ListStats(patterns ...string) (stats []string, err error) {
l := C.govpp_stat_segment_vec_len(unsafe.Pointer(dir))
for i := 0; i < int(l); i++ {
nameChar := C.govpp_stat_segment_dir_index_to_name(dir, C.uint32_t(i))
- stats = append(stats, C.GoString(nameChar))
+ indexes = append(indexes, adapter.StatIdentifier{
+ Name: []byte(C.GoString(nameChar)),
+ Index: uint32(i),
+ })
C.free(unsafe.Pointer(nameChar))
}
- return stats, nil
+ return indexes, nil
}
func (c *statClient) DumpStats(patterns ...string) (stats []adapter.StatEntry, err error) {
@@ -121,7 +124,10 @@ func (c *statClient) DumpStats(patterns ...string) (stats []adapter.StatEntry, e
typ := adapter.StatType(C.govpp_stat_segment_data_type(&v))
stat := adapter.StatEntry{
- Name: []byte(name),
+ StatIdentifier: adapter.StatIdentifier{
+ Name: []byte(name),
+ Index: uint32(i),
+ },
Type: typ,
}
@@ -130,7 +136,7 @@ func (c *statClient) DumpStats(patterns ...string) (stats []adapter.StatEntry, e
stat.Data = adapter.ScalarStat(C.govpp_stat_segment_data_get_scalar_value(&v))
case adapter.ErrorIndex:
- stat.Data = adapter.ErrorStat(C.govpp_stat_segment_data_get_error_value(&v))
+ stat.Data = adapter.ErrorStat([]adapter.Counter{adapter.Counter(C.govpp_stat_segment_data_get_error_value(&v))})
case adapter.SimpleCounterVector:
length := int(C.govpp_stat_segment_vec_len(unsafe.Pointer(C.govpp_stat_segment_data_get_simple_counter(&v))))
@@ -147,10 +153,10 @@ func (c *statClient) DumpStats(patterns ...string) (stats []adapter.StatEntry, e
vector := make([][]adapter.CombinedCounter, length)
for k := 0; k < length; k++ {
for j := 0; j < int(C.govpp_stat_segment_vec_len(unsafe.Pointer(C.govpp_stat_segment_data_get_combined_counter_index(&v, C.int(k))))); j++ {
- vector[k] = append(vector[k], adapter.CombinedCounter([2]uint64{
+ vector[k] = append(vector[k], [2]uint64{
uint64(C.govpp_stat_segment_data_get_combined_counter_index_packets(&v, C.int(k), C.int(j))),
uint64(C.govpp_stat_segment_data_get_combined_counter_index_bytes(&v, C.int(k), C.int(j))),
- }))
+ })
}
}
stat.Data = adapter.CombinedCounterStat(vector)
@@ -180,11 +186,15 @@ func (c *statClient) DumpStats(patterns ...string) (stats []adapter.StatEntry, e
return stats, nil
}
-func (c *statClient) PrepareDir(prefixes ...string) (*adapter.StatDir, error) {
+func (c *statClient) PrepareDir(_ ...string) (*adapter.StatDir, error) {
+ return nil, adapter.ErrNotImplemented
+}
+
+func (c *statClient) PrepareDirOnIndex(_ ...uint32) (*adapter.StatDir, error) {
return nil, adapter.ErrNotImplemented
}
-func (c *statClient) UpdateDir(dir *adapter.StatDir) error {
+func (c *statClient) UpdateDir(_ *adapter.StatDir) error {
return adapter.ErrNotImplemented
}
diff --git a/adapter/vppapiclient/stat_client_stub.go b/adapter/vppapiclient/stat_client_stub.go
index c764391..3aa9cc5 100644
--- a/adapter/vppapiclient/stat_client_stub.go
+++ b/adapter/vppapiclient/stat_client_stub.go
@@ -17,7 +17,7 @@
package vppapiclient
import (
- "git.fd.io/govpp.git/adapter"
+ "go.fd.io/govpp/adapter"
)
// stubStatClient is just an stub adapter that does nothing. It builds only on Windows and OSX, where the real
@@ -36,7 +36,7 @@ func (*stubStatClient) Disconnect() error {
return nil
}
-func (*stubStatClient) ListStats(patterns ...string) (statNames []string, err error) {
+func (*stubStatClient) ListStats(patterns ...string) (indexes []adapter.StatIdentifier, err error) {
return nil, adapter.ErrNotImplemented
}
@@ -48,6 +48,10 @@ func (*stubStatClient) PrepareDir(prefixes ...string) (*adapter.StatDir, error)
return nil, adapter.ErrNotImplemented
}
+func (*stubStatClient) PrepareDirOnIndex(indexes ...uint32) (*adapter.StatDir, error) {
+ return nil, adapter.ErrNotImplemented
+}
+
func (*stubStatClient) UpdateDir(dir *adapter.StatDir) error {
return adapter.ErrNotImplemented
}
diff --git a/adapter/vppapiclient/vppapiclient.go b/adapter/vppapiclient/vppapiclient.go
index 977f32d..3ab460d 100644
--- a/adapter/vppapiclient/vppapiclient.go
+++ b/adapter/vppapiclient/vppapiclient.go
@@ -29,12 +29,13 @@ import (
"os"
"path/filepath"
"reflect"
+ "sync/atomic"
"time"
"unsafe"
"github.com/fsnotify/fsnotify"
- "git.fd.io/govpp.git/adapter"
+ "go.fd.io/govpp/adapter"
)
var (
@@ -52,7 +53,7 @@ const (
// global VPP binary API client, library vppapiclient only supports
// single connection at a time
-var globalVppClient *vppClient
+var globalVppClient unsafe.Pointer
// stubVppClient is the default implementation of the VppAPI.
type vppClient struct {
@@ -76,7 +77,8 @@ func NewVppClientWithInputQueueSize(shmPrefix string, inputQueueSize uint16) ada
// Connect connects the process to VPP.
func (a *vppClient) Connect() error {
- if globalVppClient != nil {
+ h := (*vppClient)(atomic.LoadPointer(&globalVppClient))
+ if h != nil {
return fmt.Errorf("already connected to binary API, disconnect first")
}
@@ -92,19 +94,17 @@ func (a *vppClient) Connect() error {
return fmt.Errorf("connecting to VPP binary API failed (rc=%v)", rc)
}
- globalVppClient = a
+ atomic.StorePointer(&globalVppClient, unsafe.Pointer(a))
return nil
}
// Disconnect disconnects the process from VPP.
func (a *vppClient) Disconnect() error {
- globalVppClient = nil
-
+ atomic.StorePointer(&globalVppClient, nil)
rc := C.govpp_disconnect()
if rc != 0 {
return fmt.Errorf("disconnecting from VPP binary API failed (rc=%v)", rc)
}
-
return nil
}
@@ -187,9 +187,12 @@ func (a *vppClient) WaitReady() error {
//export go_msg_callback
func go_msg_callback(msgID C.uint16_t, data unsafe.Pointer, size C.size_t) {
+ h := (*vppClient)(atomic.LoadPointer(&globalVppClient))
+ if h == nil {
+ return
+ }
// convert unsafe.Pointer to byte slice
sliceHeader := &reflect.SliceHeader{Data: uintptr(data), Len: int(size), Cap: int(size)}
byteSlice := *(*[]byte)(unsafe.Pointer(sliceHeader))
-
- globalVppClient.msgCallback(uint16(msgID), byteSlice)
+ h.msgCallback(uint16(msgID), byteSlice)
}
diff --git a/adapter/vppapiclient/vppapiclient_stub.go b/adapter/vppapiclient/vppapiclient_stub.go
index 20ad12b..57b71c3 100644
--- a/adapter/vppapiclient/vppapiclient_stub.go
+++ b/adapter/vppapiclient/vppapiclient_stub.go
@@ -17,7 +17,7 @@
package vppapiclient
import (
- "git.fd.io/govpp.git/adapter"
+ "go.fd.io/govpp/adapter"
)
// stubVppClient is just an stub adapter that does nothing. It builds only on Windows and OSX, where the real