diff options
Diffstat (limited to 'adapter')
-rw-r--r-- | adapter/mock/mock_stats_adapter.go | 15 | ||||
-rw-r--r-- | adapter/mock/mock_vpp_adapter.go | 41 | ||||
-rw-r--r-- | adapter/socketclient/socketclient.go | 125 | ||||
-rw-r--r-- | adapter/stats_api.go | 119 | ||||
-rw-r--r-- | adapter/statsclient/stat_segment.go | 470 | ||||
-rw-r--r-- | adapter/statsclient/stat_segment_api.go | 137 | ||||
-rw-r--r-- | adapter/statsclient/statsclient.go | 598 | ||||
-rw-r--r-- | adapter/statsclient/statseg.go | 104 | ||||
-rw-r--r-- | adapter/statsclient/statseg_v1.go | 365 | ||||
-rw-r--r-- | adapter/statsclient/statseg_v2.go | 411 | ||||
-rw-r--r-- | adapter/vppapiclient/stat_client.go | 30 | ||||
-rw-r--r-- | adapter/vppapiclient/stat_client_stub.go | 8 | ||||
-rw-r--r-- | adapter/vppapiclient/vppapiclient.go | 21 | ||||
-rw-r--r-- | adapter/vppapiclient/vppapiclient_stub.go | 2 |
14 files changed, 1537 insertions, 909 deletions
diff --git a/adapter/mock/mock_stats_adapter.go b/adapter/mock/mock_stats_adapter.go index 55b1831..f2378f3 100644 --- a/adapter/mock/mock_stats_adapter.go +++ b/adapter/mock/mock_stats_adapter.go @@ -18,7 +18,7 @@ package mock import ( - "git.fd.io/govpp.git/adapter" + "go.fd.io/govpp/adapter" ) // implements StatsAPI @@ -46,10 +46,13 @@ func (a *StatsAdapter) Disconnect() error { } // ListStats mocks name listing for all stats. -func (a *StatsAdapter) ListStats(patterns ...string) ([]string, error) { - var statNames []string +func (a *StatsAdapter) ListStats(patterns ...string) ([]adapter.StatIdentifier, error) { + var statNames []adapter.StatIdentifier for _, stat := range a.entries { - statNames = append(statNames, string(stat.Name)) + statNames = append(statNames, adapter.StatIdentifier{ + Name: stat.Name, + Index: stat.Index, + }) } return statNames, nil } @@ -63,6 +66,10 @@ func (a *StatsAdapter) PrepareDir(prefixes ...string) (*adapter.StatDir, error) return a.dir, nil } +func (a *StatsAdapter) PrepareDirOnIndex(indexes ...uint32) (*adapter.StatDir, error) { + return a.dir, nil +} + func (a *StatsAdapter) UpdateDir(dir *adapter.StatDir) error { *dir = *a.dir return nil diff --git a/adapter/mock/mock_vpp_adapter.go b/adapter/mock/mock_vpp_adapter.go index b7fa002..cb37dd2 100644 --- a/adapter/mock/mock_vpp_adapter.go +++ b/adapter/mock/mock_vpp_adapter.go @@ -22,10 +22,10 @@ import ( "reflect" "sync" - "git.fd.io/govpp.git/adapter" - "git.fd.io/govpp.git/adapter/mock/binapi" - "git.fd.io/govpp.git/api" - "git.fd.io/govpp.git/codec" + "go.fd.io/govpp/adapter" + "go.fd.io/govpp/adapter/mock/binapi" + "go.fd.io/govpp/api" + "go.fd.io/govpp/codec" ) type replyMode int @@ -44,7 +44,7 @@ type VppAdapter struct { access sync.RWMutex msgNameToIds map[string]uint16 msgIDsToName map[uint16]string - binAPITypes map[string]reflect.Type + binAPITypes map[string]map[string]reflect.Type repliesLock sync.Mutex // mutex for the queue replies []reply // FIFO queue of messages @@ -126,7 +126,7 @@ func NewVppAdapter() *VppAdapter { msgIDSeq: 1000, msgIDsToName: make(map[uint16]string), msgNameToIds: make(map[string]uint16), - binAPITypes: make(map[string]reflect.Type), + binAPITypes: make(map[string]map[string]reflect.Type), } a.registerBinAPITypes() return a @@ -165,19 +165,25 @@ func (a *VppAdapter) GetMsgNameByID(msgID uint16) (string, bool) { func (a *VppAdapter) registerBinAPITypes() { a.access.Lock() defer a.access.Unlock() - for _, msg := range api.GetRegisteredMessages() { - a.binAPITypes[msg.GetMessageName()] = reflect.TypeOf(msg).Elem() + for pkg, msgs := range api.GetRegisteredMessages() { + msgMap := make(map[string]reflect.Type) + for _, msg := range msgs { + msgMap[msg.GetMessageName()] = reflect.TypeOf(msg).Elem() + } + a.binAPITypes[pkg] = msgMap } } // ReplyTypeFor returns reply message type for given request message name. -func (a *VppAdapter) ReplyTypeFor(requestMsgName string) (reflect.Type, uint16, bool) { +func (a *VppAdapter) ReplyTypeFor(pkg, requestMsgName string) (reflect.Type, uint16, bool) { replyName, foundName := binapi.ReplyNameFor(requestMsgName) if foundName { - if reply, found := a.binAPITypes[replyName]; found { - msgID, err := a.GetMsgID(replyName, "") - if err == nil { - return reply, msgID, found + if messages, found := a.binAPITypes[pkg]; found { + if reply, found := messages[replyName]; found { + msgID, err := a.GetMsgID(replyName, "") + if err == nil { + return reply, msgID, found + } } } } @@ -186,8 +192,8 @@ func (a *VppAdapter) ReplyTypeFor(requestMsgName string) (reflect.Type, uint16, } // ReplyFor returns reply message for given request message name. -func (a *VppAdapter) ReplyFor(requestMsgName string) (api.Message, uint16, bool) { - replType, msgID, foundReplType := a.ReplyTypeFor(requestMsgName) +func (a *VppAdapter) ReplyFor(pkg, requestMsgName string) (api.Message, uint16, bool) { + replType, msgID, foundReplType := a.ReplyTypeFor(pkg, requestMsgName) if foundReplType { msgVal := reflect.New(replType) if msg, ok := msgVal.Interface().(api.Message); ok { @@ -252,7 +258,10 @@ func (a *VppAdapter) GetMsgID(msgName string, msgCrc string) (uint16, error) { // SendMsg emulates sending a binary-encoded message to VPP. func (a *VppAdapter) SendMsg(clientID uint32, data []byte) error { - switch a.mode { + a.repliesLock.Lock() + mode := a.mode + a.repliesLock.Unlock() + switch mode { case useReplyHandlers: for i := len(a.replyHandlers) - 1; i >= 0; i-- { replyHandler := a.replyHandlers[i] diff --git a/adapter/socketclient/socketclient.go b/adapter/socketclient/socketclient.go index 637bd69..c9aa2b4 100644 --- a/adapter/socketclient/socketclient.go +++ b/adapter/socketclient/socketclient.go @@ -30,9 +30,9 @@ import ( "github.com/fsnotify/fsnotify" "github.com/sirupsen/logrus" - "git.fd.io/govpp.git/adapter" - "git.fd.io/govpp.git/binapi/memclnt" - "git.fd.io/govpp.git/codec" + "go.fd.io/govpp/adapter" + "go.fd.io/govpp/binapi/memclnt" + "go.fd.io/govpp/codec" ) const ( @@ -73,29 +73,8 @@ func init() { log = logger.WithField("logger", "govpp/socketclient") } -const socketMissing = ` ------------------------------------------------------------- - No socket file found at: %s - VPP binary API socket file is missing! - - - is VPP running with socket for binapi enabled? - - is the correct socket name configured? - - To enable it add following section to your VPP config: - socksvr { - default - } ------------------------------------------------------------- -` - -var warnOnce sync.Once - -func (c *socketClient) printMissingSocketMsg() { - fmt.Fprintf(os.Stderr, socketMissing, c.sockAddr) -} - -type socketClient struct { - sockAddr string +type Client struct { + socketPath string clientName string conn *net.UnixConn @@ -117,12 +96,14 @@ type socketClient struct { wg sync.WaitGroup } -func NewVppClient(sockAddr string) *socketClient { - if sockAddr == "" { - sockAddr = DefaultSocketName +// NewVppClient returns a new Client using socket. +// If socket is empty string DefaultSocketName is used. +func NewVppClient(socket string) *Client { + if socket == "" { + socket = DefaultSocketName } - return &socketClient{ - sockAddr: sockAddr, + return &Client{ + socketPath: socket, clientName: DefaultClientName, connectTimeout: DefaultConnectTimeout, disconnectTimeout: DefaultDisconnectTimeout, @@ -136,61 +117,34 @@ func NewVppClient(sockAddr string) *socketClient { } // SetClientName sets a client name used for identification. -func (c *socketClient) SetClientName(name string) { +func (c *Client) SetClientName(name string) { c.clientName = name } // SetConnectTimeout sets timeout used during connecting. -func (c *socketClient) SetConnectTimeout(t time.Duration) { +func (c *Client) SetConnectTimeout(t time.Duration) { c.connectTimeout = t } // SetDisconnectTimeout sets timeout used during disconnecting. -func (c *socketClient) SetDisconnectTimeout(t time.Duration) { +func (c *Client) SetDisconnectTimeout(t time.Duration) { c.disconnectTimeout = t } -func (c *socketClient) SetMsgCallback(cb adapter.MsgCallback) { +func (c *Client) SetMsgCallback(cb adapter.MsgCallback) { log.Debug("SetMsgCallback") c.msgCallback = cb } -const legacySocketName = "/run/vpp-api.sock" - -func (c *socketClient) checkLegacySocket() bool { - if c.sockAddr == legacySocketName { - return false - } - log.Debugf("checking legacy socket: %s", legacySocketName) - // check if socket exists - if _, err := os.Stat(c.sockAddr); err == nil { - return false // socket exists - } else if !os.IsNotExist(err) { - return false // some other error occurred - } - // check if legacy socket exists - if _, err := os.Stat(legacySocketName); err == nil { - // legacy socket exists, update sockAddr - c.sockAddr = legacySocketName - return true - } - // no socket socket found - return false -} - // WaitReady checks socket file existence and waits for it if necessary -func (c *socketClient) WaitReady() error { +func (c *Client) WaitReady() error { // check if socket already exists - if _, err := os.Stat(c.sockAddr); err == nil { + if _, err := os.Stat(c.socketPath); err == nil { return nil // socket exists, we are ready } else if !os.IsNotExist(err) { return err // some other error occurred } - if c.checkLegacySocket() { - return nil - } - // socket does not exist, watch for it watcher, err := fsnotify.NewWatcher() if err != nil { @@ -203,7 +157,7 @@ func (c *socketClient) WaitReady() error { }() // start directory watcher - if err := watcher.Add(filepath.Dir(c.sockAddr)); err != nil { + if err := watcher.Add(filepath.Dir(c.socketPath)); err != nil { return err } @@ -211,17 +165,14 @@ func (c *socketClient) WaitReady() error { for { select { case <-timeout.C: - if c.checkLegacySocket() { - return nil - } - return fmt.Errorf("timeout waiting (%s) for socket file: %s", MaxWaitReady, c.sockAddr) + return fmt.Errorf("timeout waiting (%s) for socket file: %s", MaxWaitReady, c.socketPath) case e := <-watcher.Errors: return e case ev := <-watcher.Events: log.Debugf("watcher event: %+v", ev) - if ev.Name == c.sockAddr && (ev.Op&fsnotify.Create) == fsnotify.Create { + if ev.Name == c.socketPath && (ev.Op&fsnotify.Create) == fsnotify.Create { // socket created, we are ready return nil } @@ -229,18 +180,15 @@ func (c *socketClient) WaitReady() error { } } -func (c *socketClient) Connect() error { - c.checkLegacySocket() - +func (c *Client) Connect() error { // check if socket exists - if _, err := os.Stat(c.sockAddr); os.IsNotExist(err) { - warnOnce.Do(c.printMissingSocketMsg) - return fmt.Errorf("VPP API socket file %s does not exist", c.sockAddr) + if _, err := os.Stat(c.socketPath); os.IsNotExist(err) { + return fmt.Errorf("VPP API socket file %s does not exist", c.socketPath) } else if err != nil { return fmt.Errorf("VPP API socket error: %v", err) } - if err := c.connect(c.sockAddr); err != nil { + if err := c.connect(c.socketPath); err != nil { return err } @@ -256,7 +204,7 @@ func (c *socketClient) Connect() error { return nil } -func (c *socketClient) Disconnect() error { +func (c *Client) Disconnect() error { if c.conn == nil { return nil } @@ -273,7 +221,6 @@ func (c *socketClient) Disconnect() error { // Don't bother sending a vl_api_sockclnt_delete_t message, // just close the socket. - if err := c.disconnect(); err != nil { return err } @@ -283,10 +230,10 @@ func (c *socketClient) Disconnect() error { const defaultBufferSize = 4096 -func (c *socketClient) connect(sockAddr string) error { +func (c *Client) connect(sockAddr string) error { addr := &net.UnixAddr{Name: sockAddr, Net: "unix"} - log.Debugf("Connecting to: %v", c.sockAddr) + log.Debugf("Connecting to: %v", c.socketPath) conn, err := net.DialUnix("unix", nil, addr) if err != nil { @@ -311,7 +258,7 @@ func (c *socketClient) connect(sockAddr string) error { return nil } -func (c *socketClient) disconnect() error { +func (c *Client) disconnect() error { log.Debugf("Closing socket") if err := c.conn.Close(); err != nil { log.Debugln("Closing socket failed:", err) @@ -326,7 +273,7 @@ const ( deleteMsgContext = byte(124) ) -func (c *socketClient) open() error { +func (c *Client) open() error { var msgCodec = codec.DefaultCodec // Request socket client create @@ -379,7 +326,7 @@ func (c *socketClient) open() error { return nil } -func (c *socketClient) GetMsgID(msgName string, msgCrc string) (uint16, error) { +func (c *Client) GetMsgID(msgName string, msgCrc string) (uint16, error) { if msgID, ok := c.msgTable[msgName+"_"+msgCrc]; ok { return msgID, nil } @@ -389,7 +336,7 @@ func (c *socketClient) GetMsgID(msgName string, msgCrc string) (uint16, error) { } } -func (c *socketClient) SendMsg(context uint32, data []byte) error { +func (c *Client) SendMsg(context uint32, data []byte) error { if len(data) < 10 { return fmt.Errorf("invalid message data, length must be at least 10 bytes") } @@ -423,7 +370,7 @@ func setMsgRequestHeader(data []byte, clientIndex, context uint32) { binary.BigEndian.PutUint32(data[6:10], context) } -func (c *socketClient) writeMsg(msg []byte) error { +func (c *Client) writeMsg(msg []byte) error { // we lock to prevent mixing multiple message writes c.writeMu.Lock() defer c.writeMu.Unlock() @@ -482,7 +429,7 @@ func writeMsgData(w io.Writer, msg []byte, writerSize int) error { return nil } -func (c *socketClient) readerLoop() { +func (c *Client) readerLoop() { defer c.wg.Done() defer log.Debugf("reader loop done") @@ -528,7 +475,7 @@ func getMsgReplyHeader(msg []byte) (msgID uint16, context uint32) { return } -func (c *socketClient) readMsgTimeout(buf []byte, timeout time.Duration) ([]byte, error) { +func (c *Client) readMsgTimeout(buf []byte, timeout time.Duration) ([]byte, error) { // set read deadline readDeadline := time.Now().Add(timeout) if err := c.conn.SetReadDeadline(readDeadline); err != nil { @@ -549,7 +496,7 @@ func (c *socketClient) readMsgTimeout(buf []byte, timeout time.Duration) ([]byte return msgReply, nil } -func (c *socketClient) readMsg(buf []byte) ([]byte, error) { +func (c *Client) readMsg(buf []byte) ([]byte, error) { log.Debug("reading msg..") header := c.headerPool.Get().([]byte) diff --git a/adapter/stats_api.go b/adapter/stats_api.go index 90dbeb3..7dc7dc3 100644 --- a/adapter/stats_api.go +++ b/adapter/stats_api.go @@ -16,7 +16,6 @@ package adapter import ( "errors" - "fmt" ) const ( @@ -27,6 +26,7 @@ const ( var ( ErrStatsDataBusy = errors.New("stats data busy") ErrStatsDirStale = errors.New("stats dir stale") + ErrStatsDisconnected = errors.New("stats disconnected") ErrStatsAccessFailed = errors.New("stats access failed") ) @@ -37,59 +37,53 @@ type StatsAPI interface { // Disconnect terminates client connection. Disconnect() error - // ListStats lists names for stats matching patterns. - ListStats(patterns ...string) (names []string, err error) + // ListStats lists indexed names for stats matching patterns. + ListStats(patterns ...string) (indexes []StatIdentifier, err error) // DumpStats dumps all stat entries. DumpStats(patterns ...string) (entries []StatEntry, err error) // PrepareDir prepares new stat dir for entries that match any of prefixes. PrepareDir(patterns ...string) (*StatDir, error) + // PrepareDirOnIndex prepares new stat dir for entries that match any of indexes. + PrepareDirOnIndex(indexes ...uint32) (*StatDir, error) // UpdateDir updates stat dir and all of their entries. UpdateDir(dir *StatDir) error } // StatType represents type of stat directory and simply // defines what type of stat data is stored in the stat entry. -type StatType int +type StatType string const ( - _ StatType = 0 - ScalarIndex StatType = 1 - SimpleCounterVector StatType = 2 - CombinedCounterVector StatType = 3 - ErrorIndex StatType = 4 - NameVector StatType = 5 + Unknown StatType = "UnknownStatType" + ScalarIndex StatType = "ScalarIndex" + SimpleCounterVector StatType = "SimpleCounterVector" + CombinedCounterVector StatType = "CombinedCounterVector" + ErrorIndex StatType = "ErrorIndex" + NameVector StatType = "NameVector" + Empty StatType = "Empty" + Symlink StatType = "Symlink" ) -func (d StatType) String() string { - switch d { - case ScalarIndex: - return "ScalarIndex" - case SimpleCounterVector: - return "SimpleCounterVector" - case CombinedCounterVector: - return "CombinedCounterVector" - case ErrorIndex: - return "ErrorIndex" - case NameVector: - return "NameVector" - } - return fmt.Sprintf("UnknownStatType(%d)", d) -} - // StatDir defines directory of stats entries created by PrepareDir. type StatDir struct { Epoch int64 - Indexes []uint32 Entries []StatEntry } +// StatIdentifier holds a stat entry name and index +type StatIdentifier struct { + Index uint32 + Name []byte +} + // StatEntry represents single stat entry. The type of stat stored in Data // is defined by Type. type StatEntry struct { - Name []byte - Type StatType - Data Stat + StatIdentifier + Type StatType + Data Stat + Symlink bool } // Counter represents simple counter with single value, which is usually packet count. @@ -99,11 +93,11 @@ type Counter uint64 type CombinedCounter [2]uint64 func (s CombinedCounter) Packets() uint64 { - return uint64(s[0]) + return s[0] } func (s CombinedCounter) Bytes() uint64 { - return uint64(s[1]) + return s[1] } // Name represents string value stored under name vector. @@ -118,6 +112,9 @@ type Stat interface { // IsZero returns true if all of its values equal to zero. IsZero() bool + // Type returns underlying type of a stat + Type() StatType + // isStat is intentionally unexported to limit implementations of interface to this package, isStat() } @@ -125,16 +122,16 @@ type Stat interface { // ScalarStat represents stat for ScalarIndex. type ScalarStat float64 -// ErrorStat represents stat for ErrorIndex. -type ErrorStat Counter +// ErrorStat represents stat for ErrorIndex. The array represents workers. +type ErrorStat []Counter -// SimpleCounterStat represents stat for SimpleCounterVector. +// SimpleCounterStat represents indexed stat for SimpleCounterVector. // The outer array represents workers and the inner array represents interface/node/.. indexes. // Values should be aggregated per interface/node for every worker. // ReduceSimpleCounterStatIndex can be used to reduce specific index. type SimpleCounterStat [][]Counter -// CombinedCounterStat represents stat for CombinedCounterVector. +// CombinedCounterStat represents indexed stat for CombinedCounterVector. // The outer array represents workers and the inner array represents interface/node/.. indexes. // Values should be aggregated per interface/node for every worker. // ReduceCombinedCounterStatIndex can be used to reduce specific index. @@ -143,18 +140,40 @@ type CombinedCounterStat [][]CombinedCounter // NameStat represents stat for NameVector. type NameStat []Name +// EmptyStat represents removed counter directory +type EmptyStat string + func (ScalarStat) isStat() {} func (ErrorStat) isStat() {} func (SimpleCounterStat) isStat() {} func (CombinedCounterStat) isStat() {} func (NameStat) isStat() {} +func (EmptyStat) isStat() {} func (s ScalarStat) IsZero() bool { return s == 0 } + +func (s ScalarStat) Type() StatType { + return ScalarIndex +} + func (s ErrorStat) IsZero() bool { - return s == 0 + if s == nil { + return true + } + for _, ss := range s { + if ss != 0 { + return false + } + } + return true +} + +func (s ErrorStat) Type() StatType { + return ErrorIndex } + func (s SimpleCounterStat) IsZero() bool { if s == nil { return true @@ -168,6 +187,11 @@ func (s SimpleCounterStat) IsZero() bool { } return true } + +func (s SimpleCounterStat) Type() StatType { + return SimpleCounterVector +} + func (s CombinedCounterStat) IsZero() bool { if s == nil { return true @@ -184,6 +208,11 @@ func (s CombinedCounterStat) IsZero() bool { } return true } + +func (s CombinedCounterStat) Type() StatType { + return CombinedCounterVector +} + func (s NameStat) IsZero() bool { if s == nil { return true @@ -196,6 +225,18 @@ func (s NameStat) IsZero() bool { return true } +func (s NameStat) Type() StatType { + return NameVector +} + +func (s EmptyStat) IsZero() bool { + return true +} + +func (s EmptyStat) Type() StatType { + return Empty +} + // ReduceSimpleCounterStatIndex returns reduced SimpleCounterStat s for index i. func ReduceSimpleCounterStatIndex(s SimpleCounterStat, i int) uint64 { var val uint64 @@ -209,8 +250,8 @@ func ReduceSimpleCounterStatIndex(s SimpleCounterStat, i int) uint64 { func ReduceCombinedCounterStatIndex(s CombinedCounterStat, i int) [2]uint64 { var val [2]uint64 for _, w := range s { - val[0] += uint64(w[i][0]) - val[1] += uint64(w[i][1]) + val[0] += w[i][0] + val[1] += w[i][1] } return val } diff --git a/adapter/statsclient/stat_segment.go b/adapter/statsclient/stat_segment.go deleted file mode 100644 index 2a97fd4..0000000 --- a/adapter/statsclient/stat_segment.go +++ /dev/null @@ -1,470 +0,0 @@ -// Copyright (c) 2019 Cisco and/or its affiliates. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package statsclient - -import ( - "fmt" - "net" - "syscall" - "unsafe" - - "github.com/ftrvxmtrx/fd" - - "git.fd.io/govpp.git/adapter" -) - -var ( - ErrStatDataLenIncorrect = fmt.Errorf("stat data length incorrect") -) - -const ( - minVersion = 0 - maxVersion = 1 -) - -func checkVersion(ver uint64) error { - if ver < minVersion { - return fmt.Errorf("stat segment version is too old: %v (minimal version: %v)", ver, minVersion) - } else if ver > maxVersion { - return fmt.Errorf("stat segment version is not supported: %v (minimal version: %v)", ver, maxVersion) - } - return nil -} - -type statSegment struct { - sharedHeader []byte - memorySize int64 - - // legacyVersion represents stat segment version 0 - // and is used as fallback for VPP 19.04 - legacyVersion bool -} - -func (c *statSegment) getHeader() (header sharedHeader) { - if c.legacyVersion { - return loadSharedHeaderLegacy(c.sharedHeader) - } - return loadSharedHeader(c.sharedHeader) -} - -func (c *statSegment) getEpoch() (int64, bool) { - h := c.getHeader() - return h.epoch, h.inProgress != 0 -} - -func (c *statSegment) getOffsets() (dir, err, stat int64) { - h := c.getHeader() - return h.directoryOffset, h.errorOffset, h.statsOffset -} - -func (c *statSegment) connect(sockName string) error { - if c.sharedHeader != nil { - return fmt.Errorf("already connected") - } - - addr := net.UnixAddr{ - Net: "unixpacket", - Name: sockName, - } - Log.Debugf("connecting to: %v", addr) - - conn, err := net.DialUnix(addr.Net, nil, &addr) - if err != nil { - Log.Warnf("connecting to socket %s failed: %s", addr, err) - return err - } - defer func() { - if err := conn.Close(); err != nil { - Log.Warnf("closing socket failed: %v", err) - } - }() - - Log.Debugf("connected to socket") - - files, err := fd.Get(conn, 1, nil) - if err != nil { - return fmt.Errorf("getting file descriptor over socket failed: %v", err) - } - if len(files) == 0 { - return fmt.Errorf("no files received over socket") - } - - file := files[0] - defer func() { - if err := file.Close(); err != nil { - Log.Warnf("closing file failed: %v", err) - } - }() - - info, err := file.Stat() - if err != nil { - return err - } - size := info.Size() - - data, err := syscall.Mmap(int(file.Fd()), 0, int(size), syscall.PROT_READ, syscall.MAP_SHARED) - if err != nil { - Log.Debugf("mapping shared memory failed: %v", err) - return fmt.Errorf("mapping shared memory failed: %v", err) - } - - Log.Debugf("successfuly mmapped shared memory segment (size: %v) %v", size, len(data)) - - c.sharedHeader = data - c.memorySize = size - - hdr := loadSharedHeader(c.sharedHeader) - Log.Debugf("stat segment header: %+v", hdr) - - if hdr.legacyVersion() { - c.legacyVersion = true - hdr = loadSharedHeaderLegacy(c.sharedHeader) - Log.Debugf("falling back to legacy version (VPP <=19.04) of stat segment (header: %+v)", hdr) - } - - if err := checkVersion(hdr.version); err != nil { - return err - } - - return nil -} - -func (c *statSegment) disconnect() error { - if c.sharedHeader == nil { - return nil - } - - if err := syscall.Munmap(c.sharedHeader); err != nil { - Log.Debugf("unmapping shared memory failed: %v", err) - return fmt.Errorf("unmapping shared memory failed: %v", err) - } - c.sharedHeader = nil - - Log.Debugf("successfuly unmapped shared memory") - return nil -} - -type statDirectoryType int32 - -const ( - statDirIllegal = 0 - statDirScalarIndex = 1 - statDirCounterVectorSimple = 2 - statDirCounterVectorCombined = 3 - statDirErrorIndex = 4 - statDirNameVector = 5 - statDirEmpty = 6 -) - -func (t statDirectoryType) String() string { - return adapter.StatType(t).String() -} - -type statSegDirectoryEntry struct { - directoryType statDirectoryType - // unionData can represent: - // - offset - // - index - // - value - unionData uint64 - offsetVector uint64 - name [128]byte -} - -func (c *statSegment) getStatDirVector() unsafe.Pointer { - dirOffset, _, _ := c.getOffsets() - return unsafe.Pointer(&c.sharedHeader[dirOffset]) -} - -func (c *statSegment) getStatDirIndex(p unsafe.Pointer, index uint32) *statSegDirectoryEntry { - return (*statSegDirectoryEntry)(unsafe.Pointer(uintptr(p) + uintptr(index)*unsafe.Sizeof(statSegDirectoryEntry{}))) -} - -func (c *statSegment) copyEntryData(dirEntry *statSegDirectoryEntry) adapter.Stat { - dirType := adapter.StatType(dirEntry.directoryType) - - switch dirType { - case statDirScalarIndex: - return adapter.ScalarStat(dirEntry.unionData) - - case statDirErrorIndex: - if dirEntry.unionData == 0 { - debugf("offset invalid for %s", dirEntry.name) - break - } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) { - debugf("offset out of range for %s", dirEntry.name) - break - } - - _, errOffset, _ := c.getOffsets() - offsetVector := unsafe.Pointer(&c.sharedHeader[errOffset]) - - var errData adapter.Counter - if c.legacyVersion { - // error were not vector (per-worker) in VPP 19.04 - offset := uintptr(dirEntry.unionData) * unsafe.Sizeof(uint64(0)) - val := *(*adapter.Counter)(statSegPointer(offsetVector, offset)) - errData = val - } else { - vecLen := uint32(vectorLen(offsetVector)) - - for i := uint32(0); i < vecLen; i++ { - cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) - offset := uintptr(cb) + uintptr(dirEntry.unionData)*unsafe.Sizeof(adapter.Counter(0)) - debugf("error index, cb: %d, offset: %d", cb, offset) - val := *(*adapter.Counter)(statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), offset)) - errData += val - } - } - return adapter.ErrorStat(errData) - - case statDirCounterVectorSimple: - if dirEntry.unionData == 0 { - debugf("offset invalid for %s", dirEntry.name) - break - } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) { - debugf("offset out of range for %s", dirEntry.name) - break - } - - vecLen := uint32(vectorLen(unsafe.Pointer(&c.sharedHeader[dirEntry.unionData]))) - offsetVector := statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector)) - - data := make([][]adapter.Counter, vecLen) - for i := uint32(0); i < vecLen; i++ { - cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) - counterVec := unsafe.Pointer(&c.sharedHeader[uintptr(cb)]) - vecLen2 := uint32(vectorLen(counterVec)) - data[i] = make([]adapter.Counter, vecLen2) - for j := uint32(0); j < vecLen2; j++ { - offset := uintptr(j) * unsafe.Sizeof(adapter.Counter(0)) - val := *(*adapter.Counter)(statSegPointer(counterVec, offset)) - data[i][j] = val - } - } - return adapter.SimpleCounterStat(data) - - case statDirCounterVectorCombined: - if dirEntry.unionData == 0 { - debugf("offset invalid for %s", dirEntry.name) - break - } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) { - debugf("offset out of range for %s", dirEntry.name) - break - } - - vecLen := uint32(vectorLen(unsafe.Pointer(&c.sharedHeader[dirEntry.unionData]))) - offsetVector := statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector)) - - data := make([][]adapter.CombinedCounter, vecLen) - for i := uint32(0); i < vecLen; i++ { - cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) - counterVec := unsafe.Pointer(&c.sharedHeader[uintptr(cb)]) - vecLen2 := uint32(vectorLen(counterVec)) - data[i] = make([]adapter.CombinedCounter, vecLen2) - for j := uint32(0); j < vecLen2; j++ { - offset := uintptr(j) * unsafe.Sizeof(adapter.CombinedCounter{}) - val := *(*adapter.CombinedCounter)(statSegPointer(counterVec, offset)) - data[i][j] = val - } - } - return adapter.CombinedCounterStat(data) - - case statDirNameVector: - if dirEntry.unionData == 0 { - debugf("offset invalid for %s", dirEntry.name) - break - } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) { - debugf("offset out of range for %s", dirEntry.name) - break - } - - vecLen := uint32(vectorLen(unsafe.Pointer(&c.sharedHeader[dirEntry.unionData]))) - offsetVector := statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector)) - - data := make([]adapter.Name, vecLen) - for i := uint32(0); i < vecLen; i++ { - cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) - if cb == 0 { - debugf("name vector out of range for %s (%v)", dirEntry.name, i) - continue - } - nameVec := unsafe.Pointer(&c.sharedHeader[cb]) - vecLen2 := uint32(vectorLen(nameVec)) - - nameStr := make([]byte, 0, vecLen2) - for j := uint32(0); j < vecLen2; j++ { - offset := uintptr(j) * unsafe.Sizeof(byte(0)) - val := *(*byte)(statSegPointer(nameVec, offset)) - if val > 0 { - nameStr = append(nameStr, val) - } - } - data[i] = adapter.Name(nameStr) - } - return adapter.NameStat(data) - - case statDirEmpty: - // no-op - - default: - // TODO: monitor occurrences with metrics - debugf("Unknown type %d for stat entry: %q", dirEntry.directoryType, dirEntry.name) - } - return nil -} - -func (c *statSegment) updateEntryData(dirEntry *statSegDirectoryEntry, stat *adapter.Stat) error { - switch (*stat).(type) { - case adapter.ScalarStat: - *stat = adapter.ScalarStat(dirEntry.unionData) - - case adapter.ErrorStat: - if dirEntry.unionData == 0 { - debugf("offset invalid for %s", dirEntry.name) - break - } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) { - debugf("offset out of range for %s", dirEntry.name) - break - } - - _, errOffset, _ := c.getOffsets() - offsetVector := unsafe.Pointer(&c.sharedHeader[errOffset]) - - var errData adapter.Counter - if c.legacyVersion { - // error were not vector (per-worker) in VPP 19.04 - offset := uintptr(dirEntry.unionData) * unsafe.Sizeof(uint64(0)) - val := *(*adapter.Counter)(statSegPointer(offsetVector, offset)) - errData = val - } else { - vecLen := uint32(vectorLen(unsafe.Pointer(&c.sharedHeader[errOffset]))) - - for i := uint32(0); i < vecLen; i++ { - cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) - offset := uintptr(cb) + uintptr(dirEntry.unionData)*unsafe.Sizeof(adapter.Counter(0)) - val := *(*adapter.Counter)(statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), offset)) - errData += val - } - } - *stat = adapter.ErrorStat(errData) - - case adapter.SimpleCounterStat: - if dirEntry.unionData == 0 { - debugf("offset invalid for %s", dirEntry.name) - break - } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) { - debugf("offset out of range for %s", dirEntry.name) - break - } - - vecLen := uint32(vectorLen(unsafe.Pointer(&c.sharedHeader[dirEntry.unionData]))) - offsetVector := statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector)) - - data := (*stat).(adapter.SimpleCounterStat) - if uint32(len(data)) != vecLen { - return ErrStatDataLenIncorrect - } - for i := uint32(0); i < vecLen; i++ { - cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) - counterVec := unsafe.Pointer(&c.sharedHeader[uintptr(cb)]) - vecLen2 := uint32(vectorLen(counterVec)) - simpData := data[i] - if uint32(len(simpData)) != vecLen2 { - return ErrStatDataLenIncorrect - } - for j := uint32(0); j < vecLen2; j++ { - offset := uintptr(j) * unsafe.Sizeof(adapter.Counter(0)) - val := *(*adapter.Counter)(statSegPointer(counterVec, offset)) - simpData[j] = val - } - } - - case adapter.CombinedCounterStat: - if dirEntry.unionData == 0 { - debugf("offset invalid for %s", dirEntry.name) - break - } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) { - debugf("offset out of range for %s", dirEntry.name) - break - } - - vecLen := uint32(vectorLen(unsafe.Pointer(&c.sharedHeader[dirEntry.unionData]))) - offsetVector := statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector)) - - data := (*stat).(adapter.CombinedCounterStat) - if uint32(len(data)) != vecLen { - return ErrStatDataLenIncorrect - } - for i := uint32(0); i < vecLen; i++ { - cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) - counterVec := unsafe.Pointer(&c.sharedHeader[uintptr(cb)]) - vecLen2 := uint32(vectorLen(counterVec)) - combData := data[i] - if uint32(len(combData)) != vecLen2 { - return ErrStatDataLenIncorrect - } - for j := uint32(0); j < vecLen2; j++ { - offset := uintptr(j) * unsafe.Sizeof(adapter.CombinedCounter{}) - val := *(*adapter.CombinedCounter)(statSegPointer(counterVec, offset)) - combData[j] = val - } - } - - case adapter.NameStat: - if dirEntry.unionData == 0 { - debugf("offset invalid for %s", dirEntry.name) - break - } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) { - debugf("offset out of range for %s", dirEntry.name) - break - } - - vecLen := uint32(vectorLen(unsafe.Pointer(&c.sharedHeader[dirEntry.unionData]))) - offsetVector := statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector)) - - data := (*stat).(adapter.NameStat) - if uint32(len(data)) != vecLen { - return ErrStatDataLenIncorrect - } - for i := uint32(0); i < vecLen; i++ { - cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) - if cb == 0 { - continue - } - nameVec := unsafe.Pointer(&c.sharedHeader[cb]) - vecLen2 := uint32(vectorLen(nameVec)) - - nameData := data[i] - if uint32(len(nameData))+1 != vecLen2 { - return ErrStatDataLenIncorrect - } - for j := uint32(0); j < vecLen2; j++ { - offset := uintptr(j) * unsafe.Sizeof(byte(0)) - val := *(*byte)(statSegPointer(nameVec, offset)) - if val == 0 { - break - } - nameData[j] = val - } - } - - default: - if Debug { - Log.Debugf("Unrecognized stat type %T for stat entry: %v", stat, dirEntry.name) - } - } - return nil -} diff --git a/adapter/statsclient/stat_segment_api.go b/adapter/statsclient/stat_segment_api.go new file mode 100644 index 0000000..2c2950f --- /dev/null +++ b/adapter/statsclient/stat_segment_api.go @@ -0,0 +1,137 @@ +// Copyright (c) 2020 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package statsclient + +import ( + "fmt" + "sync/atomic" + "time" + "unsafe" + + "go.fd.io/govpp/adapter" +) + +var ( + // ErrStatDataLenIncorrect is returned when stat data does not match vector + // length of a respective data directory + ErrStatDataLenIncorrect = fmt.Errorf("stat data length incorrect") +) + +var ( + MaxWaitInProgress = time.Millisecond * 100 + CheckDelayInProgress = time.Microsecond * 10 +) + +const ( + minVersion = 1 + maxVersion = 2 +) + +var dirTypeMapping = map[dirType]adapter.StatType{ + 1: adapter.ScalarIndex, + 2: adapter.SimpleCounterVector, + 3: adapter.CombinedCounterVector, + 4: adapter.NameVector, + 5: adapter.Empty, + 6: adapter.Symlink, +} + +var dirTypeMappingLegacy = map[dirType]adapter.StatType{ + 1: adapter.ScalarIndex, + 2: adapter.SimpleCounterVector, + 3: adapter.CombinedCounterVector, + 4: adapter.ErrorIndex, + 5: adapter.NameVector, + 6: adapter.Empty, + 7: adapter.Symlink, +} + +type ( + dirVector unsafe.Pointer + dirSegment unsafe.Pointer + dirName []byte + dirType int32 +) + +// statSegment represents common API for every stats API version +type statSegment interface { + // GetDirectoryVector returns pointer to memory where the beginning + // of the data directory is located. + GetDirectoryVector() dirVector + + // GetStatDirOnIndex accepts directory vector and particular index. + // Returns pointer to the beginning of the segment. Also the directory + // name as [128]byte and the directory type is returned for easy use + // without needing to know the exact segment version. + // + // Note that if the index is equal to 0, the result pointer points to + // the same memory address as the argument. + GetStatDirOnIndex(v dirVector, index uint32) (dirSegment, dirName, adapter.StatType) + + // GetEpoch re-loads stats header and returns current epoch + //and 'inProgress' value + GetEpoch() (int64, bool) + + // CopyEntryData accepts pointer to a directory segment and returns adapter.Stat + // based on directory type populated with data. The index is an optional parameter + // (used by symlinks) returning stats for item on the given index only. + // Use ^uint32(0) as an empty index (since 0 is a valid value). + CopyEntryData(segment dirSegment, index uint32) adapter.Stat + + // UpdateEntryData accepts pointer to a directory segment with data, and stat + // segment to update + UpdateEntryData(segment dirSegment, s *adapter.Stat) error +} + +// vecHeader represents a vector header +type vecHeader struct { + length uint64 + vectorData [0]uint8 +} + +func getVersion(data []byte) uint64 { + type apiVersion struct { + value uint64 + } + header := (*apiVersion)(unsafe.Pointer(&data[0])) + version := &apiVersion{ + value: atomic.LoadUint64(&header.value), + } + debugf("stats API version loaded: %d", version.value) + return version.value +} + +func vectorLen(v dirVector) dirVector { + vec := *(*vecHeader)(unsafe.Pointer(uintptr(v) - unsafe.Sizeof(uint64(0)))) + return dirVector(&vec.length) +} + +func getStatType(dirTypeNum dirType, useLegacyMapping bool) (dirTyp adapter.StatType) { + var exists bool + if useLegacyMapping { + dirTyp, exists = dirTypeMappingLegacy[dirTypeNum] + } else { + dirTyp, exists = dirTypeMapping[dirTypeNum] + } + if exists { + return dirTyp + } + return adapter.Unknown +} + +//go:nosplit +func statSegPointer(v dirVector, offset uintptr) dirVector { + return dirVector(uintptr(v) + offset) +} diff --git a/adapter/statsclient/statsclient.go b/adapter/statsclient/statsclient.go index 9110275..dd84897 100644 --- a/adapter/statsclient/statsclient.go +++ b/adapter/statsclient/statsclient.go @@ -18,32 +18,31 @@ package statsclient import ( "bytes" "fmt" + "net" "os" + "path/filepath" "regexp" + "sync/atomic" + "syscall" + "time" + "go.fd.io/govpp/adapter" + "github.com/fsnotify/fsnotify" + "github.com/ftrvxmtrx/fd" logger "github.com/sirupsen/logrus" - - "git.fd.io/govpp.git/adapter" ) const ( // DefaultSocketName is default VPP stats socket file path. DefaultSocketName = adapter.DefaultStatsSocket -) - -const socketMissing = ` ------------------------------------------------------------- - VPP stats socket file %s is missing! - - is VPP running with stats segment enabled? - - is the correct socket name configured? + // DefaultSocketRetryPeriod is the time period after the socket availability + // will be re-checked + DefaultSocketRetryPeriod = 50 * time.Millisecond - To enable it add following section to your VPP config: - statseg { - default - } ------------------------------------------------------------- -` + // DefaultSocketRetryTimeout is the maximum time for the stats socket + DefaultSocketRetryTimeout = 3 * time.Second +) var ( // Debug is global variable that determines debug mode @@ -73,184 +72,481 @@ var _ adapter.StatsAPI = (*StatsClient)(nil) // StatsClient is the pure Go implementation for VPP stats API. type StatsClient struct { - sockAddr string + socket string + retryPeriod time.Duration + retryTimeout time.Duration + + headerData []byte + + // defines the adapter connection state + connected uint32 + + // to quit socket monitor + done chan struct{} statSegment } -// NewStatsClient returns new VPP stats API client. -func NewStatsClient(sockAddr string) *StatsClient { - if sockAddr == "" { - sockAddr = DefaultSocketName +// Option is a StatsClient option +type Option func(*StatsClient) + +// SetSocketRetryPeriod is and optional parameter to define a custom +// retry period while waiting for the VPP socket +func SetSocketRetryPeriod(t time.Duration) Option { + return func(c *StatsClient) { + c.retryPeriod = t } - return &StatsClient{ - sockAddr: sockAddr, +} + +// SetSocketRetryTimeout is and optional parameter to define a custom +// timeout while waiting for the VPP socket +func SetSocketRetryTimeout(t time.Duration) Option { + return func(c *StatsClient) { + c.retryTimeout = t } } -func (c *StatsClient) Connect() error { - // check if socket exists - if _, err := os.Stat(c.sockAddr); os.IsNotExist(err) { - fmt.Fprintf(os.Stderr, socketMissing, c.sockAddr) - return fmt.Errorf("stats socket file %s does not exist", c.sockAddr) - } else if err != nil { - return fmt.Errorf("stats socket error: %v", err) +// NewStatsClient returns a new StatsClient using socket. +// If socket is empty string DefaultSocketName is used. +func NewStatsClient(socket string, options ...Option) *StatsClient { + if socket == "" { + socket = DefaultSocketName + } + s := &StatsClient{ + socket: socket, } + for _, option := range options { + option(s) + } + if s.retryPeriod == 0 { + s.retryPeriod = DefaultSocketRetryPeriod + } + if s.retryTimeout == 0 { + s.retryTimeout = DefaultSocketRetryTimeout + } + return s +} - if err := c.statSegment.connect(c.sockAddr); err != nil { +// Connect to validated VPP stats socket and start monitoring +// socket file changes +func (sc *StatsClient) Connect() (err error) { + if err := sc.waitForSocket(); err != nil { return err } - + sc.done = make(chan struct{}) + if sc.statSegment, err = sc.connect(); err != nil { + return err + } + sc.monitorSocket() return nil } -func (c *StatsClient) Disconnect() error { - if err := c.statSegment.disconnect(); err != nil { - return err +// Disconnect from the socket, unmap shared memory and terminate +// socket monitor +func (sc *StatsClient) Disconnect() error { + if sc.headerData == nil { + return nil + } + if err := syscall.Munmap(sc.headerData); err != nil { + Log.Debugf("unmapping shared memory failed: %v", err) + return fmt.Errorf("unmapping shared memory failed: %v", err) } + sc.headerData = nil + sc.done <- struct{}{} + + Log.Debugf("successfully unmapped shared memory") return nil } -func (c *StatsClient) ListStats(patterns ...string) (names []string, err error) { - sa := c.accessStart() - if sa.epoch == 0 { +func (sc *StatsClient) ListStats(patterns ...string) (entries []adapter.StatIdentifier, err error) { + if !sc.isConnected() { + return nil, adapter.ErrStatsDisconnected + } + accessEpoch := sc.accessStart() + if accessEpoch == 0 { return nil, adapter.ErrStatsAccessFailed } - indexes, err := c.listIndexes(patterns...) + entries, err = sc.getIdentifierEntries(patterns...) if err != nil { return nil, err } - dirVector := c.getStatDirVector() - vecLen := uint32(vectorLen(dirVector)) - - for _, index := range indexes { - if index >= vecLen { - return nil, fmt.Errorf("stat entry index %d out of dir vector len (%d)", index, vecLen) - } - - dirEntry := c.getStatDirIndex(dirVector, index) - var name []byte - for n := 0; n < len(dirEntry.name); n++ { - if dirEntry.name[n] == 0 { - name = dirEntry.name[:n] - break - } - } - names = append(names, string(name)) - } - - if !c.accessEnd(&sa) { + if !sc.accessEnd(accessEpoch) { return nil, adapter.ErrStatsDataBusy } - - return names, nil + return entries, nil } -func (c *StatsClient) DumpStats(patterns ...string) (entries []adapter.StatEntry, err error) { - sa := c.accessStart() - if sa.epoch == 0 { +func (sc *StatsClient) DumpStats(patterns ...string) (entries []adapter.StatEntry, err error) { + if !sc.isConnected() { + return nil, adapter.ErrStatsDisconnected + } + + accessEpoch := sc.accessStart() + if accessEpoch == 0 { return nil, adapter.ErrStatsAccessFailed } - indexes, err := c.listIndexes(patterns...) + entries, err = sc.getStatEntries(patterns...) if err != nil { return nil, err } - if entries, err = c.dumpEntries(indexes); err != nil { - return nil, err - } - if !c.accessEnd(&sa) { + if !sc.accessEnd(accessEpoch) { return nil, adapter.ErrStatsDataBusy } - return entries, nil } -func (c *StatsClient) PrepareDir(patterns ...string) (*adapter.StatDir, error) { - dir := new(adapter.StatDir) +func (sc *StatsClient) PrepareDir(patterns ...string) (*adapter.StatDir, error) { + if !sc.isConnected() { + return nil, adapter.ErrStatsDisconnected + } - sa := c.accessStart() - if sa.epoch == 0 { + accessEpoch := sc.accessStart() + if accessEpoch == 0 { return nil, adapter.ErrStatsAccessFailed } - indexes, err := c.listIndexes(patterns...) + entries, err := sc.getStatEntries(patterns...) if err != nil { return nil, err } - dir.Indexes = indexes - entries, err := c.dumpEntries(indexes) + if !sc.accessEnd(accessEpoch) { + return nil, adapter.ErrStatsDataBusy + } + + dir := &adapter.StatDir{ + Epoch: accessEpoch, + Entries: entries, + } + + return dir, nil +} + +func (sc *StatsClient) PrepareDirOnIndex(indexes ...uint32) (*adapter.StatDir, error) { + if !sc.isConnected() { + return nil, adapter.ErrStatsDisconnected + } + + accessEpoch := sc.accessStart() + if accessEpoch == 0 { + return nil, adapter.ErrStatsAccessFailed + } + vector := sc.GetDirectoryVector() + if vector == nil { + return nil, fmt.Errorf("failed to prepare dir on index: directory vector is nil") + } + entries, err := sc.getStatEntriesOnIndex(vector, indexes...) if err != nil { return nil, err } - dir.Entries = entries - if !c.accessEnd(&sa) { + if !sc.accessEnd(accessEpoch) { return nil, adapter.ErrStatsDataBusy } - dir.Epoch = sa.epoch + + dir := &adapter.StatDir{ + Epoch: accessEpoch, + Entries: entries, + } return dir, nil } -func (c *StatsClient) UpdateDir(dir *adapter.StatDir) (err error) { - epoch, _ := c.getEpoch() +// UpdateDir refreshes directory data for all counters +func (sc *StatsClient) UpdateDir(dir *adapter.StatDir) (err error) { + if !sc.isConnected() { + return adapter.ErrStatsDisconnected + } + epoch, _ := sc.GetEpoch() if dir.Epoch != epoch { return adapter.ErrStatsDirStale } - sa := c.accessStart() - if sa.epoch == 0 { + accessEpoch := sc.accessStart() + if accessEpoch == 0 { return adapter.ErrStatsAccessFailed } + dirVector := sc.GetDirectoryVector() + if dirVector == nil { + return err + } + for i := 0; i < len(dir.Entries); i++ { + if err := sc.updateStatOnIndex(&dir.Entries[i], dirVector); err != nil { + return err + } + } + if !sc.accessEnd(accessEpoch) { + return adapter.ErrStatsDataBusy + } + return nil +} - dirVector := c.getStatDirVector() - - for i, index := range dir.Indexes { - dirEntry := c.getStatDirIndex(dirVector, index) - - var name []byte - for n := 0; n < len(dirEntry.name); n++ { - if dirEntry.name[n] == 0 { - name = dirEntry.name[:n] - break +// checks the socket existence and waits for it for the designated +// time if it is not available immediately +func (sc *StatsClient) waitForSocket() error { + if _, err := os.Stat(sc.socket); err != nil { + if os.IsNotExist(err) { + n := time.Now() + ticker := time.NewTicker(sc.retryPeriod) + timeout := time.After(sc.retryTimeout) + for { + select { + case <-ticker.C: + if _, err := os.Stat(sc.socket); err == nil { + return nil + } + case <-timeout: + return fmt.Errorf("stats socket file %s is not ready within timeout (after %.2f s) ", + sc.socket, time.Since(n).Seconds()) + } } + } else { + return fmt.Errorf("stats socket error: %v", err) } - if len(name) == 0 { - continue - } + } + return nil +} + +// connect to the socket and map it into the memory. According to the +// header version info, an appropriate segment handler is returned +func (sc *StatsClient) connect() (ss statSegment, err error) { + addr := net.UnixAddr{ + Net: "unixpacket", + Name: sc.socket, + } + Log.Debugf("connecting to: %v", addr) - entry := &dir.Entries[i] - if !bytes.Equal(name, entry.Name) { - continue + conn, err := net.DialUnix(addr.Net, nil, &addr) + if err != nil { + Log.Warnf("connecting to socket %s failed: %s", addr, err) + return nil, err + } + defer func() { + if err := conn.Close(); err != nil { + Log.Warnf("closing socket failed: %v", err) } - if adapter.StatType(dirEntry.directoryType) != entry.Type { - continue + }() + Log.Debugf("connected to socket") + + files, err := fd.Get(conn, 1, nil) + if err != nil { + return nil, fmt.Errorf("getting file descriptor over socket failed: %v", err) + } + if len(files) == 0 { + return nil, fmt.Errorf("no files received over socket") + } + + file := files[0] + defer func() { + if err := file.Close(); err != nil { + Log.Warnf("closing file failed: %v", err) } - if entry.Data == nil { - continue + }() + + info, err := file.Stat() + if err != nil { + return nil, err + } + size := info.Size() + + sc.headerData, err = syscall.Mmap(int(file.Fd()), 0, int(size), syscall.PROT_READ, syscall.MAP_SHARED) + if err != nil { + Log.Debugf("mapping shared memory failed: %v", err) + return nil, fmt.Errorf("mapping shared memory failed: %v", err) + } + Log.Debugf("successfully mmapped shared memory segment (size: %v) %v", size, len(sc.headerData)) + + version := getVersion(sc.headerData) + switch version { + case 1: + ss = newStatSegmentV1(sc.headerData, size) + case 2: + ss = newStatSegmentV2(sc.headerData, size) + default: + return nil, fmt.Errorf("stat segment version is not supported: %v (min: %v, max: %v)", + version, minVersion, maxVersion) + } + + // set connected + atomic.CompareAndSwapUint32(&sc.connected, 0, 1) + + return ss, nil +} + +// reconnect disconnects from the socket, re-validates it and +// connects again +func (sc *StatsClient) reconnect() (err error) { + if err = sc.disconnect(); err != nil { + return fmt.Errorf("error disconnecting socket: %v", err) + } + if err = sc.waitForSocket(); err != nil { + return fmt.Errorf("error while waiting on socket: %v", err) + } + if sc.statSegment, err = sc.connect(); err != nil { + return fmt.Errorf("error connecting socket: %v", err) + } + return nil +} + +// disconnect unmaps socket data from the memory and resets the header +func (sc *StatsClient) disconnect() error { + if !atomic.CompareAndSwapUint32(&sc.connected, 1, 0) { + return fmt.Errorf("stats client is already disconnected") + } + if sc.headerData == nil { + return nil + } + if err := syscall.Munmap(sc.headerData); err != nil { + Log.Debugf("unmapping shared memory failed: %v", err) + return fmt.Errorf("unmapping shared memory failed: %v", err) + } + sc.headerData = nil + + Log.Debugf("successfully unmapped shared memory") + return nil +} + +func (sc *StatsClient) monitorSocket() { + watcher, err := fsnotify.NewWatcher() + if err != nil { + Log.Errorf("error starting socket monitor: %v", err) + return + } + + go func() { + for { + select { + case event := <-watcher.Events: + if event.Op == fsnotify.Remove && event.Name == sc.socket { + if err := sc.reconnect(); err != nil { + Log.Errorf("error occurred during socket reconnect: %v", err) + } + } + case err := <-watcher.Errors: + Log.Errorf("socket monitor delivered error event: %v", err) + case <-sc.done: + err := watcher.Close() + Log.Debugf("socket monitor closed (error: %v)", err) + return + } } - if err := c.updateEntryData(dirEntry, &entry.Data); err != nil { - return fmt.Errorf("updating stat data for entry %s failed: %v", name, err) + }() + + if err := watcher.Add(filepath.Dir(sc.socket)); err != nil { + Log.Errorf("failed to add socket address to the watcher: %v", err) + } +} + +// Starts monitoring 'inProgress' field. Returns stats segment +// access epoch when completed, or zero value if not finished +// within MaxWaitInProgress +func (sc *StatsClient) accessStart() (epoch int64) { + t := time.Now() + + epoch, inProg := sc.GetEpoch() + for inProg { + if time.Since(t) > MaxWaitInProgress { + return int64(0) } + time.Sleep(CheckDelayInProgress) + epoch, inProg = sc.GetEpoch() + } + return epoch +} +// AccessEnd returns true if stats data reading was finished, false +// otherwise +func (sc *StatsClient) accessEnd(accessEpoch int64) bool { + epoch, inProgress := sc.GetEpoch() + if accessEpoch != epoch || inProgress { + return false } + return true +} - if !c.accessEnd(&sa) { - return adapter.ErrStatsDataBusy +// getStatEntries retrieves all stats matching desired patterns, or all stats if no pattern is provided. +func (sc *StatsClient) getStatEntries(patterns ...string) (entries []adapter.StatEntry, err error) { + vector := sc.GetDirectoryVector() + if vector == nil { + return nil, fmt.Errorf("failed to get stat entries: directory vector is nil") } + indexes, err := sc.listIndexes(vector, patterns...) + if err != nil { + return nil, err + } + return sc.getStatEntriesOnIndex(vector, indexes...) +} - return nil +// getIdentifierEntries retrieves all identifiers matching desired patterns, or all identifiers +// if no pattern is provided. +func (sc *StatsClient) getIdentifierEntries(patterns ...string) (identifiers []adapter.StatIdentifier, err error) { + vector := sc.GetDirectoryVector() + if vector == nil { + return nil, fmt.Errorf("failed to get identifier entries: directory vector is nil") + } + indexes, err := sc.listIndexes(vector, patterns...) + if err != nil { + return nil, err + } + return sc.getIdentifierEntriesOnIndex(vector, indexes...) +} + +// getStatEntriesOnIndex retrieves stats on indexes, or all stats if indexes are not defined. +func (sc *StatsClient) getStatEntriesOnIndex(vector dirVector, indexes ...uint32) (entries []adapter.StatEntry, err error) { + dirLen := *(*uint32)(vectorLen(vector)) + for _, index := range indexes { + if index >= dirLen { + return nil, fmt.Errorf("stat entry index %d out of dir vector length (%d)", index, dirLen) + } + dirPtr, dirName, dirType := sc.GetStatDirOnIndex(vector, index) + if len(dirName) == 0 { + return + } + var t adapter.StatType + d := sc.CopyEntryData(dirPtr, ^uint32(0)) + if d != nil { + t = d.Type() + } + entries = append(entries, adapter.StatEntry{ + StatIdentifier: adapter.StatIdentifier{ + Index: index, + Name: dirName, + }, + Type: t, + Data: d, + Symlink: dirType == adapter.Symlink, + }) + } + return entries, nil +} + +// getIdentifierEntriesOnIndex retrieves identifiers on indexes, or all identifiers if indexes are not defined. +func (sc *StatsClient) getIdentifierEntriesOnIndex(vector dirVector, indexes ...uint32) (identifiers []adapter.StatIdentifier, err error) { + dirLen := *(*uint32)(vectorLen(vector)) + for _, index := range indexes { + if index >= dirLen { + return nil, fmt.Errorf("stat entry index %d out of dir vector length (%d)", index, dirLen) + } + _, dirName, _ := sc.GetStatDirOnIndex(vector, index) + if len(dirName) == 0 { + return + } + identifiers = append(identifiers, adapter.StatIdentifier{ + Index: index, + Name: dirName, + }) + } + return identifiers, nil } // listIndexes lists indexes for all stat entries that match any of the regex patterns. -func (c *StatsClient) listIndexes(patterns ...string) (indexes []uint32, err error) { +func (sc *StatsClient) listIndexes(vector dirVector, patterns ...string) (indexes []uint32, err error) { if len(patterns) == 0 { - return c.listIndexesFunc(nil) + return sc.listIndexesFunc(vector, nil) } var regexes = make([]*regexp.Regexp, len(patterns)) for i, pattern := range patterns { @@ -260,7 +556,7 @@ func (c *StatsClient) listIndexes(patterns ...string) (indexes []uint32, err err } regexes[i] = r } - nameMatches := func(name []byte) bool { + nameMatches := func(name dirName) bool { for _, r := range regexes { if r.Match(name) { return true @@ -268,31 +564,22 @@ func (c *StatsClient) listIndexes(patterns ...string) (indexes []uint32, err err } return false } - return c.listIndexesFunc(nameMatches) + return sc.listIndexesFunc(vector, nameMatches) } -func (c *StatsClient) listIndexesFunc(f func(name []byte) bool) (indexes []uint32, err error) { +// listIndexesFunc lists stats indexes. The optional function +// argument filters returned values or returns all if empty +func (sc *StatsClient) listIndexesFunc(vector dirVector, f func(name dirName) bool) (indexes []uint32, err error) { if f == nil { // there is around ~3157 stats, so to avoid too many allocations // we set capacity to 3200 when listing all stats indexes = make([]uint32, 0, 3200) } - - dirVector := c.getStatDirVector() - vecLen := uint32(vectorLen(dirVector)) - + vecLen := *(*uint32)(vectorLen(vector)) for i := uint32(0); i < vecLen; i++ { - dirEntry := c.getStatDirIndex(dirVector, i) - + _, dirName, _ := sc.GetStatDirOnIndex(vector, i) if f != nil { - var name []byte - for n := 0; n < len(dirEntry.name); n++ { - if dirEntry.name[n] == 0 { - name = dirEntry.name[:n] - break - } - } - if len(name) == 0 || !f(name) { + if len(dirName) == 0 || !f(dirName) { continue } } @@ -302,44 +589,25 @@ func (c *StatsClient) listIndexesFunc(f func(name []byte) bool) (indexes []uint3 return indexes, nil } -func (c *StatsClient) dumpEntries(indexes []uint32) (entries []adapter.StatEntry, err error) { - dirVector := c.getStatDirVector() - dirLen := uint32(vectorLen(dirVector)) - - debugf("dumping entres for %d indexes", len(indexes)) - - entries = make([]adapter.StatEntry, 0, len(indexes)) - for _, index := range indexes { - if index >= dirLen { - return nil, fmt.Errorf("stat entry index %d out of dir vector length (%d)", index, dirLen) - } - - dirEntry := c.getStatDirIndex(dirVector, index) - - var name []byte - for n := 0; n < len(dirEntry.name); n++ { - if dirEntry.name[n] == 0 { - name = dirEntry.name[:n] - break - } - } - - if Debug { - debugf(" - %3d. dir: %q type: %v offset: %d union: %d", index, name, - adapter.StatType(dirEntry.directoryType), dirEntry.offsetVector, dirEntry.unionData) - } - - if len(name) == 0 { - continue - } +func (sc *StatsClient) isConnected() bool { + return atomic.LoadUint32(&sc.connected) == 1 +} - entry := adapter.StatEntry{ - Name: append([]byte(nil), name...), - Type: adapter.StatType(dirEntry.directoryType), - Data: c.copyEntryData(dirEntry), - } - entries = append(entries, entry) +// updateStatOnIndex refreshes the entry data. +func (sc *StatsClient) updateStatOnIndex(entry *adapter.StatEntry, vector dirVector) (err error) { + dirLen := *(*uint32)(vectorLen(vector)) + if entry.Index >= dirLen { + return fmt.Errorf("stat entry index %d out of dir vector length (%d)", entry.Index, dirLen) } - - return entries, nil + dirPtr, dirName, dirType := sc.GetStatDirOnIndex(vector, entry.Index) + if len(dirName) == 0 || + !bytes.Equal(dirName, entry.Name) || + dirType != entry.Type || + entry.Data == nil { + return nil + } + if err := sc.UpdateEntryData(dirPtr, &entry.Data); err != nil { + err = fmt.Errorf("updating stat data for entry %s failed: %v", dirName, err) + } + return } diff --git a/adapter/statsclient/statseg.go b/adapter/statsclient/statseg.go deleted file mode 100644 index 42ab3de..0000000 --- a/adapter/statsclient/statseg.go +++ /dev/null @@ -1,104 +0,0 @@ -package statsclient - -import ( - "sync/atomic" - "time" - "unsafe" -) - -var ( - MaxWaitInProgress = time.Millisecond * 100 - CheckDelayInProgress = time.Microsecond * 10 -) - -type sharedHeaderBase struct { - epoch int64 - inProgress int64 - directoryOffset int64 - errorOffset int64 - statsOffset int64 -} - -type sharedHeaderV0 struct { - sharedHeaderBase -} - -type sharedHeader struct { - version uint64 - sharedHeaderBase -} - -func (h *sharedHeader) legacyVersion() bool { - // older VPP (<=19.04) did not have version in stat segment header - // we try to provide fallback support by skipping it in header - if h.version > maxVersion && h.inProgress > 1 && h.epoch == 0 { - return true - } - return false -} - -func loadSharedHeader(b []byte) (header sharedHeader) { - h := (*sharedHeader)(unsafe.Pointer(&b[0])) - header.version = atomic.LoadUint64(&h.version) - header.epoch = atomic.LoadInt64(&h.epoch) - header.inProgress = atomic.LoadInt64(&h.inProgress) - header.directoryOffset = atomic.LoadInt64(&h.directoryOffset) - header.errorOffset = atomic.LoadInt64(&h.errorOffset) - header.statsOffset = atomic.LoadInt64(&h.statsOffset) - return -} - -func loadSharedHeaderLegacy(b []byte) (header sharedHeader) { - h := (*sharedHeaderV0)(unsafe.Pointer(&b[0])) - header.version = 0 - header.epoch = atomic.LoadInt64(&h.epoch) - header.inProgress = atomic.LoadInt64(&h.inProgress) - header.directoryOffset = atomic.LoadInt64(&h.directoryOffset) - header.errorOffset = atomic.LoadInt64(&h.errorOffset) - header.statsOffset = atomic.LoadInt64(&h.statsOffset) - return -} - -type statSegAccess struct { - epoch int64 -} - -func (c *statSegment) accessStart() statSegAccess { - t := time.Now() - - epoch, inprog := c.getEpoch() - for inprog { - if time.Since(t) > MaxWaitInProgress { - return statSegAccess{} - } else { - time.Sleep(CheckDelayInProgress) - } - epoch, inprog = c.getEpoch() - } - return statSegAccess{ - epoch: epoch, - } -} - -func (c *statSegment) accessEnd(acc *statSegAccess) bool { - epoch, inprog := c.getEpoch() - if acc.epoch != epoch || inprog { - return false - } - return true -} - -type vecHeader struct { - length uint64 - vectorData [0]uint8 -} - -func vectorLen(v unsafe.Pointer) uint64 { - vec := *(*vecHeader)(unsafe.Pointer(uintptr(v) - unsafe.Sizeof(uint64(0)))) - return vec.length -} - -//go:nosplit -func statSegPointer(p unsafe.Pointer, offset uintptr) unsafe.Pointer { - return unsafe.Pointer(uintptr(p) + offset) -} diff --git a/adapter/statsclient/statseg_v1.go b/adapter/statsclient/statseg_v1.go new file mode 100644 index 0000000..9e0e469 --- /dev/null +++ b/adapter/statsclient/statseg_v1.go @@ -0,0 +1,365 @@ +// Copyright (c) 2019 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package statsclient + +import ( + "fmt" + "sync/atomic" + "unsafe" + + "go.fd.io/govpp/adapter" +) + +type statSegmentV1 struct { + sharedHeader []byte + memorySize int64 +} + +type sharedHeaderV1 struct { + version uint64 + epoch int64 + inProgress int64 + directoryOffset int64 + errorOffset int64 + statsOffset int64 +} + +type statSegDirectoryEntryV1 struct { + directoryType dirType + // unionData can represent: + // - offset + // - index + // - value + unionData uint64 + offsetVector uint64 + name [128]byte +} + +func newStatSegmentV1(data []byte, size int64) *statSegmentV1 { + return &statSegmentV1{ + sharedHeader: data, + memorySize: size, + } +} + +func (ss *statSegmentV1) loadSharedHeader(b []byte) (header sharedHeaderV1) { + h := (*sharedHeaderV1)(unsafe.Pointer(&b[0])) + return sharedHeaderV1{ + version: atomic.LoadUint64(&h.version), + epoch: atomic.LoadInt64(&h.epoch), + inProgress: atomic.LoadInt64(&h.inProgress), + directoryOffset: atomic.LoadInt64(&h.directoryOffset), + errorOffset: atomic.LoadInt64(&h.errorOffset), + statsOffset: atomic.LoadInt64(&h.statsOffset), + } +} + +func (ss *statSegmentV1) GetDirectoryVector() dirVector { + dirOffset, _, _ := ss.getOffsets() + return dirVector(&ss.sharedHeader[dirOffset]) +} + +func (ss *statSegmentV1) getErrorVector() (unsafe.Pointer, error) { + return nil, fmt.Errorf("error vector is not defined for stats API v1") +} + +func (ss *statSegmentV1) GetStatDirOnIndex(v dirVector, index uint32) (dirSegment, dirName, adapter.StatType) { + statSegDir := dirSegment(uintptr(v) + uintptr(index)*unsafe.Sizeof(statSegDirectoryEntryV1{})) + dir := (*statSegDirectoryEntryV1)(statSegDir) + var name []byte + for n := 0; n < len(dir.name); n++ { + if dir.name[n] == 0 { + name = dir.name[:n] + break + } + } + return statSegDir, name, getStatType(dir.directoryType, true) +} + +func (ss *statSegmentV1) GetEpoch() (int64, bool) { + sh := ss.loadSharedHeader(ss.sharedHeader) + return sh.epoch, sh.inProgress != 0 +} + +func (ss *statSegmentV1) CopyEntryData(segment dirSegment, _ uint32) adapter.Stat { + dirEntry := (*statSegDirectoryEntryV1)(segment) + typ := getStatType(dirEntry.directoryType, true) + + switch typ { + case adapter.ScalarIndex: + return adapter.ScalarStat(dirEntry.unionData) + + case adapter.ErrorIndex: + if dirEntry.unionData >= uint64(len(ss.sharedHeader)) { + debugf("offset out of range for %s", dirEntry.name) + break + } + + _, errOffset, _ := ss.getOffsets() + offsetVector := dirVector(&ss.sharedHeader[errOffset]) + + var errData []adapter.Counter + + vecLen := *(*uint32)(vectorLen(offsetVector)) + for i := uint32(0); i < vecLen; i++ { + cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) + offset := uintptr(cb) + uintptr(dirEntry.unionData)*unsafe.Sizeof(adapter.Counter(0)) + debugf("error index, cb: %d, offset: %d", cb, offset) + val := *(*adapter.Counter)(statSegPointer(dirVector(&ss.sharedHeader[0]), offset)) + errData = append(errData, val) + } + return adapter.ErrorStat(errData) + + case adapter.SimpleCounterVector: + if dirEntry.unionData == 0 { + debugf("offset invalid for %s", dirEntry.name) + break + } else if dirEntry.unionData >= uint64(len(ss.sharedHeader)) { + debugf("offset out of range for %s", dirEntry.name) + break + } + + vecLen := *(*uint32)(vectorLen(dirVector(&ss.sharedHeader[dirEntry.unionData]))) + offsetVector := statSegPointer(dirVector(&ss.sharedHeader[0]), uintptr(dirEntry.offsetVector)) + + data := make([][]adapter.Counter, vecLen) + for i := uint32(0); i < vecLen; i++ { + cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) + counterVec := dirVector(&ss.sharedHeader[uintptr(cb)]) + vecLen2 := *(*uint32)(vectorLen(counterVec)) + data[i] = make([]adapter.Counter, vecLen2) + for j := uint32(0); j < vecLen2; j++ { + offset := uintptr(j) * unsafe.Sizeof(adapter.Counter(0)) + val := *(*adapter.Counter)(statSegPointer(counterVec, offset)) + data[i][j] = val + } + } + return adapter.SimpleCounterStat(data) + + case adapter.CombinedCounterVector: + if dirEntry.unionData == 0 { + debugf("offset invalid for %s", dirEntry.name) + break + } else if dirEntry.unionData >= uint64(len(ss.sharedHeader)) { + debugf("offset out of range for %s", dirEntry.name) + break + } + + vecLen := *(*uint32)(vectorLen(dirVector(&ss.sharedHeader[dirEntry.unionData]))) + offsetVector := statSegPointer(dirVector(&ss.sharedHeader[0]), uintptr(dirEntry.offsetVector)) + + data := make([][]adapter.CombinedCounter, vecLen) + for i := uint32(0); i < vecLen; i++ { + cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) + counterVec := dirVector(&ss.sharedHeader[uintptr(cb)]) + vecLen2 := *(*uint32)(vectorLen(counterVec)) + data[i] = make([]adapter.CombinedCounter, vecLen2) + for j := uint32(0); j < vecLen2; j++ { + offset := uintptr(j) * unsafe.Sizeof(adapter.CombinedCounter{}) + val := *(*adapter.CombinedCounter)(statSegPointer(counterVec, offset)) + data[i][j] = val + } + } + return adapter.CombinedCounterStat(data) + + case adapter.NameVector: + if dirEntry.unionData == 0 { + debugf("offset invalid for %s", dirEntry.name) + break + } else if dirEntry.unionData >= uint64(len(ss.sharedHeader)) { + debugf("offset out of range for %s", dirEntry.name) + break + } + + vecLen := *(*uint32)(vectorLen(dirVector(&ss.sharedHeader[dirEntry.unionData]))) + offsetVector := statSegPointer(dirVector(&ss.sharedHeader[0]), uintptr(dirEntry.offsetVector)) + + data := make([]adapter.Name, vecLen) + for i := uint32(0); i < vecLen; i++ { + cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) + if cb == 0 { + debugf("name vector out of range for %s (%v)", dirEntry.name, i) + continue + } + nameVec := dirVector(&ss.sharedHeader[cb]) + vecLen2 := *(*uint32)(vectorLen(nameVec)) + + nameStr := make([]byte, 0, vecLen2) + for j := uint32(0); j < vecLen2; j++ { + offset := uintptr(j) * unsafe.Sizeof(byte(0)) + val := *(*byte)(statSegPointer(nameVec, offset)) + if val > 0 { + nameStr = append(nameStr, val) + } + } + data[i] = nameStr + } + return adapter.NameStat(data) + + case adapter.Empty: + // no-op + + case adapter.Symlink: + debugf("Symlinks are not supported for stats v1") + + default: + // TODO: monitor occurrences with metrics + debugf("Unknown type %d for stat entry: %q", dirEntry.directoryType, dirEntry.name) + } + return nil +} + +func (ss *statSegmentV1) UpdateEntryData(segment dirSegment, stat *adapter.Stat) error { + dirEntry := (*statSegDirectoryEntryV1)(segment) + switch (*stat).(type) { + case adapter.ScalarStat: + *stat = adapter.ScalarStat(dirEntry.unionData) + + case adapter.ErrorStat: + if dirEntry.unionData == 0 { + debugf("offset invalid for %s", dirEntry.name) + break + } else if dirEntry.unionData >= uint64(len(ss.sharedHeader)) { + debugf("offset out of range for %s", dirEntry.name) + break + } + + _, errOffset, _ := ss.getOffsets() + offsetVector := dirVector(&ss.sharedHeader[errOffset]) + + var errData []adapter.Counter + + vecLen := *(*uint32)(vectorLen(dirVector(&ss.sharedHeader[errOffset]))) + for i := uint32(0); i < vecLen; i++ { + cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) + offset := uintptr(cb) + uintptr(dirEntry.unionData)*unsafe.Sizeof(adapter.Counter(0)) + val := *(*adapter.Counter)(statSegPointer(dirVector(&ss.sharedHeader[0]), offset)) + errData = append(errData, val) + } + *stat = adapter.ErrorStat(errData) + + case adapter.SimpleCounterStat: + if dirEntry.unionData == 0 { + debugf("offset invalid for %s", dirEntry.name) + break + } else if dirEntry.unionData >= uint64(len(ss.sharedHeader)) { + debugf("offset out of range for %s", dirEntry.name) + break + } + + vecLen := *(*uint32)(vectorLen(dirVector(&ss.sharedHeader[dirEntry.unionData]))) + offsetVector := statSegPointer(dirVector(&ss.sharedHeader[0]), uintptr(dirEntry.offsetVector)) + + data := (*stat).(adapter.SimpleCounterStat) + if uint32(len(data)) != vecLen { + return ErrStatDataLenIncorrect + } + for i := uint32(0); i < vecLen; i++ { + cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) + counterVec := dirVector(&ss.sharedHeader[uintptr(cb)]) + vecLen2 := *(*uint32)(vectorLen(counterVec)) + simpleData := data[i] + if uint32(len(simpleData)) != vecLen2 { + return ErrStatDataLenIncorrect + } + for j := uint32(0); j < vecLen2; j++ { + offset := uintptr(j) * unsafe.Sizeof(adapter.Counter(0)) + val := *(*adapter.Counter)(statSegPointer(counterVec, offset)) + simpleData[j] = val + } + } + + case adapter.CombinedCounterStat: + if dirEntry.unionData == 0 { + debugf("offset invalid for %s", dirEntry.name) + break + } else if dirEntry.unionData >= uint64(len(ss.sharedHeader)) { + debugf("offset out of range for %s", dirEntry.name) + break + } + + vecLen := *(*uint32)(vectorLen(dirVector(&ss.sharedHeader[dirEntry.unionData]))) + offsetVector := statSegPointer(dirVector(&ss.sharedHeader[0]), uintptr(dirEntry.offsetVector)) + + data := (*stat).(adapter.CombinedCounterStat) + if uint32(len(data)) != vecLen { + return ErrStatDataLenIncorrect + } + for i := uint32(0); i < vecLen; i++ { + cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) + counterVec := dirVector(&ss.sharedHeader[uintptr(cb)]) + vecLen2 := *(*uint32)(vectorLen(counterVec)) + combData := data[i] + if uint32(len(combData)) != vecLen2 { + return ErrStatDataLenIncorrect + } + for j := uint32(0); j < vecLen2; j++ { + offset := uintptr(j) * unsafe.Sizeof(adapter.CombinedCounter{}) + val := *(*adapter.CombinedCounter)(statSegPointer(counterVec, offset)) + combData[j] = val + } + } + + case adapter.NameStat: + if dirEntry.unionData == 0 { + debugf("offset invalid for %s", dirEntry.name) + break + } else if dirEntry.unionData >= uint64(len(ss.sharedHeader)) { + debugf("offset out of range for %s", dirEntry.name) + break + } + + vecLen := *(*uint32)(vectorLen(dirVector(&ss.sharedHeader[dirEntry.unionData]))) + offsetVector := statSegPointer(dirVector(&ss.sharedHeader[0]), uintptr(dirEntry.offsetVector)) + + data := (*stat).(adapter.NameStat) + if uint32(len(data)) != vecLen { + return ErrStatDataLenIncorrect + } + for i := uint32(0); i < vecLen; i++ { + cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) + if cb == 0 { + continue + } + nameVec := dirVector(&ss.sharedHeader[cb]) + vecLen2 := *(*uint32)(vectorLen(nameVec)) + + nameData := data[i] + if uint32(len(nameData))+1 != vecLen2 { + return ErrStatDataLenIncorrect + } + for j := uint32(0); j < vecLen2; j++ { + offset := uintptr(j) * unsafe.Sizeof(byte(0)) + val := *(*byte)(statSegPointer(nameVec, offset)) + if val == 0 { + break + } + nameData[j] = val + } + } + + default: + if Debug { + Log.Debugf("Unrecognized stat type %T for stat entry: %v", stat, dirEntry.name) + } + } + return nil +} + +// Get offsets for various types of data +func (ss *statSegmentV1) getOffsets() (dir, err, stat int64) { + sh := ss.loadSharedHeader(ss.sharedHeader) + return sh.directoryOffset, sh.errorOffset, sh.statsOffset +} diff --git a/adapter/statsclient/statseg_v2.go b/adapter/statsclient/statseg_v2.go new file mode 100644 index 0000000..f808112 --- /dev/null +++ b/adapter/statsclient/statseg_v2.go @@ -0,0 +1,411 @@ +// Copyright (c) 2020 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package statsclient + +import ( + "bytes" + "encoding/binary" + "sync/atomic" + "unsafe" + + "go.fd.io/govpp/adapter" +) + +type statSegmentV2 struct { + sharedHeader []byte + memorySize int64 +} + +type sharedHeaderV2 struct { + version uint64 + base unsafe.Pointer + epoch int64 + inProgress int64 + dirVector unsafe.Pointer + errorVector unsafe.Pointer +} + +type statSegDirectoryEntryV2 struct { + directoryType dirType + // unionData can represent: + // - symlink indexes + // - index + // - value + // - pointer to data + unionData uint64 + name [128]byte +} + +func newStatSegmentV2(data []byte, size int64) *statSegmentV2 { + return &statSegmentV2{ + sharedHeader: data, + memorySize: size, + } +} + +func (ss *statSegmentV2) loadSharedHeader(b []byte) (header sharedHeaderV2) { + h := (*sharedHeaderV2)(unsafe.Pointer(&b[0])) + return sharedHeaderV2{ + version: atomic.LoadUint64(&h.version), + base: atomic.LoadPointer(&h.base), + epoch: atomic.LoadInt64(&h.epoch), + inProgress: atomic.LoadInt64(&h.inProgress), + dirVector: atomic.LoadPointer(&h.dirVector), + errorVector: atomic.LoadPointer(&h.errorVector), + } +} + +func (ss *statSegmentV2) GetDirectoryVector() dirVector { + header := ss.loadSharedHeader(ss.sharedHeader) + return ss.adjust(dirVector(&header.dirVector)) +} + +func (ss *statSegmentV2) GetStatDirOnIndex(v dirVector, index uint32) (dirSegment, dirName, adapter.StatType) { + statSegDir := dirSegment(uintptr(v) + uintptr(index)*unsafe.Sizeof(statSegDirectoryEntryV2{})) + dir := (*statSegDirectoryEntryV2)(statSegDir) + var name []byte + for n := 0; n < len(dir.name); n++ { + if dir.name[n] == 0 { + name = dir.name[:n] + break + } + } + return statSegDir, name, getStatType(dir.directoryType, ss.getErrorVector() != nil) +} + +func (ss *statSegmentV2) GetEpoch() (int64, bool) { + sh := ss.loadSharedHeader(ss.sharedHeader) + return sh.epoch, sh.inProgress != 0 +} + +func (ss *statSegmentV2) CopyEntryData(segment dirSegment, index uint32) adapter.Stat { + dirEntry := (*statSegDirectoryEntryV2)(segment) + typ := getStatType(dirEntry.directoryType, ss.getErrorVector() != nil) + // skip zero pointer value + if typ != adapter.ScalarIndex && typ != adapter.Empty && typ != adapter.ErrorIndex && dirEntry.unionData == 0 { + debugf("data pointer not defined for %s", dirEntry.name) + return nil + } + + switch typ { + case adapter.ScalarIndex: + return adapter.ScalarStat(dirEntry.unionData) + + case adapter.ErrorIndex: + dirVector := ss.getErrorVector() + if dirVector == nil { + debugf("error vector pointer is out of range for %s", dirEntry.name) + return nil + } + vecLen := *(*uint32)(vectorLen(dirVector)) + var errData []adapter.Counter + for i := uint32(0); i < vecLen; i++ { + cb := statSegPointer(dirVector, uintptr(i+1)*unsafe.Sizeof(uint64(0))) + cbVal := ss.adjust(vectorLen(cb)) + if cbVal == nil { + debugf("error counter pointer out of range") + continue + } + offset := uintptr(dirEntry.unionData) * unsafe.Sizeof(adapter.Counter(0)) + val := *(*adapter.Counter)(statSegPointer(cbVal, offset)) + errData = append(errData, val) + } + return adapter.ErrorStat(errData) + + case adapter.SimpleCounterVector: + dirVector := ss.adjust(dirVector(&dirEntry.unionData)) + if dirVector == nil { + debugf("data vector pointer is out of range for %s", dirEntry.name) + return nil + } + vecLen := *(*uint32)(vectorLen(dirVector)) + data := make([][]adapter.Counter, vecLen) + for i := uint32(0); i < vecLen; i++ { + counterVectorOffset := statSegPointer(dirVector, uintptr(i+1)*unsafe.Sizeof(uint64(0))) + counterVector := ss.adjust(vectorLen(counterVectorOffset)) + if counterVector == nil { + debugf("counter (vector simple) pointer out of range") + continue + } + counterVectorLength := *(*uint32)(vectorLen(counterVector)) + if index == ^uint32(0) { + data[i] = make([]adapter.Counter, counterVectorLength) + for j := uint32(0); j < counterVectorLength; j++ { + offset := uintptr(j) * unsafe.Sizeof(adapter.Counter(0)) + data[i][j] = *(*adapter.Counter)(statSegPointer(counterVector, offset)) + } + } else { + data[i] = make([]adapter.Counter, 1) // expect single value + for j := uint32(0); j < counterVectorLength; j++ { + offset := uintptr(j) * unsafe.Sizeof(adapter.Counter(0)) + if index == j { + data[i][0] = *(*adapter.Counter)(statSegPointer(counterVector, offset)) + break + } + } + } + } + return adapter.SimpleCounterStat(data) + + case adapter.CombinedCounterVector: + dirVector := ss.adjust(dirVector(&dirEntry.unionData)) + if dirVector == nil { + debugf("data vector pointer is out of range for %s", dirEntry.name) + return nil + } + vecLen := *(*uint32)(vectorLen(dirVector)) + data := make([][]adapter.CombinedCounter, vecLen) + for i := uint32(0); i < vecLen; i++ { + counterVectorOffset := statSegPointer(dirVector, uintptr(i+1)*unsafe.Sizeof(uint64(0))) + counterVector := ss.adjust(vectorLen(counterVectorOffset)) + if counterVector == nil { + debugf("counter (vector combined) pointer out of range") + continue + } + counterVectorLength := *(*uint32)(vectorLen(counterVector)) + if index == ^uint32(0) { + data[i] = make([]adapter.CombinedCounter, counterVectorLength) + for j := uint32(0); j < counterVectorLength; j++ { + offset := uintptr(j) * unsafe.Sizeof(adapter.CombinedCounter{}) + data[i][j] = *(*adapter.CombinedCounter)(statSegPointer(counterVector, offset)) + } + } else { + data[i] = make([]adapter.CombinedCounter, 1) // expect single value pair + for j := uint32(0); j < counterVectorLength; j++ { + offset := uintptr(j) * unsafe.Sizeof(adapter.CombinedCounter{}) + if index == j { + data[i][0] = *(*adapter.CombinedCounter)(statSegPointer(counterVector, offset)) + break + } + } + } + } + return adapter.CombinedCounterStat(data) + + case adapter.NameVector: + dirVector := ss.adjust(dirVector(&dirEntry.unionData)) + if dirVector == nil { + debugf("data vector pointer is out of range for %s", dirEntry.name) + return nil + } + vecLen := *(*uint32)(vectorLen(dirVector)) + data := make([]adapter.Name, vecLen) + for i := uint32(0); i < vecLen; i++ { + nameVectorOffset := statSegPointer(dirVector, uintptr(i+1)*unsafe.Sizeof(uint64(0))) + if uintptr(nameVectorOffset) == 0 { + debugf("name vector out of range for %s (%v)", dirEntry.name, i) + continue + } + nameVector := ss.adjust(vectorLen(nameVectorOffset)) + if nameVector == nil { + debugf("name data pointer out of range") + continue + } + nameVectorLen := *(*uint32)(vectorLen(nameVector)) + name := make([]byte, 0, nameVectorLen) + for j := uint32(0); j < nameVectorLen; j++ { + offset := uintptr(j) * unsafe.Sizeof(byte(0)) + value := *(*byte)(statSegPointer(nameVector, offset)) + if value > 0 { + name = append(name, value) + } + } + data[i] = name + } + return adapter.NameStat(data) + + case adapter.Empty: + return adapter.EmptyStat("<none>") + // no-op + + case adapter.Symlink: + // prevent recursion loops + if index != ^uint32(0) { + debugf("received symlink with defined item index") + return nil + } + i1, i2 := ss.getSymlinkIndexes(dirEntry) + // use first index to get the stats directory the symlink points to + header := ss.loadSharedHeader(ss.sharedHeader) + dirVector := ss.adjust(dirVector(&header.dirVector)) + statSegDir2 := dirSegment(uintptr(dirVector) + uintptr(i1)*unsafe.Sizeof(statSegDirectoryEntryV2{})) + + // retry with actual stats segment and use second index to get + // stats for the required item + return ss.CopyEntryData(statSegDir2, i2) + + default: + // TODO: monitor occurrences with metrics + debugf("Unknown type %d for stat entry: %q", dirEntry.directoryType, dirEntry.name) + } + return nil +} + +func (ss *statSegmentV2) UpdateEntryData(segment dirSegment, stat *adapter.Stat) error { + dirEntry := (*statSegDirectoryEntryV2)(segment) + switch (*stat).(type) { + case adapter.ScalarStat: + *stat = adapter.ScalarStat(dirEntry.unionData) + + case adapter.ErrorStat: + dirVector := ss.getErrorVector() + if dirVector == nil { + debugf("error vector pointer is out of range for %s", dirEntry.name) + return nil + } + vecLen := *(*uint32)(vectorLen(dirVector)) + var errData []adapter.Counter + for i := uint32(0); i < vecLen; i++ { + cb := statSegPointer(dirVector, uintptr(i+1)*unsafe.Sizeof(uint64(0))) + cbVal := ss.adjust(vectorLen(cb)) + if cbVal == nil { + debugf("error counter pointer out of range") + continue + } + offset := uintptr(dirEntry.unionData) * unsafe.Sizeof(adapter.Counter(0)) + val := *(*adapter.Counter)(statSegPointer(cbVal, offset)) + errData = append(errData, val) + } + *stat = adapter.ErrorStat(errData) + + case adapter.SimpleCounterStat: + dirVector := ss.adjust(dirVector(&dirEntry.unionData)) + if dirVector == nil { + debugf("data vector pointer is out of range for %s", dirEntry.name) + return nil + } + vecLen := *(*uint32)(vectorLen(dirVector)) + data := (*stat).(adapter.SimpleCounterStat) + if uint32(len(data)) != vecLen { + return ErrStatDataLenIncorrect + } + for i := uint32(0); i < vecLen; i++ { + counterVectorOffset := statSegPointer(dirVector, uintptr(i+1)*unsafe.Sizeof(uint64(0))) + counterVector := ss.adjust(vectorLen(counterVectorOffset)) + if counterVector == nil { + debugf("counter (vector simple) pointer out of range") + continue + } + counterVectorLength := *(*uint32)(vectorLen(counterVector)) + data[i] = make([]adapter.Counter, counterVectorLength) + for j := uint32(0); j < counterVectorLength; j++ { + offset := uintptr(j) * unsafe.Sizeof(adapter.Counter(0)) + val := *(*adapter.Counter)(statSegPointer(counterVector, offset)) + data[i][j] = val + } + } + + case adapter.CombinedCounterStat: + dirVector := ss.adjust(dirVector(&dirEntry.unionData)) + if dirVector == nil { + debugf("data vector pointer is out of range for %s", dirEntry.name) + return nil + } + vecLen := *(*uint32)(vectorLen(dirVector)) + data := (*stat).(adapter.CombinedCounterStat) + for i := uint32(0); i < vecLen; i++ { + counterVectorOffset := statSegPointer(dirVector, uintptr(i+1)*unsafe.Sizeof(uint64(0))) + counterVector := ss.adjust(vectorLen(counterVectorOffset)) + if counterVector == nil { + debugf("counter (vector combined) pointer out of range") + continue + } + counterVectorLength := *(*uint32)(vectorLen(counterVector)) + data[i] = make([]adapter.CombinedCounter, counterVectorLength) + for j := uint32(0); j < counterVectorLength; j++ { + offset := uintptr(j) * unsafe.Sizeof(adapter.CombinedCounter{}) + val := *(*adapter.CombinedCounter)(statSegPointer(counterVector, offset)) + data[i][j] = val + } + } + + case adapter.NameStat: + dirVector := ss.adjust(dirVector(&dirEntry.unionData)) + if dirVector == nil { + debugf("data vector pointer is out of range for %s", dirEntry.name) + return nil + } + vecLen := *(*uint32)(vectorLen(dirVector)) + data := (*stat).(adapter.NameStat) + for i := uint32(0); i < vecLen; i++ { + nameVectorOffset := statSegPointer(dirVector, uintptr(i+1)*unsafe.Sizeof(uint64(0))) + if uintptr(nameVectorOffset) == 0 { + debugf("name vector out of range for %s (%v)", dirEntry.name, i) + continue + } + nameVector := ss.adjust(vectorLen(nameVectorOffset)) + if nameVector == nil { + debugf("name data pointer out of range") + continue + } + nameVectorLen := *(*uint32)(vectorLen(nameVector)) + nameData := data[i] + if uint32(len(nameData))+1 != nameVectorLen { + return ErrStatDataLenIncorrect + } + for j := uint32(0); j < nameVectorLen; j++ { + offset := uintptr(j) * unsafe.Sizeof(byte(0)) + value := *(*byte)(statSegPointer(nameVector, offset)) + if value == 0 { + break + } + nameData[j] = value + } + } + + default: + if Debug { + Log.Debugf("Unrecognized stat type %T for stat entry: %v", stat, dirEntry.name) + } + } + return nil +} + +// Adjust data pointer using shared header and base and return +// the pointer to a data segment +func (ss *statSegmentV2) adjust(data dirVector) dirVector { + header := ss.loadSharedHeader(ss.sharedHeader) + adjusted := dirVector(uintptr(unsafe.Pointer(&ss.sharedHeader[0])) + + uintptr(*(*uint64)(data)) - uintptr(*(*uint64)(unsafe.Pointer(&header.base)))) + if uintptr(unsafe.Pointer(&ss.sharedHeader[len(ss.sharedHeader)-1])) <= uintptr(adjusted) || + uintptr(unsafe.Pointer(&ss.sharedHeader[0])) >= uintptr(adjusted) { + return nil + } + return adjusted +} + +func (ss *statSegmentV2) getErrorVector() dirVector { + header := ss.loadSharedHeader(ss.sharedHeader) + return ss.adjust(dirVector(&header.errorVector)) +} + +func (ss *statSegmentV2) getSymlinkIndexes(dirEntry *statSegDirectoryEntryV2) (index1, index2 uint32) { + var b bytes.Buffer + if err := binary.Write(&b, binary.LittleEndian, dirEntry.unionData); err != nil { + debugf("error getting symlink indexes for %s: %v", dirEntry.name, err) + return + } + if len(b.Bytes()) != 8 { + debugf("incorrect symlink union data length for %s: expected 8, got %d", dirEntry.name, len(b.Bytes())) + return + } + for i := range b.Bytes()[:4] { + index1 += uint32(b.Bytes()[i]) << (uint32(i) * 8) + } + for i := range b.Bytes()[4:] { + index2 += uint32(b.Bytes()[i+4]) << (uint32(i) * 8) + } + return +} diff --git a/adapter/vppapiclient/stat_client.go b/adapter/vppapiclient/stat_client.go index bf19c45..4144c6c 100644 --- a/adapter/vppapiclient/stat_client.go +++ b/adapter/vppapiclient/stat_client.go @@ -29,7 +29,7 @@ import ( "os" "unsafe" - "git.fd.io/govpp.git/adapter" + "go.fd.io/govpp/adapter" ) // global VPP stats API client, library vppapiclient only supports @@ -83,7 +83,7 @@ func (c *statClient) Disconnect() error { return nil } -func (c *statClient) ListStats(patterns ...string) (stats []string, err error) { +func (c *statClient) ListStats(patterns ...string) (indexes []adapter.StatIdentifier, err error) { dir := C.govpp_stat_segment_ls(convertStringSlice(patterns)) if dir == nil { return nil, adapter.ErrStatsDataBusy @@ -93,11 +93,14 @@ func (c *statClient) ListStats(patterns ...string) (stats []string, err error) { l := C.govpp_stat_segment_vec_len(unsafe.Pointer(dir)) for i := 0; i < int(l); i++ { nameChar := C.govpp_stat_segment_dir_index_to_name(dir, C.uint32_t(i)) - stats = append(stats, C.GoString(nameChar)) + indexes = append(indexes, adapter.StatIdentifier{ + Name: []byte(C.GoString(nameChar)), + Index: uint32(i), + }) C.free(unsafe.Pointer(nameChar)) } - return stats, nil + return indexes, nil } func (c *statClient) DumpStats(patterns ...string) (stats []adapter.StatEntry, err error) { @@ -121,7 +124,10 @@ func (c *statClient) DumpStats(patterns ...string) (stats []adapter.StatEntry, e typ := adapter.StatType(C.govpp_stat_segment_data_type(&v)) stat := adapter.StatEntry{ - Name: []byte(name), + StatIdentifier: adapter.StatIdentifier{ + Name: []byte(name), + Index: uint32(i), + }, Type: typ, } @@ -130,7 +136,7 @@ func (c *statClient) DumpStats(patterns ...string) (stats []adapter.StatEntry, e stat.Data = adapter.ScalarStat(C.govpp_stat_segment_data_get_scalar_value(&v)) case adapter.ErrorIndex: - stat.Data = adapter.ErrorStat(C.govpp_stat_segment_data_get_error_value(&v)) + stat.Data = adapter.ErrorStat([]adapter.Counter{adapter.Counter(C.govpp_stat_segment_data_get_error_value(&v))}) case adapter.SimpleCounterVector: length := int(C.govpp_stat_segment_vec_len(unsafe.Pointer(C.govpp_stat_segment_data_get_simple_counter(&v)))) @@ -147,10 +153,10 @@ func (c *statClient) DumpStats(patterns ...string) (stats []adapter.StatEntry, e vector := make([][]adapter.CombinedCounter, length) for k := 0; k < length; k++ { for j := 0; j < int(C.govpp_stat_segment_vec_len(unsafe.Pointer(C.govpp_stat_segment_data_get_combined_counter_index(&v, C.int(k))))); j++ { - vector[k] = append(vector[k], adapter.CombinedCounter([2]uint64{ + vector[k] = append(vector[k], [2]uint64{ uint64(C.govpp_stat_segment_data_get_combined_counter_index_packets(&v, C.int(k), C.int(j))), uint64(C.govpp_stat_segment_data_get_combined_counter_index_bytes(&v, C.int(k), C.int(j))), - })) + }) } } stat.Data = adapter.CombinedCounterStat(vector) @@ -180,11 +186,15 @@ func (c *statClient) DumpStats(patterns ...string) (stats []adapter.StatEntry, e return stats, nil } -func (c *statClient) PrepareDir(prefixes ...string) (*adapter.StatDir, error) { +func (c *statClient) PrepareDir(_ ...string) (*adapter.StatDir, error) { + return nil, adapter.ErrNotImplemented +} + +func (c *statClient) PrepareDirOnIndex(_ ...uint32) (*adapter.StatDir, error) { return nil, adapter.ErrNotImplemented } -func (c *statClient) UpdateDir(dir *adapter.StatDir) error { +func (c *statClient) UpdateDir(_ *adapter.StatDir) error { return adapter.ErrNotImplemented } diff --git a/adapter/vppapiclient/stat_client_stub.go b/adapter/vppapiclient/stat_client_stub.go index c764391..3aa9cc5 100644 --- a/adapter/vppapiclient/stat_client_stub.go +++ b/adapter/vppapiclient/stat_client_stub.go @@ -17,7 +17,7 @@ package vppapiclient import ( - "git.fd.io/govpp.git/adapter" + "go.fd.io/govpp/adapter" ) // stubStatClient is just an stub adapter that does nothing. It builds only on Windows and OSX, where the real @@ -36,7 +36,7 @@ func (*stubStatClient) Disconnect() error { return nil } -func (*stubStatClient) ListStats(patterns ...string) (statNames []string, err error) { +func (*stubStatClient) ListStats(patterns ...string) (indexes []adapter.StatIdentifier, err error) { return nil, adapter.ErrNotImplemented } @@ -48,6 +48,10 @@ func (*stubStatClient) PrepareDir(prefixes ...string) (*adapter.StatDir, error) return nil, adapter.ErrNotImplemented } +func (*stubStatClient) PrepareDirOnIndex(indexes ...uint32) (*adapter.StatDir, error) { + return nil, adapter.ErrNotImplemented +} + func (*stubStatClient) UpdateDir(dir *adapter.StatDir) error { return adapter.ErrNotImplemented } diff --git a/adapter/vppapiclient/vppapiclient.go b/adapter/vppapiclient/vppapiclient.go index 977f32d..3ab460d 100644 --- a/adapter/vppapiclient/vppapiclient.go +++ b/adapter/vppapiclient/vppapiclient.go @@ -29,12 +29,13 @@ import ( "os" "path/filepath" "reflect" + "sync/atomic" "time" "unsafe" "github.com/fsnotify/fsnotify" - "git.fd.io/govpp.git/adapter" + "go.fd.io/govpp/adapter" ) var ( @@ -52,7 +53,7 @@ const ( // global VPP binary API client, library vppapiclient only supports // single connection at a time -var globalVppClient *vppClient +var globalVppClient unsafe.Pointer // stubVppClient is the default implementation of the VppAPI. type vppClient struct { @@ -76,7 +77,8 @@ func NewVppClientWithInputQueueSize(shmPrefix string, inputQueueSize uint16) ada // Connect connects the process to VPP. func (a *vppClient) Connect() error { - if globalVppClient != nil { + h := (*vppClient)(atomic.LoadPointer(&globalVppClient)) + if h != nil { return fmt.Errorf("already connected to binary API, disconnect first") } @@ -92,19 +94,17 @@ func (a *vppClient) Connect() error { return fmt.Errorf("connecting to VPP binary API failed (rc=%v)", rc) } - globalVppClient = a + atomic.StorePointer(&globalVppClient, unsafe.Pointer(a)) return nil } // Disconnect disconnects the process from VPP. func (a *vppClient) Disconnect() error { - globalVppClient = nil - + atomic.StorePointer(&globalVppClient, nil) rc := C.govpp_disconnect() if rc != 0 { return fmt.Errorf("disconnecting from VPP binary API failed (rc=%v)", rc) } - return nil } @@ -187,9 +187,12 @@ func (a *vppClient) WaitReady() error { //export go_msg_callback func go_msg_callback(msgID C.uint16_t, data unsafe.Pointer, size C.size_t) { + h := (*vppClient)(atomic.LoadPointer(&globalVppClient)) + if h == nil { + return + } // convert unsafe.Pointer to byte slice sliceHeader := &reflect.SliceHeader{Data: uintptr(data), Len: int(size), Cap: int(size)} byteSlice := *(*[]byte)(unsafe.Pointer(sliceHeader)) - - globalVppClient.msgCallback(uint16(msgID), byteSlice) + h.msgCallback(uint16(msgID), byteSlice) } diff --git a/adapter/vppapiclient/vppapiclient_stub.go b/adapter/vppapiclient/vppapiclient_stub.go index 20ad12b..57b71c3 100644 --- a/adapter/vppapiclient/vppapiclient_stub.go +++ b/adapter/vppapiclient/vppapiclient_stub.go @@ -17,7 +17,7 @@ package vppapiclient import ( - "git.fd.io/govpp.git/adapter" + "go.fd.io/govpp/adapter" ) // stubVppClient is just an stub adapter that does nothing. It builds only on Windows and OSX, where the real |