diff options
author | Ondrej Fabry <ofabry@cisco.com> | 2019-09-17 12:41:47 +0200 |
---|---|---|
committer | Ondrej Fabry <ofabry@cisco.com> | 2019-10-03 12:54:22 +0200 |
commit | 809b69ea4a90455445c34bbad7d8e5fea5cf3462 (patch) | |
tree | ba4b94339ac83908e2d9933692dc373929380aa7 | |
parent | 48198748bdfcc7d30c794cdac19de822da53f840 (diff) |
Optimizations for statsclient
- this dramatically improves performance for stats data collection
- memory allocation is now done only when stat dirs change
- updating prepared stat dir does not need to allocate memory
- created integration test for testing stats client
- added NumWorkerThreads and VectorRatePerWorker to SystemStats
- added ReduceSimpleCounterStatIndex, ReduceCombinedCounterStatIndex for
aggregating specific index
Change-Id: I702731a69024ab5dd0832bb5cfe2773a987359e5
Signed-off-by: Ondrej Fabry <ofabry@cisco.com>
-rw-r--r-- | .gitignore | 5 | ||||
-rw-r--r-- | Makefile | 16 | ||||
-rw-r--r-- | adapter/mock/mock_stats_adapter.go | 30 | ||||
-rw-r--r-- | adapter/stats_api.go | 129 | ||||
-rw-r--r-- | adapter/statsclient/stat_segment.go | 415 | ||||
-rw-r--r-- | adapter/statsclient/statsclient.go | 273 | ||||
-rw-r--r-- | adapter/statsclient/statseg.go | 100 | ||||
-rw-r--r-- | adapter/statsclient/version.go | 33 | ||||
-rw-r--r-- | adapter/vppapiclient/stat_client.go | 28 | ||||
-rw-r--r-- | adapter/vppapiclient/stat_client_stub.go | 10 | ||||
-rw-r--r-- | api/stats.go | 66 | ||||
-rw-r--r-- | core/connection.go | 6 | ||||
-rw-r--r-- | core/stats.go | 493 | ||||
-rw-r--r-- | examples/perf-bench/perf-bench.go | 16 | ||||
-rw-r--r-- | examples/stats-client/README.md (renamed from examples/stats-api/README.md) | 2 | ||||
-rw-r--r-- | examples/stats-client/stats_api.go (renamed from examples/stats-api/stats_api.go) | 169 | ||||
-rw-r--r-- | test/integration/stats_integration_test.go | 175 |
17 files changed, 1286 insertions, 680 deletions
@@ -1,6 +1,7 @@ *~ *.log *.out +*.test .idea/ @@ -9,7 +10,7 @@ cmd/binapi-generator/binapi-generator # examples examples/perf-bench/perf-bench +examples/rpc-service/rpc-service examples/simple-client/simple-client -examples/stats-api/stats-api +examples/stats-client/stats-client examples/union-example/union-example -examples/rpc-service/rpc-service @@ -43,11 +43,11 @@ build: examples: @echo "=> building examples" - cd examples/simple-client && $(GO) build ${GO_BUILD_ARGS} -v - cd examples/stats-api && $(GO) build ${GO_BUILD_ARGS} -v cd examples/perf-bench && $(GO) build ${GO_BUILD_ARGS} -v - cd examples/union-example && $(GO) build ${GO_BUILD_ARGS} -v cd examples/rpc-service && $(GO) build ${GO_BUILD_ARGS} -v + cd examples/simple-client && $(GO) build ${GO_BUILD_ARGS} -v + cd examples/stats-client && $(GO) build ${GO_BUILD_ARGS} -v + cd examples/union-example && $(GO) build ${GO_BUILD_ARGS} -v clean: @echo "=> cleaning" @@ -56,8 +56,12 @@ clean: test: @echo "=> running tests" - $(GO) test -v ./cmd/... - $(GO) test -v ./ ./api ./adapter ./codec ./core + $(GO) test ${GO_BUILD_ARGS} ./cmd/... + $(GO) test ${GO_BUILD_ARGS} ./ ./api ./adapter ./codec ./core + +test-integration: + @echo "=> running integration tests" + $(GO) test ${GO_BUILD_ARGS} ./test/integration lint: @echo "=> running linter" @@ -87,6 +91,6 @@ extras: .PHONY: all \ - install build examples clean test lint \ + install build examples clean test test-integration lint \ generate generate-binapi gen-binapi-docker \ extras diff --git a/adapter/mock/mock_stats_adapter.go b/adapter/mock/mock_stats_adapter.go index aba93a2..55b1831 100644 --- a/adapter/mock/mock_stats_adapter.go +++ b/adapter/mock/mock_stats_adapter.go @@ -21,9 +21,13 @@ import ( "git.fd.io/govpp.git/adapter" ) +// implements StatsAPI +var _ adapter.StatsAPI = (*StatsAdapter)(nil) + // StatsAdapter simulates VPP stats socket from which stats can be read type StatsAdapter struct { - entries []*adapter.StatEntry + entries []adapter.StatEntry + dir *adapter.StatDir } // NewStatsAdapter returns a new mock stats adapter. @@ -45,17 +49,31 @@ func (a *StatsAdapter) Disconnect() error { func (a *StatsAdapter) ListStats(patterns ...string) ([]string, error) { var statNames []string for _, stat := range a.entries { - statNames = append(statNames, stat.Name) + statNames = append(statNames, string(stat.Name)) } return statNames, nil } // DumpStats mocks all stat entries dump. -func (a *StatsAdapter) DumpStats(patterns ...string) ([]*adapter.StatEntry, error) { +func (a *StatsAdapter) DumpStats(patterns ...string) ([]adapter.StatEntry, error) { return a.entries, nil } -// MockStats replaces current values of all supported stats by provided value -func (a *StatsAdapter) MockStats(stats []*adapter.StatEntry) { +func (a *StatsAdapter) PrepareDir(prefixes ...string) (*adapter.StatDir, error) { + return a.dir, nil +} + +func (a *StatsAdapter) UpdateDir(dir *adapter.StatDir) error { + *dir = *a.dir + return nil +} + +// MockStats sets mocked stat entries to be returned by DumpStats. +func (a *StatsAdapter) MockStats(stats []adapter.StatEntry) { a.entries = stats -}
\ No newline at end of file +} + +// MockStats sets mocked stat dir to be returned by PrepareDir. +func (a *StatsAdapter) MockDir(dir *adapter.StatDir) { + a.dir = dir +} diff --git a/adapter/stats_api.go b/adapter/stats_api.go index 90ecd78..d67434c 100644 --- a/adapter/stats_api.go +++ b/adapter/stats_api.go @@ -25,23 +25,27 @@ const ( ) var ( - ErrStatDirBusy = errors.New("stat dir busy") - ErrStatDumpBusy = errors.New("stat dump busy") + ErrStatsDataBusy = errors.New("stats data busy") + ErrStatsDirStale = errors.New("stats dir stale") + ErrStatsAccessFailed = errors.New("stats access failed") ) // StatsAPI provides connection to VPP stats API. type StatsAPI interface { // Connect establishes client connection to the stats API. Connect() error - // Disconnect terminates client connection. Disconnect() error - // ListStats lists names for all stats. - ListStats(patterns ...string) (statNames []string, err error) - + // ListStats lists names for stats matching patterns. + ListStats(patterns ...string) (names []string, err error) // DumpStats dumps all stat entries. - DumpStats(patterns ...string) ([]*StatEntry, error) + DumpStats(patterns ...string) (entries []StatEntry, err error) + + // PrepareDir prepares new stat dir for entries that match any of prefixes. + PrepareDir(patterns ...string) (*StatDir, error) + // UpdateDir updates stat dir and all of their entries. + UpdateDir(dir *StatDir) error } // StatType represents type of stat directory and simply @@ -73,25 +77,50 @@ func (d StatType) String() string { return fmt.Sprintf("UnknownStatType(%d)", d) } +// StatDir defines directory of stats entries created by PrepareDir. +type StatDir struct { + Epoch int64 + Indexes []uint32 + Entries []StatEntry +} + // StatEntry represents single stat entry. The type of stat stored in Data // is defined by Type. type StatEntry struct { - Name string + Name []byte Type StatType Data Stat } -// Counter represents simple counter with single value. +// Counter represents simple counter with single value, which is usually packet count. type Counter uint64 // CombinedCounter represents counter with two values, for packet count and bytes count. -type CombinedCounter struct { - Packets Counter - Bytes Counter +type CombinedCounter [2]uint64 + +func (s CombinedCounter) Packets() uint64 { + return uint64(s[0]) +} + +func (s CombinedCounter) Bytes() uint64 { + return uint64(s[1]) } // Name represents string value stored under name vector. -type Name string +type Name []byte + +func (n Name) String() string { + return string(n) +} + +// Data represents some type of stat which is usually defined by StatType. +type Stat interface { + // IsZero returns true if all of its values equal to zero. + IsZero() bool + + // isStat is intentionally unexported to limit implementations of interface to this package, + isStat() +} // ScalarStat represents stat for ScalarIndex. type ScalarStat float64 @@ -102,24 +131,86 @@ type ErrorStat Counter // SimpleCounterStat represents stat for SimpleCounterVector. // The outer array represents workers and the inner array represents interface/node/.. indexes. // Values should be aggregated per interface/node for every worker. +// ReduceSimpleCounterStatIndex can be used to reduce specific index. type SimpleCounterStat [][]Counter // CombinedCounterStat represents stat for CombinedCounterVector. // The outer array represents workers and the inner array represents interface/node/.. indexes. // Values should be aggregated per interface/node for every worker. +// ReduceCombinedCounterStatIndex can be used to reduce specific index. type CombinedCounterStat [][]CombinedCounter // NameStat represents stat for NameVector. type NameStat []Name -// Data represents some type of stat which is usually defined by StatType. -type Stat interface { - // isStat is unexported to limit implementations of Data interface to this package, - isStat() -} - func (ScalarStat) isStat() {} func (ErrorStat) isStat() {} func (SimpleCounterStat) isStat() {} func (CombinedCounterStat) isStat() {} func (NameStat) isStat() {} + +func (s ScalarStat) IsZero() bool { + return s == 0 +} +func (s ErrorStat) IsZero() bool { + return s == 0 +} +func (s SimpleCounterStat) IsZero() bool { + if s == nil { + return true + } + for _, ss := range s { + for _, sss := range ss { + if sss != 0 { + return false + } + } + } + return true +} +func (s CombinedCounterStat) IsZero() bool { + if s == nil { + return true + } + for _, ss := range s { + if ss == nil { + return true + } + for _, sss := range ss { + if sss[0] != 0 || sss[1] != 0 { + return false + } + } + } + return true +} +func (s NameStat) IsZero() bool { + if s == nil { + return true + } + for _, ss := range s { + if len(ss) > 0 { + return false + } + } + return true +} + +// ReduceSimpleCounterStatIndex returns reduced SimpleCounterStat s for index i. +func ReduceSimpleCounterStatIndex(s SimpleCounterStat, i int) uint64 { + var val uint64 + for _, w := range s { + val += uint64(w[i]) + } + return val +} + +// ReduceSimpleCounterStatIndex returns reduced CombinedCounterStat s for index i. +func ReduceCombinedCounterStatIndex(s CombinedCounterStat, i int) [2]uint64 { + var val [2]uint64 + for _, w := range s { + val[0] += uint64(w[i][0]) + val[1] += uint64(w[i][1]) + } + return val +} diff --git a/adapter/statsclient/stat_segment.go b/adapter/statsclient/stat_segment.go index e8d20b0..9f028eb 100644 --- a/adapter/statsclient/stat_segment.go +++ b/adapter/statsclient/stat_segment.go @@ -17,9 +17,7 @@ package statsclient import ( "fmt" "net" - "sync/atomic" "syscall" - "time" "unsafe" "github.com/ftrvxmtrx/fd" @@ -28,41 +26,70 @@ import ( ) var ( - maxWaitInProgress = time.Second * 1 + ErrStatDataLenIncorrect = fmt.Errorf("stat data length incorrect") ) -type statDirectoryType int32 - -func (t statDirectoryType) String() string { - return adapter.StatType(t).String() -} +const ( + minVersion = 0 + maxVersion = 1 +) -type statSegDirectoryEntry struct { - directoryType statDirectoryType - // unionData can represent: offset, index or value - unionData uint64 - offsetVector uint64 - name [128]byte +func checkVersion(ver uint64) error { + if ver < minVersion { + return fmt.Errorf("stat segment version is too old: %v (minimal version: %v)", ver, minVersion) + } else if ver > maxVersion { + return fmt.Errorf("stat segment version is not supported: %v (minimal version: %v)", ver, maxVersion) + } + return nil } type statSegment struct { sharedHeader []byte memorySize int64 - // oldHeader defines version 0 for stat segment - // and is used for VPP 19.04 - oldHeader bool + // legacyVersion represents stat segment version 0 + // and is used as fallback for VPP 19.04 + legacyVersion bool +} + +func (c *statSegment) getStatDirVector() unsafe.Pointer { + dirOffset, _, _ := c.getOffsets() + return unsafe.Pointer(&c.sharedHeader[dirOffset]) +} + +func (c *statSegment) getStatDirIndex(p unsafe.Pointer, index uint32) *statSegDirectoryEntry { + return (*statSegDirectoryEntry)(unsafe.Pointer(uintptr(p) + uintptr(index)*unsafe.Sizeof(statSegDirectoryEntry{}))) +} + +func (c *statSegment) getHeader() (header statSegSharedHeader) { + if c.legacyVersion { + return statSegHeaderLegacy(c.sharedHeader) + } + return statSegHeader(c.sharedHeader) +} + +func (c *statSegment) getEpoch() (int64, bool) { + h := c.getHeader() + return h.epoch, h.inProgress != 0 +} + +func (c *statSegment) getOffsets() (dir, err, stat int64) { + h := c.getHeader() + return h.directoryOffset, h.errorOffset, h.statsOffset } func (c *statSegment) connect(sockName string) error { - addr := &net.UnixAddr{ + if c.sharedHeader != nil { + return fmt.Errorf("already connected") + } + + addr := net.UnixAddr{ Net: "unixpacket", Name: sockName, } - Log.Debugf("connecting to: %v", addr) - conn, err := net.DialUnix(addr.Net, nil, addr) + conn, err := net.DialUnix(addr.Net, nil, &addr) if err != nil { Log.Warnf("connecting to socket %s failed: %s", addr, err) return err @@ -82,84 +109,102 @@ func (c *statSegment) connect(sockName string) error { if len(files) == 0 { return fmt.Errorf("no files received over socket") } + + file := files[0] defer func() { - for _, f := range files { - if err := f.Close(); err != nil { - Log.Warnf("closing file %s failed: %v", f.Name(), err) - } + if err := file.Close(); err != nil { + Log.Warnf("closing file failed: %v", err) } }() - Log.Debugf("received %d files over socket", len(files)) - - f := files[0] - - info, err := f.Stat() + info, err := file.Stat() if err != nil { return err } - size := info.Size() - Log.Debugf("fd: name=%v size=%v", info.Name(), size) - - data, err := syscall.Mmap(int(f.Fd()), 0, int(size), syscall.PROT_READ, syscall.MAP_SHARED) + data, err := syscall.Mmap(int(file.Fd()), 0, int(size), syscall.PROT_READ, syscall.MAP_SHARED) if err != nil { - Log.Warnf("mapping shared memory failed: %v", err) + Log.Debugf("mapping shared memory failed: %v", err) return fmt.Errorf("mapping shared memory failed: %v", err) } - Log.Debugf("successfuly mapped shared memory") - c.sharedHeader = data c.memorySize = size - header := c.readHeader() - Log.Debugf("stat segment header: %+v", header) + Log.Debugf("successfuly mmapped shared memory segment (size: %v)", size) - // older VPP (<=19.04) did not have version in stat segment header - // we try to provide fallback support by skipping it in header - if header.version > MaxVersion && header.inProgress > 1 && header.epoch == 0 { - h := c.readHeaderOld() - Log.Debugf("statsclient: falling back to old stat segment version (VPP <=19.04): %+v", h) - c.oldHeader = true + hdr := statSegHeader(c.sharedHeader) + Log.Debugf("stat segment header: %+v", hdr) + + if hdr.legacyVersion() { + c.legacyVersion = true + hdr = statSegHeaderLegacy(c.sharedHeader) + Log.Debugf("falling back to legacy version (VPP <=19.04) of stat segment (header: %+v)", hdr) + } + + if err := checkVersion(hdr.version); err != nil { + return err } return nil } func (c *statSegment) disconnect() error { + if c.sharedHeader == nil { + return nil + } + if err := syscall.Munmap(c.sharedHeader); err != nil { - Log.Warnf("unmapping shared memory failed: %v", err) + Log.Debugf("unmapping shared memory failed: %v", err) return fmt.Errorf("unmapping shared memory failed: %v", err) } + c.sharedHeader = nil Log.Debugf("successfuly unmapped shared memory") - return nil } -func (c *statSegment) copyData(dirEntry *statSegDirectoryEntry) adapter.Stat { - switch typ := adapter.StatType(dirEntry.directoryType); typ { +type statDirectoryType int32 + +func (t statDirectoryType) String() string { + return adapter.StatType(t).String() +} + +type statSegDirectoryEntry struct { + directoryType statDirectoryType + // unionData can represent: + // - offset + // - index + // - value + unionData uint64 + offsetVector uint64 + name [128]byte +} + +func (c *statSegment) copyEntryData(dirEntry *statSegDirectoryEntry) adapter.Stat { + dirType := adapter.StatType(dirEntry.directoryType) + + switch dirType { case adapter.ScalarIndex: return adapter.ScalarStat(dirEntry.unionData) case adapter.ErrorIndex: - _, errOffset, _ := c.readOffsets() + _, errOffset, _ := c.getOffsets() offsetVector := unsafe.Pointer(&c.sharedHeader[errOffset]) var errData adapter.Counter - if c.oldHeader { + if c.legacyVersion { // error were not vector (per-worker) in VPP 19.04 offset := uintptr(dirEntry.unionData) * unsafe.Sizeof(uint64(0)) - val := *(*adapter.Counter)(add(offsetVector, offset)) + val := *(*adapter.Counter)(statSegPointer(offsetVector, offset)) errData = val } else { vecLen := vectorLen(offsetVector) for i := uint64(0); i < vecLen; i++ { - cb := *(*uint64)(add(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) + cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) offset := uintptr(cb) + uintptr(dirEntry.unionData)*unsafe.Sizeof(adapter.Counter(0)) - val := *(*adapter.Counter)(add(unsafe.Pointer(&c.sharedHeader[0]), offset)) + val := *(*adapter.Counter)(statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), offset)) errData += val } } @@ -167,83 +212,82 @@ func (c *statSegment) copyData(dirEntry *statSegDirectoryEntry) adapter.Stat { case adapter.SimpleCounterVector: if dirEntry.unionData == 0 { - Log.Debugf("\toffset is not valid") + debugf("offset invalid for %s", dirEntry.name) break } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) { - Log.Debugf("\toffset out of range") + debugf("offset out of range for %s", dirEntry.name) break } - simpleCounter := unsafe.Pointer(&c.sharedHeader[dirEntry.unionData]) // offset - vecLen := vectorLen(simpleCounter) - offsetVector := add(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector)) + vecLen := vectorLen(unsafe.Pointer(&c.sharedHeader[dirEntry.unionData])) + offsetVector := statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector)) data := make([][]adapter.Counter, vecLen) for i := uint64(0); i < vecLen; i++ { - cb := *(*uint64)(add(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) + cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) counterVec := unsafe.Pointer(&c.sharedHeader[uintptr(cb)]) vecLen2 := vectorLen(counterVec) + data[i] = make([]adapter.Counter, vecLen2) for j := uint64(0); j < vecLen2; j++ { offset := uintptr(j) * unsafe.Sizeof(adapter.Counter(0)) - val := *(*adapter.Counter)(add(counterVec, offset)) - data[i] = append(data[i], val) + val := *(*adapter.Counter)(statSegPointer(counterVec, offset)) + data[i][j] = val } } return adapter.SimpleCounterStat(data) case adapter.CombinedCounterVector: if dirEntry.unionData == 0 { - Log.Debugf("\toffset is not valid") + debugf("offset invalid for %s", dirEntry.name) break } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) { - Log.Debugf("\toffset out of range") + debugf("offset out of range for %s", dirEntry.name) break } - combinedCounter := unsafe.Pointer(&c.sharedHeader[dirEntry.unionData]) // offset - vecLen := vectorLen(combinedCounter) - offsetVector := add(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector)) + vecLen := vectorLen(unsafe.Pointer(&c.sharedHeader[dirEntry.unionData])) + offsetVector := statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector)) data := make([][]adapter.CombinedCounter, vecLen) for i := uint64(0); i < vecLen; i++ { - cb := *(*uint64)(add(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) + cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) counterVec := unsafe.Pointer(&c.sharedHeader[uintptr(cb)]) vecLen2 := vectorLen(counterVec) + data[i] = make([]adapter.CombinedCounter, vecLen2) for j := uint64(0); j < vecLen2; j++ { offset := uintptr(j) * unsafe.Sizeof(adapter.CombinedCounter{}) - val := *(*adapter.CombinedCounter)(add(counterVec, offset)) - data[i] = append(data[i], val) + val := *(*adapter.CombinedCounter)(statSegPointer(counterVec, offset)) + data[i][j] = val } } return adapter.CombinedCounterStat(data) case adapter.NameVector: if dirEntry.unionData == 0 { - Log.Debugf("\toffset is not valid") + debugf("offset invalid for %s", dirEntry.name) break } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) { - Log.Debugf("\toffset out of range") + debugf("offset out of range for %s", dirEntry.name) break } - nameVector := unsafe.Pointer(&c.sharedHeader[dirEntry.unionData]) // offset - vecLen := vectorLen(nameVector) - offsetVector := add(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector)) + vecLen := vectorLen(unsafe.Pointer(&c.sharedHeader[dirEntry.unionData])) + offsetVector := statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector)) data := make([]adapter.Name, vecLen) for i := uint64(0); i < vecLen; i++ { - cb := *(*uint64)(add(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) + cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) if cb == 0 { - Log.Debugf("\tname vector cb out of range") + debugf("name vector out of range for %s (%v)", dirEntry.name, i) continue } nameVec := unsafe.Pointer(&c.sharedHeader[cb]) vecLen2 := vectorLen(nameVec) - var nameStr []byte + nameStr := make([]byte, 0, vecLen2) for j := uint64(0); j < vecLen2; j++ { offset := uintptr(j) * unsafe.Sizeof(byte(0)) - val := *(*byte)(add(nameVec, offset)) + val := *(*byte)(statSegPointer(nameVec, offset)) if val > 0 { nameStr = append(nameStr, val) } @@ -253,116 +297,143 @@ func (c *statSegment) copyData(dirEntry *statSegDirectoryEntry) adapter.Stat { return adapter.NameStat(data) default: - Log.Warnf("Unknown type %d for stat entry: %q", dirEntry.directoryType, dirEntry.name) + // TODO: monitor occurrences with metrics + debugf("Unknown type %d for stat entry: %q", dirEntry.directoryType, dirEntry.name) } return nil } -type sharedHeaderBase struct { - epoch int64 - inProgress int64 - directoryOffset int64 - errorOffset int64 - statsOffset int64 -} +func (c *statSegment) updateEntryData(dirEntry *statSegDirectoryEntry, stat *adapter.Stat) error { + switch (*stat).(type) { + case adapter.ScalarStat: + *stat = adapter.ScalarStat(dirEntry.unionData) -type statSegSharedHeader struct { - version uint64 - sharedHeaderBase -} + case adapter.ErrorStat: + _, errOffset, _ := c.getOffsets() + offsetVector := unsafe.Pointer(&c.sharedHeader[errOffset]) -func (c *statSegment) readHeaderOld() (header statSegSharedHeader) { - h := (*sharedHeaderBase)(unsafe.Pointer(&c.sharedHeader[0])) - header.version = 0 - header.epoch = atomic.LoadInt64(&h.epoch) - header.inProgress = atomic.LoadInt64(&h.inProgress) - header.directoryOffset = atomic.LoadInt64(&h.directoryOffset) - header.errorOffset = atomic.LoadInt64(&h.errorOffset) - header.statsOffset = atomic.LoadInt64(&h.statsOffset) - return -} + var errData adapter.Counter + if c.legacyVersion { + // error were not vector (per-worker) in VPP 19.04 + offset := uintptr(dirEntry.unionData) * unsafe.Sizeof(uint64(0)) + val := *(*adapter.Counter)(statSegPointer(offsetVector, offset)) + errData = val + } else { + vecLen := vectorLen(offsetVector) + for i := uint64(0); i < vecLen; i++ { + cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) + offset := uintptr(cb) + uintptr(dirEntry.unionData)*unsafe.Sizeof(adapter.Counter(0)) + val := *(*adapter.Counter)(statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), offset)) + errData += val + } + } + *stat = adapter.ErrorStat(errData) -func (c *statSegment) readHeader() (header statSegSharedHeader) { - h := (*statSegSharedHeader)(unsafe.Pointer(&c.sharedHeader[0])) - header.version = atomic.LoadUint64(&h.version) - header.epoch = atomic.LoadInt64(&h.epoch) - header.inProgress = atomic.LoadInt64(&h.inProgress) - header.directoryOffset = atomic.LoadInt64(&h.directoryOffset) - header.errorOffset = atomic.LoadInt64(&h.errorOffset) - header.statsOffset = atomic.LoadInt64(&h.statsOffset) - return -} + case adapter.SimpleCounterStat: + if dirEntry.unionData == 0 { + debugf("offset invalid for %s", dirEntry.name) + break + } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) { + debugf("offset out of range for %s", dirEntry.name) + break + } -func (c *statSegment) readVersion() uint64 { - if c.oldHeader { - return 0 - } - header := (*statSegSharedHeader)(unsafe.Pointer(&c.sharedHeader[0])) - version := atomic.LoadUint64(&header.version) - return version -} + vecLen := vectorLen(unsafe.Pointer(&c.sharedHeader[dirEntry.unionData])) + offsetVector := statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector)) -func (c *statSegment) readEpoch() (int64, bool) { - if c.oldHeader { - h := c.readHeaderOld() - return h.epoch, h.inProgress != 0 - } - header := (*statSegSharedHeader)(unsafe.Pointer(&c.sharedHeader[0])) - epoch := atomic.LoadInt64(&header.epoch) - inprog := atomic.LoadInt64(&header.inProgress) - return epoch, inprog != 0 -} + data := (*stat).(adapter.SimpleCounterStat) + if uint64(len(data)) != vecLen { + return ErrStatDataLenIncorrect + } + for i := uint64(0); i < vecLen; i++ { + cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) + counterVec := unsafe.Pointer(&c.sharedHeader[uintptr(cb)]) + vecLen2 := vectorLen(counterVec) + simpData := data[i] + if uint64(len(simpData)) != vecLen2 { + return ErrStatDataLenIncorrect + } + for j := uint64(0); j < vecLen2; j++ { + offset := uintptr(j) * unsafe.Sizeof(adapter.Counter(0)) + val := *(*adapter.Counter)(statSegPointer(counterVec, offset)) + simpData[j] = val + } + } -func (c *statSegment) readOffsets() (dir, err, stat int64) { - if c.oldHeader { - h := c.readHeaderOld() - return h.directoryOffset, h.errorOffset, h.statsOffset - } - header := (*statSegSharedHeader)(unsafe.Pointer(&c.sharedHeader[0])) - dirOffset := atomic.LoadInt64(&header.directoryOffset) - errOffset := atomic.LoadInt64(&header.errorOffset) - statOffset := atomic.LoadInt64(&header.statsOffset) - return dirOffset, errOffset, statOffset -} + case adapter.CombinedCounterStat: + if dirEntry.unionData == 0 { + debugf("offset invalid for %s", dirEntry.name) + break + } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) { + debugf("offset out of range for %s", dirEntry.name) + break + } -type statSegAccess struct { - epoch int64 -} + vecLen := vectorLen(unsafe.Pointer(&c.sharedHeader[dirEntry.unionData])) + offsetVector := statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector)) -func (c *statSegment) accessStart() *statSegAccess { - epoch, inprog := c.readEpoch() - t := time.Now() - for inprog { - if time.Since(t) > maxWaitInProgress { - return nil + data := (*stat).(adapter.CombinedCounterStat) + if uint64(len(data)) != vecLen { + return ErrStatDataLenIncorrect + } + for i := uint64(0); i < vecLen; i++ { + cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) + counterVec := unsafe.Pointer(&c.sharedHeader[uintptr(cb)]) + vecLen2 := vectorLen(counterVec) + combData := data[i] + if uint64(len(combData)) != vecLen2 { + return ErrStatDataLenIncorrect + } + for j := uint64(0); j < vecLen2; j++ { + offset := uintptr(j) * unsafe.Sizeof(adapter.CombinedCounter{}) + val := *(*adapter.CombinedCounter)(statSegPointer(counterVec, offset)) + combData[j] = val + } } - epoch, inprog = c.readEpoch() - } - return &statSegAccess{ - epoch: epoch, - } -} -func (c *statSegment) accessEnd(acc *statSegAccess) bool { - epoch, inprog := c.readEpoch() - if acc.epoch != epoch || inprog { - return false - } - return true -} + case adapter.NameStat: + if dirEntry.unionData == 0 { + debugf("offset invalid for %s", dirEntry.name) + break + } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) { + debugf("offset out of range for %s", dirEntry.name) + break + } -type vecHeader struct { - length uint64 - vectorData [0]uint8 -} + vecLen := vectorLen(unsafe.Pointer(&c.sharedHeader[dirEntry.unionData])) + offsetVector := statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector)) -func vectorLen(v unsafe.Pointer) uint64 { - vec := *(*vecHeader)(unsafe.Pointer(uintptr(v) - unsafe.Sizeof(uintptr(0)))) - return vec.length -} + data := (*stat).(adapter.NameStat) + if uint64(len(data)) != vecLen { + return ErrStatDataLenIncorrect + } + for i := uint64(0); i < vecLen; i++ { + cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) + if cb == 0 { + continue + } + nameVec := unsafe.Pointer(&c.sharedHeader[cb]) + vecLen2 := vectorLen(nameVec) + + nameData := data[i] + if uint64(len(nameData))+1 != vecLen2 { + return ErrStatDataLenIncorrect + } + for j := uint64(0); j < vecLen2; j++ { + offset := uintptr(j) * unsafe.Sizeof(byte(0)) + val := *(*byte)(statSegPointer(nameVec, offset)) + if val == 0 { + break + } + nameData[j] = val + } + } -//go:nosplit -func add(p unsafe.Pointer, x uintptr) unsafe.Pointer { - return unsafe.Pointer(uintptr(p) + x) + default: + if Debug { + Log.Debugf("Unrecognized stat type %T for stat entry: %v", stat, dirEntry.name) + } + } + return nil } diff --git a/adapter/statsclient/statsclient.go b/adapter/statsclient/statsclient.go index 6381b9f..d4e5a56 100644 --- a/adapter/statsclient/statsclient.go +++ b/adapter/statsclient/statsclient.go @@ -20,7 +20,6 @@ import ( "fmt" "os" "regexp" - "unsafe" logger "github.com/sirupsen/logrus" @@ -63,11 +62,19 @@ func init() { } } +func debugf(f string, a ...interface{}) { + if Debug { + Log.Debugf(f, a...) + } +} + +// implements StatsAPI +var _ adapter.StatsAPI = (*StatsClient)(nil) + // StatsClient is the pure Go implementation for VPP stats API. type StatsClient struct { sockAddr string - currentEpoch int64 statSegment } @@ -94,13 +101,6 @@ func (c *StatsClient) Connect() error { return err } - ver := c.readVersion() - Log.Debugf("stat segment version: %v", ver) - - if err := checkVersion(ver); err != nil { - return err - } - return nil } @@ -108,116 +108,235 @@ func (c *StatsClient) Disconnect() error { if err := c.statSegment.disconnect(); err != nil { return err } - return nil } -func (c *StatsClient) ListStats(patterns ...string) (statNames []string, err error) { +func (c *StatsClient) ListStats(patterns ...string) (names []string, err error) { sa := c.accessStart() - if sa == nil { - return nil, fmt.Errorf("access failed") + if sa.epoch == 0 { + return nil, adapter.ErrStatsAccessFailed } - dirOffset, _, _ := c.readOffsets() - Log.Debugf("dirOffset: %v", dirOffset) + indexes, err := c.listIndexes(patterns...) + if err != nil { + return nil, err + } + for _, index := range indexes { + name, err := c.entryName(index) + if err != nil { + return nil, err + } + names = append(names, name) + } - vecLen := vectorLen(unsafe.Pointer(&c.sharedHeader[dirOffset])) - Log.Debugf("vecLen: %v", vecLen) - Log.Debugf("unsafe.Sizeof(statSegDirectoryEntry{}): %v", unsafe.Sizeof(statSegDirectoryEntry{})) + if !c.accessEnd(&sa) { + return nil, adapter.ErrStatsDataBusy + } - for i := uint64(0); i < vecLen; i++ { - offset := uintptr(i) * unsafe.Sizeof(statSegDirectoryEntry{}) - dirEntry := (*statSegDirectoryEntry)(add(unsafe.Pointer(&c.sharedHeader[dirOffset]), offset)) + return names, nil +} - nul := bytes.IndexByte(dirEntry.name[:], '\x00') - if nul < 0 { - Log.Debugf("no zero byte found for: %q", dirEntry.name[:]) - continue - } - name := string(dirEntry.name[:nul]) - if name == "" { - Log.Debugf("entry with empty name found (%d)", i) - continue - } +func (c *StatsClient) DumpStats(patterns ...string) (entries []adapter.StatEntry, err error) { + sa := c.accessStart() + if sa.epoch == 0 { + return nil, adapter.ErrStatsAccessFailed + } - Log.Debugf(" %80q (type: %v, data: %d, offset: %d) ", name, dirEntry.directoryType, dirEntry.unionData, dirEntry.offsetVector) + dir, err := c.listIndexes(patterns...) + if err != nil { + return nil, err + } + if entries, err = c.dumpEntries(dir); err != nil { + return nil, err + } - if nameMatches(name, patterns) { - statNames = append(statNames, name) - } + if !c.accessEnd(&sa) { + return nil, adapter.ErrStatsDataBusy + } + + return entries, nil +} + +func (c *StatsClient) PrepareDir(patterns ...string) (*adapter.StatDir, error) { + dir := new(adapter.StatDir) - // TODO: copy the listed entries elsewhere + sa := c.accessStart() + if sa.epoch == 0 { + return nil, adapter.ErrStatsAccessFailed } - if !c.accessEnd(sa) { - return nil, adapter.ErrStatDirBusy + indexes, err := c.listIndexes(patterns...) + if err != nil { + return nil, err } + dir.Indexes = indexes - c.currentEpoch = sa.epoch + entries, err := c.dumpEntries(indexes) + if err != nil { + return nil, err + } + dir.Entries = entries - return statNames, nil + if !c.accessEnd(&sa) { + return nil, adapter.ErrStatsDataBusy + } + dir.Epoch = sa.epoch + + return dir, nil } -func (c *StatsClient) DumpStats(patterns ...string) (entries []*adapter.StatEntry, err error) { - epoch, _ := c.readEpoch() - if c.currentEpoch > 0 && c.currentEpoch != epoch { // TODO: do list stats before dump - return nil, fmt.Errorf("old data") +func (c *StatsClient) UpdateDir(dir *adapter.StatDir) (err error) { + epoch, _ := c.getEpoch() + if dir.Epoch != epoch { + return adapter.ErrStatsDirStale } sa := c.accessStart() - if sa == nil { - return nil, fmt.Errorf("access failed") + if sa.epoch == 0 { + return adapter.ErrStatsAccessFailed } - dirOffset, _, _ := c.readOffsets() - vecLen := vectorLen(unsafe.Pointer(&c.sharedHeader[dirOffset])) + dirVector := c.getStatDirVector() - for i := uint64(0); i < vecLen; i++ { - offset := uintptr(i) * unsafe.Sizeof(statSegDirectoryEntry{}) - dirEntry := (*statSegDirectoryEntry)(add(unsafe.Pointer(&c.sharedHeader[dirOffset]), offset)) + for i, index := range dir.Indexes { + dirEntry := c.getStatDirIndex(dirVector, index) - nul := bytes.IndexByte(dirEntry.name[:], '\x00') - if nul < 0 { - Log.Debugf("no zero byte found for: %q", dirEntry.name[:]) + var name []byte + for n := 0; n < len(dirEntry.name); n++ { + if dirEntry.name[n] == 0 { + name = dirEntry.name[:n] + break + } + } + if len(name) == 0 { + continue + } + + entry := &dir.Entries[i] + if !bytes.Equal(name, entry.Name) { + continue + } + if adapter.StatType(dirEntry.directoryType) != entry.Type { continue } - name := string(dirEntry.name[:nul]) - if name == "" { - Log.Debugf("entry with empty name found (%d)", i) + if entry.Data == nil { continue } + if err := c.updateEntryData(dirEntry, &entry.Data); err != nil { + return fmt.Errorf("updating stat data for entry %s failed: %v", name, err) + } - Log.Debugf(" - %s (type: %v, data: %v, offset: %v) ", name, dirEntry.directoryType, dirEntry.unionData, dirEntry.offsetVector) + } - entry := adapter.StatEntry{ - Name: name, - Type: adapter.StatType(dirEntry.directoryType), - Data: c.copyData(dirEntry), - } + if !c.accessEnd(&sa) { + return adapter.ErrStatsDataBusy + } - Log.Debugf("\tentry data: %+v %#v (%T)", entry.Data, entry.Data, entry.Data) + return nil +} - if nameMatches(entry.Name, patterns) { - entries = append(entries, &entry) +// listIndexes lists indexes for all stat entries that match any of the regex patterns. +func (c *StatsClient) listIndexes(patterns ...string) (indexes []uint32, err error) { + if len(patterns) == 0 { + return c.listIndexesFunc(nil) + } + var regexes = make([]*regexp.Regexp, len(patterns)) + for i, pattern := range patterns { + r, err := regexp.Compile(pattern) + if err != nil { + return nil, fmt.Errorf("compiling regexp failed: %v", err) + } + regexes[i] = r + } + nameMatches := func(name []byte) bool { + for _, r := range regexes { + if r.Match(name) { + return true + } } + return false } + return c.listIndexesFunc(nameMatches) +} - if !c.accessEnd(sa) { - return nil, adapter.ErrStatDumpBusy +func (c *StatsClient) listIndexesFunc(f func(name []byte) bool) (indexes []uint32, err error) { + if f == nil { + // there is around ~3150 stats, so to avoid too many allocations + // we set capacity to 3200 when listing all stats + indexes = make([]uint32, 0, 3200) } - return entries, nil + dirVector := c.getStatDirVector() + vecLen := uint32(vectorLen(dirVector)) + + for i := uint32(0); i < vecLen; i++ { + dirEntry := c.getStatDirIndex(dirVector, i) + + if f != nil { + var name []byte + for n := 0; n < len(dirEntry.name); n++ { + if dirEntry.name[n] == 0 { + name = dirEntry.name[:n] + break + } + } + if len(name) == 0 || !f(name) { + continue + } + } + indexes = append(indexes, i) + } + + return indexes, nil } -func nameMatches(name string, patterns []string) bool { - if len(patterns) == 0 { - return true +func (c *StatsClient) entryName(index uint32) (string, error) { + dirVector := c.getStatDirVector() + vecLen := uint32(vectorLen(dirVector)) + + if index >= vecLen { + return "", fmt.Errorf("stat entry index %d out of range (%d)", index, vecLen) } - for _, pattern := range patterns { - matched, err := regexp.MatchString(pattern, name) - if err == nil && matched { - return true + + dirEntry := c.getStatDirIndex(dirVector, index) + + var name []byte + for n := 0; n < len(dirEntry.name); n++ { + if dirEntry.name[n] == 0 { + name = dirEntry.name[:n] + break } } - return false + + return string(name), nil +} + +func (c *StatsClient) dumpEntries(indexes []uint32) (entries []adapter.StatEntry, err error) { + entries = make([]adapter.StatEntry, 0, len(indexes)) + + dirVector := c.getStatDirVector() + + for _, index := range indexes { + dirEntry := c.getStatDirIndex(dirVector, index) + + var name []byte + for n := 0; n < len(dirEntry.name); n++ { + if dirEntry.name[n] == 0 { + name = dirEntry.name[:n] + break + } + } + if len(name) == 0 { + continue + } + + entry := adapter.StatEntry{ + Name: append([]byte(nil), name...), + Type: adapter.StatType(dirEntry.directoryType), + Data: c.copyEntryData(dirEntry), + } + entries = append(entries, entry) + } + + return entries, nil } diff --git a/adapter/statsclient/statseg.go b/adapter/statsclient/statseg.go new file mode 100644 index 0000000..7f1c381 --- /dev/null +++ b/adapter/statsclient/statseg.go @@ -0,0 +1,100 @@ +package statsclient + +import ( + "sync/atomic" + "time" + "unsafe" +) + +var ( + MaxWaitInProgress = time.Millisecond * 100 + CheckDelayInProgress = time.Microsecond * 10 +) + +type sharedHeaderBase struct { + epoch int64 + inProgress int64 + directoryOffset int64 + errorOffset int64 + statsOffset int64 +} + +type statSegSharedHeader struct { + version uint64 + sharedHeaderBase +} + +func (h *statSegSharedHeader) legacyVersion() bool { + // older VPP (<=19.04) did not have version in stat segment header + // we try to provide fallback support by skipping it in header + if h.version > maxVersion && h.inProgress > 1 && h.epoch == 0 { + return true + } + return false +} + +func statSegHeader(b []byte) (header statSegSharedHeader) { + h := (*statSegSharedHeader)(unsafe.Pointer(&b[0])) + header.version = atomic.LoadUint64(&h.version) + header.epoch = atomic.LoadInt64(&h.epoch) + header.inProgress = atomic.LoadInt64(&h.inProgress) + header.directoryOffset = atomic.LoadInt64(&h.directoryOffset) + header.errorOffset = atomic.LoadInt64(&h.errorOffset) + header.statsOffset = atomic.LoadInt64(&h.statsOffset) + return +} + +func statSegHeaderLegacy(b []byte) (header statSegSharedHeader) { + h := (*sharedHeaderBase)(unsafe.Pointer(&b[0])) + header.version = 0 + header.epoch = atomic.LoadInt64(&h.epoch) + header.inProgress = atomic.LoadInt64(&h.inProgress) + header.directoryOffset = atomic.LoadInt64(&h.directoryOffset) + header.errorOffset = atomic.LoadInt64(&h.errorOffset) + header.statsOffset = atomic.LoadInt64(&h.statsOffset) + return +} + +type statSegAccess struct { + epoch int64 +} + +func (c *statSegment) accessStart() statSegAccess { + t := time.Now() + + epoch, inprog := c.getEpoch() + for inprog { + if time.Since(t) > MaxWaitInProgress { + return statSegAccess{} + } else { + time.Sleep(CheckDelayInProgress) + } + epoch, inprog = c.getEpoch() + } + return statSegAccess{ + epoch: epoch, + } +} + +func (c *statSegment) accessEnd(acc *statSegAccess) bool { + epoch, inprog := c.getEpoch() + if acc.epoch != epoch || inprog { + return false + } + return true +} + +type vecHeader struct { + length uint64 + vectorData [0]uint8 +} + +func vectorLen(v unsafe.Pointer) uint64 { + vec := *(*vecHeader)(unsafe.Pointer(uintptr(v) - unsafe.Sizeof(uintptr(0)))) + return vec.length +} + +//go:nosplit +func statSegPointer(p unsafe.Pointer, offset uintptr) unsafe.Pointer { + return unsafe.Pointer(uintptr(p) + offset) +} diff --git a/adapter/statsclient/version.go b/adapter/statsclient/version.go deleted file mode 100644 index a289faa..0000000 --- a/adapter/statsclient/version.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright (c) 2019 Cisco and/or its affiliates. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package statsclient - -import ( - "fmt" -) - -const ( - MinVersion = 0 - MaxVersion = 1 -) - -func checkVersion(ver uint64) error { - if ver < MinVersion { - return fmt.Errorf("stat segment version is too old: %v (minimal version: %v)", ver, MinVersion) - } else if ver > MaxVersion { - return fmt.Errorf("stat segment version is not supported: %v (minimal version: %v)", ver, MaxVersion) - } - return nil -} diff --git a/adapter/vppapiclient/stat_client.go b/adapter/vppapiclient/stat_client.go index 0ab088c..bf19c45 100644 --- a/adapter/vppapiclient/stat_client.go +++ b/adapter/vppapiclient/stat_client.go @@ -86,7 +86,7 @@ func (c *statClient) Disconnect() error { func (c *statClient) ListStats(patterns ...string) (stats []string, err error) { dir := C.govpp_stat_segment_ls(convertStringSlice(patterns)) if dir == nil { - return nil, adapter.ErrStatDirBusy + return nil, adapter.ErrStatsDataBusy } defer C.govpp_stat_segment_vec_free(unsafe.Pointer(dir)) @@ -100,16 +100,16 @@ func (c *statClient) ListStats(patterns ...string) (stats []string, err error) { return stats, nil } -func (c *statClient) DumpStats(patterns ...string) (stats []*adapter.StatEntry, err error) { +func (c *statClient) DumpStats(patterns ...string) (stats []adapter.StatEntry, err error) { dir := C.govpp_stat_segment_ls(convertStringSlice(patterns)) if dir == nil { - return nil, adapter.ErrStatDirBusy + return nil, adapter.ErrStatsDataBusy } defer C.govpp_stat_segment_vec_free(unsafe.Pointer(dir)) dump := C.govpp_stat_segment_dump(dir) if dump == nil { - return nil, adapter.ErrStatDumpBusy + return nil, adapter.ErrStatsDataBusy } defer C.govpp_stat_segment_data_free(dump) @@ -120,8 +120,8 @@ func (c *statClient) DumpStats(patterns ...string) (stats []*adapter.StatEntry, name := C.GoString(nameChar) typ := adapter.StatType(C.govpp_stat_segment_data_type(&v)) - stat := &adapter.StatEntry{ - Name: name, + stat := adapter.StatEntry{ + Name: []byte(name), Type: typ, } @@ -147,10 +147,10 @@ func (c *statClient) DumpStats(patterns ...string) (stats []*adapter.StatEntry, vector := make([][]adapter.CombinedCounter, length) for k := 0; k < length; k++ { for j := 0; j < int(C.govpp_stat_segment_vec_len(unsafe.Pointer(C.govpp_stat_segment_data_get_combined_counter_index(&v, C.int(k))))); j++ { - vector[k] = append(vector[k], adapter.CombinedCounter{ - Packets: adapter.Counter(C.govpp_stat_segment_data_get_combined_counter_index_packets(&v, C.int(k), C.int(j))), - Bytes: adapter.Counter(C.govpp_stat_segment_data_get_combined_counter_index_bytes(&v, C.int(k), C.int(j))), - }) + vector[k] = append(vector[k], adapter.CombinedCounter([2]uint64{ + uint64(C.govpp_stat_segment_data_get_combined_counter_index_packets(&v, C.int(k), C.int(j))), + uint64(C.govpp_stat_segment_data_get_combined_counter_index_bytes(&v, C.int(k), C.int(j))), + })) } } stat.Data = adapter.CombinedCounterStat(vector) @@ -180,6 +180,14 @@ func (c *statClient) DumpStats(patterns ...string) (stats []*adapter.StatEntry, return stats, nil } +func (c *statClient) PrepareDir(prefixes ...string) (*adapter.StatDir, error) { + return nil, adapter.ErrNotImplemented +} + +func (c *statClient) UpdateDir(dir *adapter.StatDir) error { + return adapter.ErrNotImplemented +} + func convertStringSlice(strs []string) **C.uint8_t { var arr **C.uint8_t for _, str := range strs { diff --git a/adapter/vppapiclient/stat_client_stub.go b/adapter/vppapiclient/stat_client_stub.go index 57792f3..c764391 100644 --- a/adapter/vppapiclient/stat_client_stub.go +++ b/adapter/vppapiclient/stat_client_stub.go @@ -40,6 +40,14 @@ func (*stubStatClient) ListStats(patterns ...string) (statNames []string, err er return nil, adapter.ErrNotImplemented } -func (*stubStatClient) DumpStats(patterns ...string) ([]*adapter.StatEntry, error) { +func (*stubStatClient) DumpStats(patterns ...string) ([]adapter.StatEntry, error) { return nil, adapter.ErrNotImplemented } + +func (*stubStatClient) PrepareDir(prefixes ...string) (*adapter.StatDir, error) { + return nil, adapter.ErrNotImplemented +} + +func (*stubStatClient) UpdateDir(dir *adapter.StatDir) error { + return adapter.ErrNotImplemented +} diff --git a/api/stats.go b/api/stats.go index e254eae..2850b5f 100644 --- a/api/stats.go +++ b/api/stats.go @@ -14,13 +14,24 @@ package api +// StatsProvider provides methods for retrieving statistics. +type StatsProvider interface { + GetSystemStats(*SystemStats) error + GetNodeStats(*NodeStats) error + GetInterfaceStats(*InterfaceStats) error + GetErrorStats(*ErrorStats) error + GetBufferStats(*BufferStats) error +} + // SystemStats represents global system statistics. type SystemStats struct { - VectorRate float64 - InputRate float64 - LastUpdate float64 - LastStatsClear float64 - Heartbeat float64 + VectorRate uint64 + NumWorkerThreads uint64 + VectorRatePerWorker []uint64 + InputRate uint64 + LastUpdate uint64 + LastStatsClear uint64 + Heartbeat uint64 } // NodeStats represents per node statistics. @@ -49,19 +60,18 @@ type InterfaceCounters struct { InterfaceIndex uint32 InterfaceName string // requires VPP 19.04+ - RxPackets uint64 - RxBytes uint64 - RxErrors uint64 - TxPackets uint64 - TxBytes uint64 - TxErrors uint64 + Rx InterfaceCounterCombined + Tx InterfaceCounterCombined - RxUnicast [2]uint64 // packets[0], bytes[1] - RxMulticast [2]uint64 // packets[0], bytes[1] - RxBroadcast [2]uint64 // packets[0], bytes[1] - TxUnicastMiss [2]uint64 // packets[0], bytes[1] - TxMulticast [2]uint64 // packets[0], bytes[1] - TxBroadcast [2]uint64 // packets[0], bytes[1] + RxErrors uint64 + TxErrors uint64 + + RxUnicast InterfaceCounterCombined + RxMulticast InterfaceCounterCombined + RxBroadcast InterfaceCounterCombined + TxUnicast InterfaceCounterCombined + TxMulticast InterfaceCounterCombined + TxBroadcast InterfaceCounterCombined Drops uint64 Punts uint64 @@ -69,6 +79,13 @@ type InterfaceCounters struct { IP6 uint64 RxNoBuf uint64 RxMiss uint64 + Mpls uint64 +} + +// InterfaceCounterCombined defines combined counters for interfaces. +type InterfaceCounterCombined struct { + Packets uint64 + Bytes uint64 } // ErrorStats represents statistics per error counter. @@ -79,7 +96,8 @@ type ErrorStats struct { // ErrorCounter represents error counter. type ErrorCounter struct { CounterName string - Value uint64 + + Value uint64 } // BufferStats represents statistics per buffer pool. @@ -89,17 +107,9 @@ type BufferStats struct { // BufferPool represents buffer pool. type BufferPool struct { - PoolName string + PoolName string + Cached float64 Used float64 Available float64 } - -// StatsProvider provides the methods for getting statistics. -type StatsProvider interface { - GetSystemStats() (*SystemStats, error) - GetNodeStats() (*NodeStats, error) - GetInterfaceStats() (*InterfaceStats, error) - GetErrorStats(names ...string) (*ErrorStats, error) - GetBufferStats() (*BufferStats, error) -} diff --git a/core/connection.go b/core/connection.go index 8b8c7b1..6f82616 100644 --- a/core/connection.go +++ b/core/connection.go @@ -89,7 +89,6 @@ type ConnectionEvent struct { // Connection represents a shared memory connection to VPP via vppAdapter. type Connection struct { vppClient adapter.VppAPI // VPP binary API client - //statsClient adapter.StatsAPI // VPP stats API client maxAttempts int // interval for reconnect attempts recInterval time.Duration // maximum number of reconnect attempts @@ -177,7 +176,9 @@ func (c *Connection) connectVPP() error { log.Debugf("Connected to VPP") if err := c.retrieveMessageIDs(); err != nil { - c.vppClient.Disconnect() + if err := c.vppClient.Disconnect(); err != nil { + log.Debugf("disconnecting vpp client failed: %v", err) + } return fmt.Errorf("VPP is incompatible: %v", err) } @@ -192,7 +193,6 @@ func (c *Connection) Disconnect() { if c == nil { return } - if c.vppClient != nil { c.disconnectVPP() } diff --git a/core/stats.go b/core/stats.go index e935888..23b3848 100644 --- a/core/stats.go +++ b/core/stats.go @@ -1,22 +1,29 @@ package core import ( - "fmt" "path" "strings" "sync/atomic" + "time" "git.fd.io/govpp.git/adapter" "git.fd.io/govpp.git/api" ) +var ( + RetryUpdateCount = 10 + RetryUpdateDelay = time.Millisecond * 10 +) + const ( - SystemStatsPrefix = "/sys/" - SystemStats_VectorRate = SystemStatsPrefix + "vector_rate" - SystemStats_InputRate = SystemStatsPrefix + "input_rate" - SystemStats_LastUpdate = SystemStatsPrefix + "last_update" - SystemStats_LastStatsClear = SystemStatsPrefix + "last_stats_clear" - SystemStats_Heartbeat = SystemStatsPrefix + "heartbeat" + SystemStatsPrefix = "/sys/" + SystemStats_VectorRate = SystemStatsPrefix + "vector_rate" + SystemStats_NumWorkerThreads = SystemStatsPrefix + "num_worker_threads" + SystemStats_VectorRatePerWorker = SystemStatsPrefix + "vector_rate_per_worker" + SystemStats_InputRate = SystemStatsPrefix + "input_rate" + SystemStats_LastUpdate = SystemStatsPrefix + "last_update" + SystemStats_LastStatsClear = SystemStatsPrefix + "last_stats_clear" + SystemStats_Heartbeat = SystemStatsPrefix + "heartbeat" NodeStatsPrefix = "/sys/node/" NodeStats_Names = NodeStatsPrefix + "names" @@ -42,11 +49,13 @@ const ( InterfaceStats_RxMiss = InterfaceStatsPrefix + "rx-miss" InterfaceStats_RxError = InterfaceStatsPrefix + "rx-error" InterfaceStats_TxError = InterfaceStatsPrefix + "tx-error" + InterfaceStats_Mpls = InterfaceStatsPrefix + "mpls" InterfaceStats_Rx = InterfaceStatsPrefix + "rx" InterfaceStats_RxUnicast = InterfaceStatsPrefix + "rx-unicast" InterfaceStats_RxMulticast = InterfaceStatsPrefix + "rx-multicast" InterfaceStats_RxBroadcast = InterfaceStatsPrefix + "rx-broadcast" InterfaceStats_Tx = InterfaceStatsPrefix + "tx" + InterfaceStats_TxUnicast = InterfaceStatsPrefix + "tx-unicast" InterfaceStats_TxUnicastMiss = InterfaceStatsPrefix + "tx-unicast-miss" InterfaceStats_TxMulticast = InterfaceStatsPrefix + "tx-multicast" InterfaceStats_TxBroadcast = InterfaceStatsPrefix + "tx-broadcast" @@ -63,7 +72,14 @@ const ( type StatsConnection struct { statsClient adapter.StatsAPI - connected uint32 // non-zero if the adapter is connected to VPP + // connected is true if the adapter is connected to VPP + connected uint32 + + errorStatsData *adapter.StatDir + nodeStatsData *adapter.StatDir + ifaceStatsData *adapter.StatDir + sysStatsData *adapter.StatDir + bufStatsData *adapter.StatDir } func newStatsConnection(stats adapter.StatsAPI) *StatsConnection { @@ -105,7 +121,6 @@ func (c *StatsConnection) Disconnect() { if c == nil { return } - if c.statsClient != nil { c.disconnectClient() } @@ -113,301 +128,314 @@ func (c *StatsConnection) Disconnect() { func (c *StatsConnection) disconnectClient() { if atomic.CompareAndSwapUint32(&c.connected, 1, 0) { - c.statsClient.Disconnect() + if err := c.statsClient.Disconnect(); err != nil { + log.Debugf("disconnecting stats client failed: %v", err) + } } } -// GetSystemStats retrieves VPP system stats. -func (c *StatsConnection) GetSystemStats() (*api.SystemStats, error) { - stats, err := c.statsClient.DumpStats(SystemStatsPrefix) - if err != nil { - return nil, err +func (c *StatsConnection) updateStats(statDir **adapter.StatDir, patterns ...string) error { + if statDir == nil { + panic("statDir must not nil") } + try := func() error { + if (*statDir) == nil { + dir, err := c.statsClient.PrepareDir(patterns...) + if err != nil { + log.Debugln("preparing dir failed:", err) + return err + } + *statDir = dir + } else { + if err := c.statsClient.UpdateDir(*statDir); err != nil { + log.Debugln("updating dir failed:", err) + *statDir = nil + return err + } + } - sysStats := &api.SystemStats{} + return nil + } + var err error + for r := 0; r < RetryUpdateCount; r++ { + if err = try(); err == nil { + if r > 0 { + log.Debugf("retry successfull (r=%d)", r) + } + return nil + } else if err == adapter.ErrStatsDirStale || err == adapter.ErrStatsDataBusy { + // retrying + if r > 1 { + log.Debugln("sleeping for %v before next try", RetryUpdateDelay) + time.Sleep(RetryUpdateDelay) + } + } else { + // error is not retryable + break + } + } + return err +} - for _, stat := range stats { - switch stat.Name { +// UpdateSystemStats retrieves VPP system stats. +func (c *StatsConnection) GetSystemStats(sysStats *api.SystemStats) (err error) { + if err := c.updateStats(&c.sysStatsData, SystemStatsPrefix); err != nil { + return err + } + + for _, stat := range c.sysStatsData.Entries { + var val uint64 + if s, ok := stat.Data.(adapter.ScalarStat); ok { + val = uint64(s) + } + switch string(stat.Name) { case SystemStats_VectorRate: - sysStats.VectorRate = scalarStatToFloat64(stat.Data) + sysStats.VectorRate = val + case SystemStats_NumWorkerThreads: + sysStats.NumWorkerThreads = val + case SystemStats_VectorRatePerWorker: + var vals []uint64 + if ss, ok := stat.Data.(adapter.SimpleCounterStat); ok { + vals = make([]uint64, len(ss)) + for w := range ss { + vals[w] = uint64(ss[w][0]) + } + } + sysStats.VectorRatePerWorker = vals case SystemStats_InputRate: - sysStats.InputRate = scalarStatToFloat64(stat.Data) + sysStats.InputRate = val case SystemStats_LastUpdate: - sysStats.LastUpdate = scalarStatToFloat64(stat.Data) + sysStats.LastUpdate = val case SystemStats_LastStatsClear: - sysStats.LastStatsClear = scalarStatToFloat64(stat.Data) + sysStats.LastStatsClear = val case SystemStats_Heartbeat: - sysStats.Heartbeat = scalarStatToFloat64(stat.Data) + sysStats.Heartbeat = val } } - return sysStats, nil + return nil } // GetErrorStats retrieves VPP error stats. -func (c *StatsConnection) GetErrorStats(names ...string) (*api.ErrorStats, error) { - var patterns []string - if len(names) > 0 { - patterns = make([]string, len(names)) - for i, name := range names { - patterns[i] = CounterStatsPrefix + name - } - } else { - // retrieve all error counters by default - patterns = []string{CounterStatsPrefix} +func (c *StatsConnection) GetErrorStats(errorStats *api.ErrorStats) (err error) { + if err := c.updateStats(&c.errorStatsData, CounterStatsPrefix); err != nil { + return err } - stats, err := c.statsClient.DumpStats(patterns...) - if err != nil { - return nil, err + + if errorStats.Errors == nil || len(errorStats.Errors) != len(c.errorStatsData.Entries) { + errorStats.Errors = make([]api.ErrorCounter, len(c.errorStatsData.Entries)) + for i := 0; i < len(c.errorStatsData.Entries); i++ { + errorStats.Errors[i].CounterName = string(c.errorStatsData.Entries[i].Name) + } } - var errorStats = &api.ErrorStats{} - - for _, stat := range stats { - statName := strings.TrimPrefix(stat.Name, CounterStatsPrefix) - - /* TODO: deal with stats that contain '/' in node/counter name - parts := strings.Split(statName, "/") - var nodeName, counterName string - switch len(parts) { - case 2: - nodeName = parts[0] - counterName = parts[1] - case 3: - nodeName = parts[0] + parts[1] - counterName = parts[2] - }*/ - - errorStats.Errors = append(errorStats.Errors, api.ErrorCounter{ - CounterName: statName, - Value: errorStatToUint64(stat.Data), - }) + for i, stat := range c.errorStatsData.Entries { + if stat.Type != adapter.ErrorIndex { + continue + } + if errStat, ok := stat.Data.(adapter.ErrorStat); ok { + errorStats.Errors[i].Value = uint64(errStat) + } } - return errorStats, nil + return nil } -// GetNodeStats retrieves VPP per node stats. -func (c *StatsConnection) GetNodeStats() (*api.NodeStats, error) { - stats, err := c.statsClient.DumpStats(NodeStatsPrefix) - if err != nil { - return nil, err +func (c *StatsConnection) GetNodeStats(nodeStats *api.NodeStats) (err error) { + if err := c.updateStats(&c.nodeStatsData, NodeStatsPrefix); err != nil { + return err } - nodeStats := &api.NodeStats{} - - var setPerNode = func(perNode []uint64, fn func(c *api.NodeCounters, v uint64)) { - if nodeStats.Nodes == nil { - nodeStats.Nodes = make([]api.NodeCounters, len(perNode)) - for i := range perNode { + prepNodes := func(l int) { + if nodeStats.Nodes == nil || len(nodeStats.Nodes) != l { + nodeStats.Nodes = make([]api.NodeCounters, l) + for i := 0; i < l; i++ { nodeStats.Nodes[i].NodeIndex = uint32(i) } } - for i, v := range perNode { - if len(nodeStats.Nodes) <= i { - break - } - nodeCounters := nodeStats.Nodes[i] - fn(&nodeCounters, v) - nodeStats.Nodes[i] = nodeCounters + } + perNode := func(stat adapter.StatEntry, fn func(*api.NodeCounters, uint64)) { + s := stat.Data.(adapter.SimpleCounterStat) + prepNodes(len(s[0])) + for i := range nodeStats.Nodes { + val := adapter.ReduceSimpleCounterStatIndex(s, i) + fn(&nodeStats.Nodes[i], val) } } - for _, stat := range stats { - switch stat.Name { + for _, stat := range c.nodeStatsData.Entries { + switch string(stat.Name) { case NodeStats_Names: - if names, ok := stat.Data.(adapter.NameStat); !ok { - return nil, fmt.Errorf("invalid stat type for %s", stat.Name) - } else { - if nodeStats.Nodes == nil { - nodeStats.Nodes = make([]api.NodeCounters, len(names)) - for i := range names { - nodeStats.Nodes[i].NodeIndex = uint32(i) - } - } - for i, name := range names { - nodeStats.Nodes[i].NodeName = string(name) + stat := stat.Data.(adapter.NameStat) + prepNodes(len(stat)) + for i, nc := range nodeStats.Nodes { + if nc.NodeName != string(stat[i]) { + nc.NodeName = string(stat[i]) + nodeStats.Nodes[i] = nc } } case NodeStats_Clocks: - setPerNode(reduceSimpleCounterStat(stat.Data), func(c *api.NodeCounters, v uint64) { - c.Clocks = v + perNode(stat, func(node *api.NodeCounters, val uint64) { + node.Clocks = val }) case NodeStats_Vectors: - setPerNode(reduceSimpleCounterStat(stat.Data), func(c *api.NodeCounters, v uint64) { - c.Vectors = v + perNode(stat, func(node *api.NodeCounters, val uint64) { + node.Vectors = val }) case NodeStats_Calls: - setPerNode(reduceSimpleCounterStat(stat.Data), func(c *api.NodeCounters, v uint64) { - c.Calls = v + perNode(stat, func(node *api.NodeCounters, val uint64) { + node.Calls = val }) case NodeStats_Suspends: - setPerNode(reduceSimpleCounterStat(stat.Data), func(c *api.NodeCounters, v uint64) { - c.Suspends = v + perNode(stat, func(node *api.NodeCounters, val uint64) { + node.Suspends = val }) } } - return nodeStats, nil + return nil } // GetInterfaceStats retrieves VPP per interface stats. -func (c *StatsConnection) GetInterfaceStats() (*api.InterfaceStats, error) { - stats, err := c.statsClient.DumpStats(InterfaceStatsPrefix) - if err != nil { - return nil, err +func (c *StatsConnection) GetInterfaceStats(ifaceStats *api.InterfaceStats) (err error) { + if err := c.updateStats(&c.ifaceStatsData, InterfaceStatsPrefix); err != nil { + return err } - ifStats := &api.InterfaceStats{} - - var setPerIf = func(perIf []uint64, fn func(c *api.InterfaceCounters, v uint64)) { - if ifStats.Interfaces == nil { - ifStats.Interfaces = make([]api.InterfaceCounters, len(perIf)) - for i := range perIf { - ifStats.Interfaces[i].InterfaceIndex = uint32(i) + prep := func(l int) { + if ifaceStats.Interfaces == nil || len(ifaceStats.Interfaces) != l { + ifaceStats.Interfaces = make([]api.InterfaceCounters, l) + for i := 0; i < l; i++ { + ifaceStats.Interfaces[i].InterfaceIndex = uint32(i) } } - for i, v := range perIf { - if len(ifStats.Interfaces) <= i { - break - } - ifCounters := ifStats.Interfaces[i] - fn(&ifCounters, v) - ifStats.Interfaces[i] = ifCounters + } + perNode := func(stat adapter.StatEntry, fn func(*api.InterfaceCounters, uint64)) { + s := stat.Data.(adapter.SimpleCounterStat) + prep(len(s[0])) + for i := range ifaceStats.Interfaces { + val := adapter.ReduceSimpleCounterStatIndex(s, i) + fn(&ifaceStats.Interfaces[i], val) + } + } + perNodeComb := func(stat adapter.StatEntry, fn func(*api.InterfaceCounters, [2]uint64)) { + s := stat.Data.(adapter.CombinedCounterStat) + prep(len(s[0])) + for i := range ifaceStats.Interfaces { + val := adapter.ReduceCombinedCounterStatIndex(s, i) + fn(&ifaceStats.Interfaces[i], val) } } - for _, stat := range stats { - switch stat.Name { + for _, stat := range c.ifaceStatsData.Entries { + switch string(stat.Name) { case InterfaceStats_Names: - if names, ok := stat.Data.(adapter.NameStat); !ok { - return nil, fmt.Errorf("invalid stat type for %s", stat.Name) - } else { - if ifStats.Interfaces == nil { - ifStats.Interfaces = make([]api.InterfaceCounters, len(names)) - for i := range names { - ifStats.Interfaces[i].InterfaceIndex = uint32(i) - } - } - for i, name := range names { - ifStats.Interfaces[i].InterfaceName = string(name) + stat := stat.Data.(adapter.NameStat) + prep(len(stat)) + for i, nc := range ifaceStats.Interfaces { + if nc.InterfaceName != string(stat[i]) { + nc.InterfaceName = string(stat[i]) + ifaceStats.Interfaces[i] = nc } } case InterfaceStats_Drops: - setPerIf(reduceSimpleCounterStat(stat.Data), func(c *api.InterfaceCounters, v uint64) { - c.Drops = v + perNode(stat, func(iface *api.InterfaceCounters, val uint64) { + iface.Drops = val }) case InterfaceStats_Punt: - setPerIf(reduceSimpleCounterStat(stat.Data), func(c *api.InterfaceCounters, v uint64) { - c.Punts = v + perNode(stat, func(iface *api.InterfaceCounters, val uint64) { + iface.Punts = val }) case InterfaceStats_IP4: - setPerIf(reduceSimpleCounterStat(stat.Data), func(c *api.InterfaceCounters, v uint64) { - c.IP4 = v + perNode(stat, func(iface *api.InterfaceCounters, val uint64) { + iface.IP4 = val }) case InterfaceStats_IP6: - setPerIf(reduceSimpleCounterStat(stat.Data), func(c *api.InterfaceCounters, v uint64) { - c.IP6 = v + perNode(stat, func(iface *api.InterfaceCounters, val uint64) { + iface.IP6 = val }) case InterfaceStats_RxNoBuf: - setPerIf(reduceSimpleCounterStat(stat.Data), func(c *api.InterfaceCounters, v uint64) { - c.RxNoBuf = v + perNode(stat, func(iface *api.InterfaceCounters, val uint64) { + iface.RxNoBuf = val }) case InterfaceStats_RxMiss: - setPerIf(reduceSimpleCounterStat(stat.Data), func(c *api.InterfaceCounters, v uint64) { - c.RxMiss = v + perNode(stat, func(iface *api.InterfaceCounters, val uint64) { + iface.RxMiss = val }) case InterfaceStats_RxError: - setPerIf(reduceSimpleCounterStat(stat.Data), func(c *api.InterfaceCounters, v uint64) { - c.RxErrors = v + perNode(stat, func(iface *api.InterfaceCounters, val uint64) { + iface.RxErrors = val }) case InterfaceStats_TxError: - setPerIf(reduceSimpleCounterStat(stat.Data), func(c *api.InterfaceCounters, v uint64) { - c.TxErrors = v + perNode(stat, func(iface *api.InterfaceCounters, val uint64) { + iface.TxErrors = val }) - case InterfaceStats_Rx: - per := reduceCombinedCounterStat(stat.Data) - setPerIf(per[0], func(c *api.InterfaceCounters, v uint64) { - c.RxPackets = v + case InterfaceStats_Mpls: + perNode(stat, func(iface *api.InterfaceCounters, val uint64) { + iface.Mpls = val }) - setPerIf(per[1], func(c *api.InterfaceCounters, v uint64) { - c.RxBytes = v + case InterfaceStats_Rx: + perNodeComb(stat, func(iface *api.InterfaceCounters, val [2]uint64) { + iface.Rx.Packets = val[0] + iface.Rx.Bytes = val[1] }) case InterfaceStats_RxUnicast: - per := reduceCombinedCounterStat(stat.Data) - setPerIf(per[0], func(c *api.InterfaceCounters, v uint64) { - c.RxUnicast[0] = v - }) - setPerIf(per[1], func(c *api.InterfaceCounters, v uint64) { - c.RxUnicast[1] = v + perNodeComb(stat, func(iface *api.InterfaceCounters, val [2]uint64) { + iface.RxUnicast.Packets = val[0] + iface.RxUnicast.Bytes = val[1] }) case InterfaceStats_RxMulticast: - per := reduceCombinedCounterStat(stat.Data) - setPerIf(per[0], func(c *api.InterfaceCounters, v uint64) { - c.RxMulticast[0] = v - }) - setPerIf(per[1], func(c *api.InterfaceCounters, v uint64) { - c.RxMulticast[1] = v + perNodeComb(stat, func(iface *api.InterfaceCounters, val [2]uint64) { + iface.RxMulticast.Packets = val[0] + iface.RxMulticast.Bytes = val[1] }) case InterfaceStats_RxBroadcast: - per := reduceCombinedCounterStat(stat.Data) - setPerIf(per[0], func(c *api.InterfaceCounters, v uint64) { - c.RxBroadcast[0] = v - }) - setPerIf(per[1], func(c *api.InterfaceCounters, v uint64) { - c.RxBroadcast[1] = v + perNodeComb(stat, func(iface *api.InterfaceCounters, val [2]uint64) { + iface.RxBroadcast.Packets = val[0] + iface.RxBroadcast.Bytes = val[1] }) case InterfaceStats_Tx: - per := reduceCombinedCounterStat(stat.Data) - setPerIf(per[0], func(c *api.InterfaceCounters, v uint64) { - c.TxPackets = v - }) - setPerIf(per[1], func(c *api.InterfaceCounters, v uint64) { - c.TxBytes = v + perNodeComb(stat, func(iface *api.InterfaceCounters, val [2]uint64) { + iface.Tx.Packets = val[0] + iface.Tx.Bytes = val[1] }) case InterfaceStats_TxUnicastMiss: - per := reduceCombinedCounterStat(stat.Data) - setPerIf(per[0], func(c *api.InterfaceCounters, v uint64) { - c.TxUnicastMiss[0] = v - }) - setPerIf(per[1], func(c *api.InterfaceCounters, v uint64) { - c.TxUnicastMiss[1] = v + // tx-unicast-miss was a spelling mistake in older versions + // + fallthrough + case InterfaceStats_TxUnicast: + perNodeComb(stat, func(iface *api.InterfaceCounters, val [2]uint64) { + iface.TxUnicast.Packets = val[0] + iface.TxUnicast.Bytes = val[1] }) case InterfaceStats_TxMulticast: - per := reduceCombinedCounterStat(stat.Data) - setPerIf(per[0], func(c *api.InterfaceCounters, v uint64) { - c.TxMulticast[0] = v - }) - setPerIf(per[1], func(c *api.InterfaceCounters, v uint64) { - c.TxMulticast[1] = v + perNodeComb(stat, func(iface *api.InterfaceCounters, val [2]uint64) { + iface.TxMulticast.Packets = val[0] + iface.TxMulticast.Bytes = val[1] }) case InterfaceStats_TxBroadcast: - per := reduceCombinedCounterStat(stat.Data) - setPerIf(per[0], func(c *api.InterfaceCounters, v uint64) { - c.TxBroadcast[0] = v - }) - setPerIf(per[1], func(c *api.InterfaceCounters, v uint64) { - c.TxBroadcast[1] = v + perNodeComb(stat, func(iface *api.InterfaceCounters, val [2]uint64) { + iface.TxBroadcast.Packets = val[0] + iface.TxBroadcast.Bytes = val[1] }) } } - return ifStats, nil + return nil } // GetBufferStats retrieves VPP buffer pools stats. -func (c *StatsConnection) GetBufferStats() (*api.BufferStats, error) { - stats, err := c.statsClient.DumpStats(BufferStatsPrefix) - if err != nil { - return nil, err +func (c *StatsConnection) GetBufferStats(bufStats *api.BufferStats) (err error) { + if err := c.updateStats(&c.bufStatsData, BufferStatsPrefix); err != nil { + return err } - bufStats := &api.BufferStats{ - Buffer: map[string]api.BufferPool{}, + if bufStats.Buffer == nil { + bufStats.Buffer = make(map[string]api.BufferPool) } - for _, stat := range stats { - d, f := path.Split(stat.Name) + for _, stat := range c.bufStatsData.Entries { + d, f := path.Split(string(stat.Name)) d = strings.TrimSuffix(d, "/") name := strings.TrimPrefix(d, BufferStatsPrefix) @@ -416,65 +444,22 @@ func (c *StatsConnection) GetBufferStats() (*api.BufferStats, error) { b.PoolName = name } + var val float64 + s, ok := stat.Data.(adapter.ScalarStat) + if ok { + val = float64(s) + } switch f { case BufferStats_Cached: - b.Cached = scalarStatToFloat64(stat.Data) + b.Cached = val case BufferStats_Used: - b.Used = scalarStatToFloat64(stat.Data) + b.Used = val case BufferStats_Available: - b.Available = scalarStatToFloat64(stat.Data) + b.Available = val } bufStats.Buffer[name] = b } - return bufStats, nil -} - -func scalarStatToFloat64(stat adapter.Stat) float64 { - if s, ok := stat.(adapter.ScalarStat); ok { - return float64(s) - } - return 0 -} - -func errorStatToUint64(stat adapter.Stat) uint64 { - if s, ok := stat.(adapter.ErrorStat); ok { - return uint64(s) - } - return 0 -} - -func reduceSimpleCounterStat(stat adapter.Stat) []uint64 { - if s, ok := stat.(adapter.SimpleCounterStat); ok { - if len(s) == 0 { - return []uint64{} - } - var per = make([]uint64, len(s[0])) - for _, w := range s { - for i, n := range w { - per[i] += uint64(n) - } - } - return per - } return nil } - -func reduceCombinedCounterStat(stat adapter.Stat) [2][]uint64 { - if s, ok := stat.(adapter.CombinedCounterStat); ok { - if len(s) == 0 { - return [2][]uint64{{}, {}} - } - var perPackets = make([]uint64, len(s[0])) - var perBytes = make([]uint64, len(s[0])) - for _, w := range s { - for i, n := range w { - perPackets[i] += uint64(n.Packets) - perBytes[i] += uint64(n.Bytes) - } - } - return [2][]uint64{perPackets, perBytes} - } - return [2][]uint64{} -} diff --git a/examples/perf-bench/perf-bench.go b/examples/perf-bench/perf-bench.go index b246e6c..f48c154 100644 --- a/examples/perf-bench/perf-bench.go +++ b/examples/perf-bench/perf-bench.go @@ -25,9 +25,8 @@ import ( "github.com/pkg/profile" "github.com/sirupsen/logrus" - "git.fd.io/govpp.git/adapter" "git.fd.io/govpp.git/adapter/socketclient" - "git.fd.io/govpp.git/adapter/vppapiclient" + "git.fd.io/govpp.git/adapter/statsclient" "git.fd.io/govpp.git/api" "git.fd.io/govpp.git/core" "git.fd.io/govpp.git/examples/binapi/vpe" @@ -40,10 +39,12 @@ const ( func main() { // parse optional flags - var sync, prof, sock bool + var sync, prof bool var cnt int + var sock string flag.BoolVar(&sync, "sync", false, "run synchronous perf test") - flag.BoolVar(&sock, "sock", false, "use socket client for VPP API") + flag.StringVar(&sock, "socket", socketclient.DefaultSocketName, "Path to VPP API socket") + flag.String("socket", statsclient.DefaultSocketName, "Path to VPP stats socket") flag.IntVar(&cnt, "count", 0, "count of requests to be sent to VPP") flag.BoolVar(&prof, "prof", false, "generate profile data") flag.Parse() @@ -61,12 +62,7 @@ func main() { defer profile.Start().Stop() } - var a adapter.VppAPI - if sock { - a = socketclient.NewVppClient("/run/vpp-api.sock") - } else { - a = vppapiclient.NewVppClient("") - } + a := socketclient.NewVppClient(sock) // connect to VPP conn, err := core.Connect(a) diff --git a/examples/stats-api/README.md b/examples/stats-client/README.md index f3d33b1..0a44a55 100644 --- a/examples/stats-api/README.md +++ b/examples/stats-client/README.md @@ -1,4 +1,4 @@ -# Stats API Example +# Stats Client Example This example demonstrates how to retrieve statistics from VPP using [the new Stats API](https://github.com/FDio/vpp/blob/master/src/vpp/stats/stats.md). diff --git a/examples/stats-api/stats_api.go b/examples/stats-client/stats_api.go index 175bb27..288caea 100644 --- a/examples/stats-api/stats_api.go +++ b/examples/stats-client/stats_api.go @@ -20,10 +20,11 @@ import ( "log" "os" "strings" + "time" "git.fd.io/govpp.git/adapter" "git.fd.io/govpp.git/adapter/statsclient" - "git.fd.io/govpp.git/adapter/vppapiclient" + "git.fd.io/govpp.git/api" "git.fd.io/govpp.git/core" ) @@ -37,12 +38,12 @@ import ( var ( statsSocket = flag.String("socket", statsclient.DefaultSocketName, "Path to VPP stats socket") dumpAll = flag.Bool("all", false, "Dump all stats including ones with zero values") - oldclient = flag.Bool("oldclient", false, "Use old client for stats API (vppapiclient)") + pollPeriod = flag.Duration("period", time.Second*5, "Polling interval period") ) func init() { flag.Usage = func() { - fmt.Fprintf(os.Stderr, "%s: usage [ls|dump|errors|interfaces|nodes|system|buffers] <patterns>...\n", os.Args[0]) + fmt.Fprintf(os.Stderr, "%s: usage [ls|dump|poll|errors|interfaces|nodes|system|buffers] <patterns>...\n", os.Args[0]) flag.PrintDefaults() os.Exit(1) } @@ -52,26 +53,12 @@ func main() { flag.Parse() skipZeros := !*dumpAll - cmd := flag.Arg(0) - switch cmd { - case "", "ls", "dump", "errors", "interfaces", "nodes", "system", "buffers": - default: - flag.Usage() - } - var patterns []string if flag.NArg() > 0 { patterns = flag.Args()[1:] } - var client adapter.StatsAPI - if *oldclient { - client = vppapiclient.NewStatClient(*statsSocket) - } else { - client = statsclient.NewStatsClient(*statsSocket) - } - - fmt.Printf("Connecting to stats socket: %s\n", *statsSocket) + client := statsclient.NewStatsClient(*statsSocket) c, err := core.ConnectStats(client) if err != nil { @@ -79,22 +66,26 @@ func main() { } defer c.Disconnect() - switch cmd { + switch cmd := flag.Arg(0); cmd { case "system": - stats, err := c.GetSystemStats() - if err != nil { + stats := new(api.SystemStats) + if err := c.GetSystemStats(stats); err != nil { log.Fatalln("getting system stats failed:", err) } fmt.Printf("System stats: %+v\n", stats) + case "poll-system": + pollSystem(c) + case "nodes": fmt.Println("Listing node stats..") - stats, err := c.GetNodeStats() - if err != nil { + stats := new(api.NodeStats) + if err := c.GetNodeStats(stats); err != nil { log.Fatalln("getting node stats failed:", err) } + for _, node := range stats.Nodes { - if node.Calls == 0 && node.Suspends == 0 && node.Clocks == 0 && node.Vectors == 0 && skipZeros { + if skipZeros && node.Calls == 0 && node.Suspends == 0 && node.Clocks == 0 && node.Vectors == 0 { continue } fmt.Printf(" - %+v\n", node) @@ -103,8 +94,8 @@ func main() { case "interfaces": fmt.Println("Listing interface stats..") - stats, err := c.GetInterfaceStats() - if err != nil { + stats := new(api.InterfaceStats) + if err := c.GetInterfaceStats(stats); err != nil { log.Fatalln("getting interface stats failed:", err) } for _, iface := range stats.Interfaces { @@ -112,15 +103,18 @@ func main() { } fmt.Printf("Listed %d interface counters\n", len(stats.Interfaces)) + case "poll-interfaces": + pollInterfaces(c) + case "errors": fmt.Printf("Listing error stats.. %s\n", strings.Join(patterns, " ")) - stats, err := c.GetErrorStats(patterns...) - if err != nil { + stats := new(api.ErrorStats) + if err := c.GetErrorStats(stats); err != nil { log.Fatalln("getting error stats failed:", err) } n := 0 for _, counter := range stats.Errors { - if counter.Value == 0 && skipZeros { + if skipZeros && counter.Value == 0 { continue } fmt.Printf(" - %v\n", counter) @@ -129,23 +123,33 @@ func main() { fmt.Printf("Listed %d (%d) error counters\n", n, len(stats.Errors)) case "buffers": - stats, err := c.GetBufferStats() - if err != nil { + stats := new(api.BufferStats) + if err := c.GetBufferStats(stats); err != nil { log.Fatalln("getting buffer stats failed:", err) } fmt.Printf("Buffer stats: %+v\n", stats) case "dump": + fmt.Printf("Dumping stats.. %s\n", strings.Join(patterns, " ")) + dumpStats(client, patterns, skipZeros) - default: + case "poll": + fmt.Printf("Polling stats.. %s\n", strings.Join(patterns, " ")) + + pollStats(client, patterns, skipZeros) + + case "list", "ls", "": + fmt.Printf("Listing stats.. %s\n", strings.Join(patterns, " ")) + listStats(client, patterns) + + default: + fmt.Printf("invalid command: %q\n", cmd) } } func listStats(client adapter.StatsAPI, patterns []string) { - fmt.Printf("Listing stats.. %s\n", strings.Join(patterns, " ")) - list, err := client.ListStats(patterns...) if err != nil { log.Fatalln("listing stats failed:", err) @@ -159,8 +163,6 @@ func listStats(client adapter.StatsAPI, patterns []string) { } func dumpStats(client adapter.StatsAPI, patterns []string, skipZeros bool) { - fmt.Printf("Dumping stats.. %s\n", strings.Join(patterns, " ")) - stats, err := client.DumpStats(patterns...) if err != nil { log.Fatalln("dumping stats failed:", err) @@ -168,40 +170,91 @@ func dumpStats(client adapter.StatsAPI, patterns []string, skipZeros bool) { n := 0 for _, stat := range stats { - if isZero(stat.Data) && skipZeros { + if skipZeros && (stat.Data == nil || stat.Data.IsZero()) { continue } - fmt.Printf(" - %-25s %25v %+v\n", stat.Name, stat.Type, stat.Data) + fmt.Printf(" - %-50s %25v %+v\n", stat.Name, stat.Type, stat.Data) n++ } fmt.Printf("Dumped %d (%d) stats\n", n, len(stats)) } -func isZero(stat adapter.Stat) bool { - switch s := stat.(type) { - case adapter.ScalarStat: - return s == 0 - case adapter.ErrorStat: - return s == 0 - case adapter.SimpleCounterStat: - for _, ss := range s { - for _, sss := range ss { - if sss != 0 { - return false - } +func pollStats(client adapter.StatsAPI, patterns []string, skipZeros bool) { + dir, err := client.PrepareDir(patterns...) + if err != nil { + log.Fatalln("preparing dir failed:", err) + } + + tick := time.Tick(*pollPeriod) + for { + n := 0 + fmt.Println(time.Now().Format(time.Stamp)) + for _, stat := range dir.Entries { + if skipZeros && (stat.Data == nil || stat.Data.IsZero()) { + continue } + fmt.Printf("%-50s %+v\n", stat.Name, stat.Data) + n++ } - return true - case adapter.CombinedCounterStat: - for _, ss := range s { - for _, sss := range ss { - if sss.Bytes != 0 || sss.Packets != 0 { - return false + fmt.Println() + + select { + case <-tick: + if err := client.UpdateDir(dir); err != nil { + if err == adapter.ErrStatsDirStale { + if dir, err = client.PrepareDir(patterns...); err != nil { + log.Fatalln("preparing dir failed:", err) + } + continue } + log.Fatalln("updating dir failed:", err) + } + } + } +} + +func pollSystem(client api.StatsProvider) { + stats := new(api.SystemStats) + + if err := client.GetSystemStats(stats); err != nil { + log.Fatalln("updating system stats failed:", err) + } + + tick := time.Tick(*pollPeriod) + for { + fmt.Printf("System stats: %+v\n", stats) + fmt.Println() + + select { + case <-tick: + if err := client.GetSystemStats(stats); err != nil { + log.Println("updating system stats failed:", err) + } + } + } +} + +func pollInterfaces(client api.StatsProvider) { + stats := new(api.InterfaceStats) + + if err := client.GetInterfaceStats(stats); err != nil { + log.Fatalln("updating system stats failed:", err) + } + + tick := time.Tick(*pollPeriod) + for { + fmt.Printf("Interface stats (%d interfaces)\n", len(stats.Interfaces)) + for i := range stats.Interfaces { + fmt.Printf(" - %+v\n", stats.Interfaces[i]) + } + fmt.Println() + + select { + case <-tick: + if err := client.GetInterfaceStats(stats); err != nil { + log.Println("updating system stats failed:", err) } } - return true } - return false } diff --git a/test/integration/stats_integration_test.go b/test/integration/stats_integration_test.go new file mode 100644 index 0000000..51405d9 --- /dev/null +++ b/test/integration/stats_integration_test.go @@ -0,0 +1,175 @@ +// Copyright (c) 2019 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build integration + +package integration + +import ( + "flag" + "testing" + + "git.fd.io/govpp.git/adapter/statsclient" + "git.fd.io/govpp.git/api" + "git.fd.io/govpp.git/core" +) + +var ( + statsSocket = flag.String("socket", statsclient.DefaultSocketName, "Path to VPP stats socket") +) + +func TestStatClientAll(t *testing.T) { + client := statsclient.NewStatsClient(*statsSocket) + + c, err := core.ConnectStats(client) + if err != nil { + t.Fatal("Connecting failed:", err) + } + defer c.Disconnect() + + sysStats := new(api.SystemStats) + nodeStats := new(api.NodeStats) + errorStats := new(api.ErrorStats) + ifaceStats := new(api.InterfaceStats) + + if err = c.GetNodeStats(nodeStats); err != nil { + t.Fatal("updating node stats failed:", err) + } + if err = c.GetSystemStats(sysStats); err != nil { + t.Fatal("updating system stats failed:", err) + } + if err = c.GetErrorStats(errorStats); err != nil { + t.Fatal("updating error stats failed:", err) + } + if err = c.GetInterfaceStats(ifaceStats); err != nil { + t.Fatal("updating interface stats failed:", err) + } +} + +func TestStatClientNodeStats(t *testing.T) { + client := statsclient.NewStatsClient(*statsSocket) + + c, err := core.ConnectStats(client) + if err != nil { + t.Fatal("Connecting failed:", err) + } + defer c.Disconnect() + + stats := new(api.NodeStats) + + if err := c.GetNodeStats(stats); err != nil { + t.Fatal("getting node stats failed:", err) + } +} + +func TestStatClientNodeStatsAgain(t *testing.T) { + client := statsclient.NewStatsClient(*statsSocket) + c, err := core.ConnectStats(client) + if err != nil { + t.Fatal("Connecting failed:", err) + } + defer c.Disconnect() + + stats := new(api.NodeStats) + + if err := c.GetNodeStats(stats); err != nil { + t.Fatal("getting node stats failed:", err) + } + if err := c.GetNodeStats(stats); err != nil { + t.Fatal("getting node stats failed:", err) + } +} + +func BenchmarkStatClientNodeStatsGet1(b *testing.B) { benchStatClientNodeStatsGet(b, 1) } +func BenchmarkStatClientNodeStatsGet10(b *testing.B) { benchStatClientNodeStatsGet(b, 10) } + +func benchStatClientNodeStatsGet(b *testing.B, repeatN int) { + client := statsclient.NewStatsClient(*statsSocket) + c, err := core.ConnectStats(client) + if err != nil { + b.Fatal("Connecting failed:", err) + } + defer c.Disconnect() + + b.ResetTimer() + nodeStats := new(api.NodeStats) + for i := 0; i < b.N; i++ { + for r := 0; r < repeatN; r++ { + if err = c.GetNodeStats(nodeStats); err != nil { + b.Fatal("getting node stats failed:", err) + } + } + } + b.StopTimer() +} + +func BenchmarkStatClientNodeStatsUpdate1(b *testing.B) { benchStatClientNodeStatsLoad(b, 1) } +func BenchmarkStatClientNodeStatsUpdate10(b *testing.B) { benchStatClientNodeStatsLoad(b, 10) } + +func benchStatClientNodeStatsLoad(b *testing.B, repeatN int) { + client := statsclient.NewStatsClient(*statsSocket) + c, err := core.ConnectStats(client) + if err != nil { + b.Fatal("Connecting failed:", err) + } + defer c.Disconnect() + nodeStats := new(api.NodeStats) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + for r := 0; r < repeatN; r++ { + if err = c.GetNodeStats(nodeStats); err != nil { + b.Fatal("getting node stats failed:", err) + } + } + } + b.StopTimer() +} + +func BenchmarkStatClientStatsUpdate1(b *testing.B) { benchStatClientStatsUpdate(b, 1) } +func BenchmarkStatClientStatsUpdate10(b *testing.B) { benchStatClientStatsUpdate(b, 10) } +func BenchmarkStatClientStatsUpdate100(b *testing.B) { benchStatClientStatsUpdate(b, 100) } + +func benchStatClientStatsUpdate(b *testing.B, repeatN int) { + client := statsclient.NewStatsClient(*statsSocket) + c, err := core.ConnectStats(client) + if err != nil { + b.Fatal("Connecting failed:", err) + } + defer c.Disconnect() + + sysStats := new(api.SystemStats) + nodeStats := new(api.NodeStats) + errorStats := new(api.ErrorStats) + ifaceStats := new(api.InterfaceStats) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + for r := 0; r < repeatN; r++ { + if err = c.GetNodeStats(nodeStats); err != nil { + b.Fatal("updating node stats failed:", err) + } + if err = c.GetSystemStats(sysStats); err != nil { + b.Fatal("updating system stats failed:", err) + } + if err = c.GetErrorStats(errorStats); err != nil { + b.Fatal("updating error stats failed:", err) + } + if err = c.GetInterfaceStats(ifaceStats); err != nil { + b.Fatal("updating error stats failed:", err) + } + } + } + b.StopTimer() +} |