diff options
author | Ondrej Fabry <ofabry@cisco.com> | 2019-09-17 12:41:47 +0200 |
---|---|---|
committer | Ondrej Fabry <ofabry@cisco.com> | 2019-10-03 12:54:22 +0200 |
commit | 809b69ea4a90455445c34bbad7d8e5fea5cf3462 (patch) | |
tree | ba4b94339ac83908e2d9933692dc373929380aa7 /adapter/statsclient | |
parent | 48198748bdfcc7d30c794cdac19de822da53f840 (diff) |
Optimizations for statsclient
- this dramatically improves performance for stats data collection
- memory allocation is now done only when stat dirs change
- updating prepared stat dir does not need to allocate memory
- created integration test for testing stats client
- added NumWorkerThreads and VectorRatePerWorker to SystemStats
- added ReduceSimpleCounterStatIndex, ReduceCombinedCounterStatIndex for
aggregating specific index
Change-Id: I702731a69024ab5dd0832bb5cfe2773a987359e5
Signed-off-by: Ondrej Fabry <ofabry@cisco.com>
Diffstat (limited to 'adapter/statsclient')
-rw-r--r-- | adapter/statsclient/stat_segment.go | 415 | ||||
-rw-r--r-- | adapter/statsclient/statsclient.go | 273 | ||||
-rw-r--r-- | adapter/statsclient/statseg.go | 100 | ||||
-rw-r--r-- | adapter/statsclient/version.go | 33 |
4 files changed, 539 insertions, 282 deletions
diff --git a/adapter/statsclient/stat_segment.go b/adapter/statsclient/stat_segment.go index e8d20b0..9f028eb 100644 --- a/adapter/statsclient/stat_segment.go +++ b/adapter/statsclient/stat_segment.go @@ -17,9 +17,7 @@ package statsclient import ( "fmt" "net" - "sync/atomic" "syscall" - "time" "unsafe" "github.com/ftrvxmtrx/fd" @@ -28,41 +26,70 @@ import ( ) var ( - maxWaitInProgress = time.Second * 1 + ErrStatDataLenIncorrect = fmt.Errorf("stat data length incorrect") ) -type statDirectoryType int32 - -func (t statDirectoryType) String() string { - return adapter.StatType(t).String() -} +const ( + minVersion = 0 + maxVersion = 1 +) -type statSegDirectoryEntry struct { - directoryType statDirectoryType - // unionData can represent: offset, index or value - unionData uint64 - offsetVector uint64 - name [128]byte +func checkVersion(ver uint64) error { + if ver < minVersion { + return fmt.Errorf("stat segment version is too old: %v (minimal version: %v)", ver, minVersion) + } else if ver > maxVersion { + return fmt.Errorf("stat segment version is not supported: %v (minimal version: %v)", ver, maxVersion) + } + return nil } type statSegment struct { sharedHeader []byte memorySize int64 - // oldHeader defines version 0 for stat segment - // and is used for VPP 19.04 - oldHeader bool + // legacyVersion represents stat segment version 0 + // and is used as fallback for VPP 19.04 + legacyVersion bool +} + +func (c *statSegment) getStatDirVector() unsafe.Pointer { + dirOffset, _, _ := c.getOffsets() + return unsafe.Pointer(&c.sharedHeader[dirOffset]) +} + +func (c *statSegment) getStatDirIndex(p unsafe.Pointer, index uint32) *statSegDirectoryEntry { + return (*statSegDirectoryEntry)(unsafe.Pointer(uintptr(p) + uintptr(index)*unsafe.Sizeof(statSegDirectoryEntry{}))) +} + +func (c *statSegment) getHeader() (header statSegSharedHeader) { + if c.legacyVersion { + return statSegHeaderLegacy(c.sharedHeader) + } + return statSegHeader(c.sharedHeader) +} + +func (c *statSegment) getEpoch() (int64, bool) { + h := c.getHeader() + return h.epoch, h.inProgress != 0 +} + +func (c *statSegment) getOffsets() (dir, err, stat int64) { + h := c.getHeader() + return h.directoryOffset, h.errorOffset, h.statsOffset } func (c *statSegment) connect(sockName string) error { - addr := &net.UnixAddr{ + if c.sharedHeader != nil { + return fmt.Errorf("already connected") + } + + addr := net.UnixAddr{ Net: "unixpacket", Name: sockName, } - Log.Debugf("connecting to: %v", addr) - conn, err := net.DialUnix(addr.Net, nil, addr) + conn, err := net.DialUnix(addr.Net, nil, &addr) if err != nil { Log.Warnf("connecting to socket %s failed: %s", addr, err) return err @@ -82,84 +109,102 @@ func (c *statSegment) connect(sockName string) error { if len(files) == 0 { return fmt.Errorf("no files received over socket") } + + file := files[0] defer func() { - for _, f := range files { - if err := f.Close(); err != nil { - Log.Warnf("closing file %s failed: %v", f.Name(), err) - } + if err := file.Close(); err != nil { + Log.Warnf("closing file failed: %v", err) } }() - Log.Debugf("received %d files over socket", len(files)) - - f := files[0] - - info, err := f.Stat() + info, err := file.Stat() if err != nil { return err } - size := info.Size() - Log.Debugf("fd: name=%v size=%v", info.Name(), size) - - data, err := syscall.Mmap(int(f.Fd()), 0, int(size), syscall.PROT_READ, syscall.MAP_SHARED) + data, err := syscall.Mmap(int(file.Fd()), 0, int(size), syscall.PROT_READ, syscall.MAP_SHARED) if err != nil { - Log.Warnf("mapping shared memory failed: %v", err) + Log.Debugf("mapping shared memory failed: %v", err) return fmt.Errorf("mapping shared memory failed: %v", err) } - Log.Debugf("successfuly mapped shared memory") - c.sharedHeader = data c.memorySize = size - header := c.readHeader() - Log.Debugf("stat segment header: %+v", header) + Log.Debugf("successfuly mmapped shared memory segment (size: %v)", size) - // older VPP (<=19.04) did not have version in stat segment header - // we try to provide fallback support by skipping it in header - if header.version > MaxVersion && header.inProgress > 1 && header.epoch == 0 { - h := c.readHeaderOld() - Log.Debugf("statsclient: falling back to old stat segment version (VPP <=19.04): %+v", h) - c.oldHeader = true + hdr := statSegHeader(c.sharedHeader) + Log.Debugf("stat segment header: %+v", hdr) + + if hdr.legacyVersion() { + c.legacyVersion = true + hdr = statSegHeaderLegacy(c.sharedHeader) + Log.Debugf("falling back to legacy version (VPP <=19.04) of stat segment (header: %+v)", hdr) + } + + if err := checkVersion(hdr.version); err != nil { + return err } return nil } func (c *statSegment) disconnect() error { + if c.sharedHeader == nil { + return nil + } + if err := syscall.Munmap(c.sharedHeader); err != nil { - Log.Warnf("unmapping shared memory failed: %v", err) + Log.Debugf("unmapping shared memory failed: %v", err) return fmt.Errorf("unmapping shared memory failed: %v", err) } + c.sharedHeader = nil Log.Debugf("successfuly unmapped shared memory") - return nil } -func (c *statSegment) copyData(dirEntry *statSegDirectoryEntry) adapter.Stat { - switch typ := adapter.StatType(dirEntry.directoryType); typ { +type statDirectoryType int32 + +func (t statDirectoryType) String() string { + return adapter.StatType(t).String() +} + +type statSegDirectoryEntry struct { + directoryType statDirectoryType + // unionData can represent: + // - offset + // - index + // - value + unionData uint64 + offsetVector uint64 + name [128]byte +} + +func (c *statSegment) copyEntryData(dirEntry *statSegDirectoryEntry) adapter.Stat { + dirType := adapter.StatType(dirEntry.directoryType) + + switch dirType { case adapter.ScalarIndex: return adapter.ScalarStat(dirEntry.unionData) case adapter.ErrorIndex: - _, errOffset, _ := c.readOffsets() + _, errOffset, _ := c.getOffsets() offsetVector := unsafe.Pointer(&c.sharedHeader[errOffset]) var errData adapter.Counter - if c.oldHeader { + if c.legacyVersion { // error were not vector (per-worker) in VPP 19.04 offset := uintptr(dirEntry.unionData) * unsafe.Sizeof(uint64(0)) - val := *(*adapter.Counter)(add(offsetVector, offset)) + val := *(*adapter.Counter)(statSegPointer(offsetVector, offset)) errData = val } else { vecLen := vectorLen(offsetVector) for i := uint64(0); i < vecLen; i++ { - cb := *(*uint64)(add(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) + cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) offset := uintptr(cb) + uintptr(dirEntry.unionData)*unsafe.Sizeof(adapter.Counter(0)) - val := *(*adapter.Counter)(add(unsafe.Pointer(&c.sharedHeader[0]), offset)) + val := *(*adapter.Counter)(statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), offset)) errData += val } } @@ -167,83 +212,82 @@ func (c *statSegment) copyData(dirEntry *statSegDirectoryEntry) adapter.Stat { case adapter.SimpleCounterVector: if dirEntry.unionData == 0 { - Log.Debugf("\toffset is not valid") + debugf("offset invalid for %s", dirEntry.name) break } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) { - Log.Debugf("\toffset out of range") + debugf("offset out of range for %s", dirEntry.name) break } - simpleCounter := unsafe.Pointer(&c.sharedHeader[dirEntry.unionData]) // offset - vecLen := vectorLen(simpleCounter) - offsetVector := add(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector)) + vecLen := vectorLen(unsafe.Pointer(&c.sharedHeader[dirEntry.unionData])) + offsetVector := statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector)) data := make([][]adapter.Counter, vecLen) for i := uint64(0); i < vecLen; i++ { - cb := *(*uint64)(add(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) + cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) counterVec := unsafe.Pointer(&c.sharedHeader[uintptr(cb)]) vecLen2 := vectorLen(counterVec) + data[i] = make([]adapter.Counter, vecLen2) for j := uint64(0); j < vecLen2; j++ { offset := uintptr(j) * unsafe.Sizeof(adapter.Counter(0)) - val := *(*adapter.Counter)(add(counterVec, offset)) - data[i] = append(data[i], val) + val := *(*adapter.Counter)(statSegPointer(counterVec, offset)) + data[i][j] = val } } return adapter.SimpleCounterStat(data) case adapter.CombinedCounterVector: if dirEntry.unionData == 0 { - Log.Debugf("\toffset is not valid") + debugf("offset invalid for %s", dirEntry.name) break } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) { - Log.Debugf("\toffset out of range") + debugf("offset out of range for %s", dirEntry.name) break } - combinedCounter := unsafe.Pointer(&c.sharedHeader[dirEntry.unionData]) // offset - vecLen := vectorLen(combinedCounter) - offsetVector := add(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector)) + vecLen := vectorLen(unsafe.Pointer(&c.sharedHeader[dirEntry.unionData])) + offsetVector := statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector)) data := make([][]adapter.CombinedCounter, vecLen) for i := uint64(0); i < vecLen; i++ { - cb := *(*uint64)(add(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) + cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) counterVec := unsafe.Pointer(&c.sharedHeader[uintptr(cb)]) vecLen2 := vectorLen(counterVec) + data[i] = make([]adapter.CombinedCounter, vecLen2) for j := uint64(0); j < vecLen2; j++ { offset := uintptr(j) * unsafe.Sizeof(adapter.CombinedCounter{}) - val := *(*adapter.CombinedCounter)(add(counterVec, offset)) - data[i] = append(data[i], val) + val := *(*adapter.CombinedCounter)(statSegPointer(counterVec, offset)) + data[i][j] = val } } return adapter.CombinedCounterStat(data) case adapter.NameVector: if dirEntry.unionData == 0 { - Log.Debugf("\toffset is not valid") + debugf("offset invalid for %s", dirEntry.name) break } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) { - Log.Debugf("\toffset out of range") + debugf("offset out of range for %s", dirEntry.name) break } - nameVector := unsafe.Pointer(&c.sharedHeader[dirEntry.unionData]) // offset - vecLen := vectorLen(nameVector) - offsetVector := add(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector)) + vecLen := vectorLen(unsafe.Pointer(&c.sharedHeader[dirEntry.unionData])) + offsetVector := statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector)) data := make([]adapter.Name, vecLen) for i := uint64(0); i < vecLen; i++ { - cb := *(*uint64)(add(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) + cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) if cb == 0 { - Log.Debugf("\tname vector cb out of range") + debugf("name vector out of range for %s (%v)", dirEntry.name, i) continue } nameVec := unsafe.Pointer(&c.sharedHeader[cb]) vecLen2 := vectorLen(nameVec) - var nameStr []byte + nameStr := make([]byte, 0, vecLen2) for j := uint64(0); j < vecLen2; j++ { offset := uintptr(j) * unsafe.Sizeof(byte(0)) - val := *(*byte)(add(nameVec, offset)) + val := *(*byte)(statSegPointer(nameVec, offset)) if val > 0 { nameStr = append(nameStr, val) } @@ -253,116 +297,143 @@ func (c *statSegment) copyData(dirEntry *statSegDirectoryEntry) adapter.Stat { return adapter.NameStat(data) default: - Log.Warnf("Unknown type %d for stat entry: %q", dirEntry.directoryType, dirEntry.name) + // TODO: monitor occurrences with metrics + debugf("Unknown type %d for stat entry: %q", dirEntry.directoryType, dirEntry.name) } return nil } -type sharedHeaderBase struct { - epoch int64 - inProgress int64 - directoryOffset int64 - errorOffset int64 - statsOffset int64 -} +func (c *statSegment) updateEntryData(dirEntry *statSegDirectoryEntry, stat *adapter.Stat) error { + switch (*stat).(type) { + case adapter.ScalarStat: + *stat = adapter.ScalarStat(dirEntry.unionData) -type statSegSharedHeader struct { - version uint64 - sharedHeaderBase -} + case adapter.ErrorStat: + _, errOffset, _ := c.getOffsets() + offsetVector := unsafe.Pointer(&c.sharedHeader[errOffset]) -func (c *statSegment) readHeaderOld() (header statSegSharedHeader) { - h := (*sharedHeaderBase)(unsafe.Pointer(&c.sharedHeader[0])) - header.version = 0 - header.epoch = atomic.LoadInt64(&h.epoch) - header.inProgress = atomic.LoadInt64(&h.inProgress) - header.directoryOffset = atomic.LoadInt64(&h.directoryOffset) - header.errorOffset = atomic.LoadInt64(&h.errorOffset) - header.statsOffset = atomic.LoadInt64(&h.statsOffset) - return -} + var errData adapter.Counter + if c.legacyVersion { + // error were not vector (per-worker) in VPP 19.04 + offset := uintptr(dirEntry.unionData) * unsafe.Sizeof(uint64(0)) + val := *(*adapter.Counter)(statSegPointer(offsetVector, offset)) + errData = val + } else { + vecLen := vectorLen(offsetVector) + for i := uint64(0); i < vecLen; i++ { + cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) + offset := uintptr(cb) + uintptr(dirEntry.unionData)*unsafe.Sizeof(adapter.Counter(0)) + val := *(*adapter.Counter)(statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), offset)) + errData += val + } + } + *stat = adapter.ErrorStat(errData) -func (c *statSegment) readHeader() (header statSegSharedHeader) { - h := (*statSegSharedHeader)(unsafe.Pointer(&c.sharedHeader[0])) - header.version = atomic.LoadUint64(&h.version) - header.epoch = atomic.LoadInt64(&h.epoch) - header.inProgress = atomic.LoadInt64(&h.inProgress) - header.directoryOffset = atomic.LoadInt64(&h.directoryOffset) - header.errorOffset = atomic.LoadInt64(&h.errorOffset) - header.statsOffset = atomic.LoadInt64(&h.statsOffset) - return -} + case adapter.SimpleCounterStat: + if dirEntry.unionData == 0 { + debugf("offset invalid for %s", dirEntry.name) + break + } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) { + debugf("offset out of range for %s", dirEntry.name) + break + } -func (c *statSegment) readVersion() uint64 { - if c.oldHeader { - return 0 - } - header := (*statSegSharedHeader)(unsafe.Pointer(&c.sharedHeader[0])) - version := atomic.LoadUint64(&header.version) - return version -} + vecLen := vectorLen(unsafe.Pointer(&c.sharedHeader[dirEntry.unionData])) + offsetVector := statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector)) -func (c *statSegment) readEpoch() (int64, bool) { - if c.oldHeader { - h := c.readHeaderOld() - return h.epoch, h.inProgress != 0 - } - header := (*statSegSharedHeader)(unsafe.Pointer(&c.sharedHeader[0])) - epoch := atomic.LoadInt64(&header.epoch) - inprog := atomic.LoadInt64(&header.inProgress) - return epoch, inprog != 0 -} + data := (*stat).(adapter.SimpleCounterStat) + if uint64(len(data)) != vecLen { + return ErrStatDataLenIncorrect + } + for i := uint64(0); i < vecLen; i++ { + cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) + counterVec := unsafe.Pointer(&c.sharedHeader[uintptr(cb)]) + vecLen2 := vectorLen(counterVec) + simpData := data[i] + if uint64(len(simpData)) != vecLen2 { + return ErrStatDataLenIncorrect + } + for j := uint64(0); j < vecLen2; j++ { + offset := uintptr(j) * unsafe.Sizeof(adapter.Counter(0)) + val := *(*adapter.Counter)(statSegPointer(counterVec, offset)) + simpData[j] = val + } + } -func (c *statSegment) readOffsets() (dir, err, stat int64) { - if c.oldHeader { - h := c.readHeaderOld() - return h.directoryOffset, h.errorOffset, h.statsOffset - } - header := (*statSegSharedHeader)(unsafe.Pointer(&c.sharedHeader[0])) - dirOffset := atomic.LoadInt64(&header.directoryOffset) - errOffset := atomic.LoadInt64(&header.errorOffset) - statOffset := atomic.LoadInt64(&header.statsOffset) - return dirOffset, errOffset, statOffset -} + case adapter.CombinedCounterStat: + if dirEntry.unionData == 0 { + debugf("offset invalid for %s", dirEntry.name) + break + } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) { + debugf("offset out of range for %s", dirEntry.name) + break + } -type statSegAccess struct { - epoch int64 -} + vecLen := vectorLen(unsafe.Pointer(&c.sharedHeader[dirEntry.unionData])) + offsetVector := statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector)) -func (c *statSegment) accessStart() *statSegAccess { - epoch, inprog := c.readEpoch() - t := time.Now() - for inprog { - if time.Since(t) > maxWaitInProgress { - return nil + data := (*stat).(adapter.CombinedCounterStat) + if uint64(len(data)) != vecLen { + return ErrStatDataLenIncorrect + } + for i := uint64(0); i < vecLen; i++ { + cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) + counterVec := unsafe.Pointer(&c.sharedHeader[uintptr(cb)]) + vecLen2 := vectorLen(counterVec) + combData := data[i] + if uint64(len(combData)) != vecLen2 { + return ErrStatDataLenIncorrect + } + for j := uint64(0); j < vecLen2; j++ { + offset := uintptr(j) * unsafe.Sizeof(adapter.CombinedCounter{}) + val := *(*adapter.CombinedCounter)(statSegPointer(counterVec, offset)) + combData[j] = val + } } - epoch, inprog = c.readEpoch() - } - return &statSegAccess{ - epoch: epoch, - } -} -func (c *statSegment) accessEnd(acc *statSegAccess) bool { - epoch, inprog := c.readEpoch() - if acc.epoch != epoch || inprog { - return false - } - return true -} + case adapter.NameStat: + if dirEntry.unionData == 0 { + debugf("offset invalid for %s", dirEntry.name) + break + } else if dirEntry.unionData >= uint64(len(c.sharedHeader)) { + debugf("offset out of range for %s", dirEntry.name) + break + } -type vecHeader struct { - length uint64 - vectorData [0]uint8 -} + vecLen := vectorLen(unsafe.Pointer(&c.sharedHeader[dirEntry.unionData])) + offsetVector := statSegPointer(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector)) -func vectorLen(v unsafe.Pointer) uint64 { - vec := *(*vecHeader)(unsafe.Pointer(uintptr(v) - unsafe.Sizeof(uintptr(0)))) - return vec.length -} + data := (*stat).(adapter.NameStat) + if uint64(len(data)) != vecLen { + return ErrStatDataLenIncorrect + } + for i := uint64(0); i < vecLen; i++ { + cb := *(*uint64)(statSegPointer(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) + if cb == 0 { + continue + } + nameVec := unsafe.Pointer(&c.sharedHeader[cb]) + vecLen2 := vectorLen(nameVec) + + nameData := data[i] + if uint64(len(nameData))+1 != vecLen2 { + return ErrStatDataLenIncorrect + } + for j := uint64(0); j < vecLen2; j++ { + offset := uintptr(j) * unsafe.Sizeof(byte(0)) + val := *(*byte)(statSegPointer(nameVec, offset)) + if val == 0 { + break + } + nameData[j] = val + } + } -//go:nosplit -func add(p unsafe.Pointer, x uintptr) unsafe.Pointer { - return unsafe.Pointer(uintptr(p) + x) + default: + if Debug { + Log.Debugf("Unrecognized stat type %T for stat entry: %v", stat, dirEntry.name) + } + } + return nil } diff --git a/adapter/statsclient/statsclient.go b/adapter/statsclient/statsclient.go index 6381b9f..d4e5a56 100644 --- a/adapter/statsclient/statsclient.go +++ b/adapter/statsclient/statsclient.go @@ -20,7 +20,6 @@ import ( "fmt" "os" "regexp" - "unsafe" logger "github.com/sirupsen/logrus" @@ -63,11 +62,19 @@ func init() { } } +func debugf(f string, a ...interface{}) { + if Debug { + Log.Debugf(f, a...) + } +} + +// implements StatsAPI +var _ adapter.StatsAPI = (*StatsClient)(nil) + // StatsClient is the pure Go implementation for VPP stats API. type StatsClient struct { sockAddr string - currentEpoch int64 statSegment } @@ -94,13 +101,6 @@ func (c *StatsClient) Connect() error { return err } - ver := c.readVersion() - Log.Debugf("stat segment version: %v", ver) - - if err := checkVersion(ver); err != nil { - return err - } - return nil } @@ -108,116 +108,235 @@ func (c *StatsClient) Disconnect() error { if err := c.statSegment.disconnect(); err != nil { return err } - return nil } -func (c *StatsClient) ListStats(patterns ...string) (statNames []string, err error) { +func (c *StatsClient) ListStats(patterns ...string) (names []string, err error) { sa := c.accessStart() - if sa == nil { - return nil, fmt.Errorf("access failed") + if sa.epoch == 0 { + return nil, adapter.ErrStatsAccessFailed } - dirOffset, _, _ := c.readOffsets() - Log.Debugf("dirOffset: %v", dirOffset) + indexes, err := c.listIndexes(patterns...) + if err != nil { + return nil, err + } + for _, index := range indexes { + name, err := c.entryName(index) + if err != nil { + return nil, err + } + names = append(names, name) + } - vecLen := vectorLen(unsafe.Pointer(&c.sharedHeader[dirOffset])) - Log.Debugf("vecLen: %v", vecLen) - Log.Debugf("unsafe.Sizeof(statSegDirectoryEntry{}): %v", unsafe.Sizeof(statSegDirectoryEntry{})) + if !c.accessEnd(&sa) { + return nil, adapter.ErrStatsDataBusy + } - for i := uint64(0); i < vecLen; i++ { - offset := uintptr(i) * unsafe.Sizeof(statSegDirectoryEntry{}) - dirEntry := (*statSegDirectoryEntry)(add(unsafe.Pointer(&c.sharedHeader[dirOffset]), offset)) + return names, nil +} - nul := bytes.IndexByte(dirEntry.name[:], '\x00') - if nul < 0 { - Log.Debugf("no zero byte found for: %q", dirEntry.name[:]) - continue - } - name := string(dirEntry.name[:nul]) - if name == "" { - Log.Debugf("entry with empty name found (%d)", i) - continue - } +func (c *StatsClient) DumpStats(patterns ...string) (entries []adapter.StatEntry, err error) { + sa := c.accessStart() + if sa.epoch == 0 { + return nil, adapter.ErrStatsAccessFailed + } - Log.Debugf(" %80q (type: %v, data: %d, offset: %d) ", name, dirEntry.directoryType, dirEntry.unionData, dirEntry.offsetVector) + dir, err := c.listIndexes(patterns...) + if err != nil { + return nil, err + } + if entries, err = c.dumpEntries(dir); err != nil { + return nil, err + } - if nameMatches(name, patterns) { - statNames = append(statNames, name) - } + if !c.accessEnd(&sa) { + return nil, adapter.ErrStatsDataBusy + } + + return entries, nil +} + +func (c *StatsClient) PrepareDir(patterns ...string) (*adapter.StatDir, error) { + dir := new(adapter.StatDir) - // TODO: copy the listed entries elsewhere + sa := c.accessStart() + if sa.epoch == 0 { + return nil, adapter.ErrStatsAccessFailed } - if !c.accessEnd(sa) { - return nil, adapter.ErrStatDirBusy + indexes, err := c.listIndexes(patterns...) + if err != nil { + return nil, err } + dir.Indexes = indexes - c.currentEpoch = sa.epoch + entries, err := c.dumpEntries(indexes) + if err != nil { + return nil, err + } + dir.Entries = entries - return statNames, nil + if !c.accessEnd(&sa) { + return nil, adapter.ErrStatsDataBusy + } + dir.Epoch = sa.epoch + + return dir, nil } -func (c *StatsClient) DumpStats(patterns ...string) (entries []*adapter.StatEntry, err error) { - epoch, _ := c.readEpoch() - if c.currentEpoch > 0 && c.currentEpoch != epoch { // TODO: do list stats before dump - return nil, fmt.Errorf("old data") +func (c *StatsClient) UpdateDir(dir *adapter.StatDir) (err error) { + epoch, _ := c.getEpoch() + if dir.Epoch != epoch { + return adapter.ErrStatsDirStale } sa := c.accessStart() - if sa == nil { - return nil, fmt.Errorf("access failed") + if sa.epoch == 0 { + return adapter.ErrStatsAccessFailed } - dirOffset, _, _ := c.readOffsets() - vecLen := vectorLen(unsafe.Pointer(&c.sharedHeader[dirOffset])) + dirVector := c.getStatDirVector() - for i := uint64(0); i < vecLen; i++ { - offset := uintptr(i) * unsafe.Sizeof(statSegDirectoryEntry{}) - dirEntry := (*statSegDirectoryEntry)(add(unsafe.Pointer(&c.sharedHeader[dirOffset]), offset)) + for i, index := range dir.Indexes { + dirEntry := c.getStatDirIndex(dirVector, index) - nul := bytes.IndexByte(dirEntry.name[:], '\x00') - if nul < 0 { - Log.Debugf("no zero byte found for: %q", dirEntry.name[:]) + var name []byte + for n := 0; n < len(dirEntry.name); n++ { + if dirEntry.name[n] == 0 { + name = dirEntry.name[:n] + break + } + } + if len(name) == 0 { + continue + } + + entry := &dir.Entries[i] + if !bytes.Equal(name, entry.Name) { + continue + } + if adapter.StatType(dirEntry.directoryType) != entry.Type { continue } - name := string(dirEntry.name[:nul]) - if name == "" { - Log.Debugf("entry with empty name found (%d)", i) + if entry.Data == nil { continue } + if err := c.updateEntryData(dirEntry, &entry.Data); err != nil { + return fmt.Errorf("updating stat data for entry %s failed: %v", name, err) + } - Log.Debugf(" - %s (type: %v, data: %v, offset: %v) ", name, dirEntry.directoryType, dirEntry.unionData, dirEntry.offsetVector) + } - entry := adapter.StatEntry{ - Name: name, - Type: adapter.StatType(dirEntry.directoryType), - Data: c.copyData(dirEntry), - } + if !c.accessEnd(&sa) { + return adapter.ErrStatsDataBusy + } - Log.Debugf("\tentry data: %+v %#v (%T)", entry.Data, entry.Data, entry.Data) + return nil +} - if nameMatches(entry.Name, patterns) { - entries = append(entries, &entry) +// listIndexes lists indexes for all stat entries that match any of the regex patterns. +func (c *StatsClient) listIndexes(patterns ...string) (indexes []uint32, err error) { + if len(patterns) == 0 { + return c.listIndexesFunc(nil) + } + var regexes = make([]*regexp.Regexp, len(patterns)) + for i, pattern := range patterns { + r, err := regexp.Compile(pattern) + if err != nil { + return nil, fmt.Errorf("compiling regexp failed: %v", err) + } + regexes[i] = r + } + nameMatches := func(name []byte) bool { + for _, r := range regexes { + if r.Match(name) { + return true + } } + return false } + return c.listIndexesFunc(nameMatches) +} - if !c.accessEnd(sa) { - return nil, adapter.ErrStatDumpBusy +func (c *StatsClient) listIndexesFunc(f func(name []byte) bool) (indexes []uint32, err error) { + if f == nil { + // there is around ~3150 stats, so to avoid too many allocations + // we set capacity to 3200 when listing all stats + indexes = make([]uint32, 0, 3200) } - return entries, nil + dirVector := c.getStatDirVector() + vecLen := uint32(vectorLen(dirVector)) + + for i := uint32(0); i < vecLen; i++ { + dirEntry := c.getStatDirIndex(dirVector, i) + + if f != nil { + var name []byte + for n := 0; n < len(dirEntry.name); n++ { + if dirEntry.name[n] == 0 { + name = dirEntry.name[:n] + break + } + } + if len(name) == 0 || !f(name) { + continue + } + } + indexes = append(indexes, i) + } + + return indexes, nil } -func nameMatches(name string, patterns []string) bool { - if len(patterns) == 0 { - return true +func (c *StatsClient) entryName(index uint32) (string, error) { + dirVector := c.getStatDirVector() + vecLen := uint32(vectorLen(dirVector)) + + if index >= vecLen { + return "", fmt.Errorf("stat entry index %d out of range (%d)", index, vecLen) } - for _, pattern := range patterns { - matched, err := regexp.MatchString(pattern, name) - if err == nil && matched { - return true + + dirEntry := c.getStatDirIndex(dirVector, index) + + var name []byte + for n := 0; n < len(dirEntry.name); n++ { + if dirEntry.name[n] == 0 { + name = dirEntry.name[:n] + break } } - return false + + return string(name), nil +} + +func (c *StatsClient) dumpEntries(indexes []uint32) (entries []adapter.StatEntry, err error) { + entries = make([]adapter.StatEntry, 0, len(indexes)) + + dirVector := c.getStatDirVector() + + for _, index := range indexes { + dirEntry := c.getStatDirIndex(dirVector, index) + + var name []byte + for n := 0; n < len(dirEntry.name); n++ { + if dirEntry.name[n] == 0 { + name = dirEntry.name[:n] + break + } + } + if len(name) == 0 { + continue + } + + entry := adapter.StatEntry{ + Name: append([]byte(nil), name...), + Type: adapter.StatType(dirEntry.directoryType), + Data: c.copyEntryData(dirEntry), + } + entries = append(entries, entry) + } + + return entries, nil } diff --git a/adapter/statsclient/statseg.go b/adapter/statsclient/statseg.go new file mode 100644 index 0000000..7f1c381 --- /dev/null +++ b/adapter/statsclient/statseg.go @@ -0,0 +1,100 @@ +package statsclient + +import ( + "sync/atomic" + "time" + "unsafe" +) + +var ( + MaxWaitInProgress = time.Millisecond * 100 + CheckDelayInProgress = time.Microsecond * 10 +) + +type sharedHeaderBase struct { + epoch int64 + inProgress int64 + directoryOffset int64 + errorOffset int64 + statsOffset int64 +} + +type statSegSharedHeader struct { + version uint64 + sharedHeaderBase +} + +func (h *statSegSharedHeader) legacyVersion() bool { + // older VPP (<=19.04) did not have version in stat segment header + // we try to provide fallback support by skipping it in header + if h.version > maxVersion && h.inProgress > 1 && h.epoch == 0 { + return true + } + return false +} + +func statSegHeader(b []byte) (header statSegSharedHeader) { + h := (*statSegSharedHeader)(unsafe.Pointer(&b[0])) + header.version = atomic.LoadUint64(&h.version) + header.epoch = atomic.LoadInt64(&h.epoch) + header.inProgress = atomic.LoadInt64(&h.inProgress) + header.directoryOffset = atomic.LoadInt64(&h.directoryOffset) + header.errorOffset = atomic.LoadInt64(&h.errorOffset) + header.statsOffset = atomic.LoadInt64(&h.statsOffset) + return +} + +func statSegHeaderLegacy(b []byte) (header statSegSharedHeader) { + h := (*sharedHeaderBase)(unsafe.Pointer(&b[0])) + header.version = 0 + header.epoch = atomic.LoadInt64(&h.epoch) + header.inProgress = atomic.LoadInt64(&h.inProgress) + header.directoryOffset = atomic.LoadInt64(&h.directoryOffset) + header.errorOffset = atomic.LoadInt64(&h.errorOffset) + header.statsOffset = atomic.LoadInt64(&h.statsOffset) + return +} + +type statSegAccess struct { + epoch int64 +} + +func (c *statSegment) accessStart() statSegAccess { + t := time.Now() + + epoch, inprog := c.getEpoch() + for inprog { + if time.Since(t) > MaxWaitInProgress { + return statSegAccess{} + } else { + time.Sleep(CheckDelayInProgress) + } + epoch, inprog = c.getEpoch() + } + return statSegAccess{ + epoch: epoch, + } +} + +func (c *statSegment) accessEnd(acc *statSegAccess) bool { + epoch, inprog := c.getEpoch() + if acc.epoch != epoch || inprog { + return false + } + return true +} + +type vecHeader struct { + length uint64 + vectorData [0]uint8 +} + +func vectorLen(v unsafe.Pointer) uint64 { + vec := *(*vecHeader)(unsafe.Pointer(uintptr(v) - unsafe.Sizeof(uintptr(0)))) + return vec.length +} + +//go:nosplit +func statSegPointer(p unsafe.Pointer, offset uintptr) unsafe.Pointer { + return unsafe.Pointer(uintptr(p) + offset) +} diff --git a/adapter/statsclient/version.go b/adapter/statsclient/version.go deleted file mode 100644 index a289faa..0000000 --- a/adapter/statsclient/version.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright (c) 2019 Cisco and/or its affiliates. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package statsclient - -import ( - "fmt" -) - -const ( - MinVersion = 0 - MaxVersion = 1 -) - -func checkVersion(ver uint64) error { - if ver < MinVersion { - return fmt.Errorf("stat segment version is too old: %v (minimal version: %v)", ver, MinVersion) - } else if ver > MaxVersion { - return fmt.Errorf("stat segment version is not supported: %v (minimal version: %v)", ver, MaxVersion) - } - return nil -} |