diff options
Diffstat (limited to 'adapter/statsclient')
-rw-r--r-- | adapter/statsclient/stat_segment.go | 246 | ||||
-rw-r--r-- | adapter/statsclient/statsclient.go | 231 | ||||
-rw-r--r-- | adapter/statsclient/version.go | 33 |
3 files changed, 323 insertions, 187 deletions
diff --git a/adapter/statsclient/stat_segment.go b/adapter/statsclient/stat_segment.go new file mode 100644 index 0000000..83afa10 --- /dev/null +++ b/adapter/statsclient/stat_segment.go @@ -0,0 +1,246 @@ +// Copyright (c) 2019 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package statsclient + +import ( + "fmt" + "net" + "sync/atomic" + "syscall" + "time" + "unsafe" + + "github.com/ftrvxmtrx/fd" + + "git.fd.io/govpp.git/adapter" +) + +var ( + maxWaitInProgress = time.Second * 1 +) + +type statSegDirectoryEntry struct { + directoryType statDirectoryType + // unionData can represent: offset, index or value + unionData uint64 + offsetVector uint64 + name [128]byte +} + +type statDirectoryType int32 + +func (t statDirectoryType) String() string { + return adapter.StatType(t).String() +} + +type statSegment struct { + sharedHeader []byte + memorySize int64 + + oldHeader bool +} + +func (c *statSegment) connect(sockName string) error { + addr := &net.UnixAddr{ + Net: "unixpacket", + Name: sockName, + } + + Log.Debugf("connecting to: %v", addr) + + conn, err := net.DialUnix(addr.Net, nil, addr) + if err != nil { + Log.Warnf("connecting to socket %s failed: %s", addr, err) + return err + } + defer func() { + if err := conn.Close(); err != nil { + Log.Warnf("closing socket failed: %v", err) + } + }() + + Log.Debugf("connected to socket") + + files, err := fd.Get(conn, 1, nil) + if err != nil { + return fmt.Errorf("getting file descriptor over socket failed: %v", err) + } + if len(files) == 0 { + return fmt.Errorf("no files received over socket") + } + defer func() { + for _, f := range files { + if err := f.Close(); err != nil { + Log.Warnf("closing file %s failed: %v", f.Name(), err) + } + } + }() + + Log.Debugf("received %d files over socket", len(files)) + + f := files[0] + + info, err := f.Stat() + if err != nil { + return err + } + + size := info.Size() + + Log.Debugf("fd: name=%v size=%v", info.Name(), size) + + data, err := syscall.Mmap(int(f.Fd()), 0, int(size), syscall.PROT_READ, syscall.MAP_SHARED) + if err != nil { + Log.Warnf("mapping shared memory failed: %v", err) + return fmt.Errorf("mapping shared memory failed: %v", err) + } + + Log.Debugf("successfuly mapped shared memory") + + c.sharedHeader = data + c.memorySize = size + + header := c.readHeader() + Log.Debugf("stat segment header: %+v", header) + + // older VPP (19.04) did not have version in stat segment header + // we try to provide fallback support by skipping it in header + if header.version > MaxVersion && header.inProgress > 1 && header.epoch == 0 { + h := c.readHeaderOld() + Log.Warnf("falling back to old stat segment header version: %+v", h) + c.oldHeader = true + } + + return nil +} + +func (c *statSegment) disconnect() error { + if err := syscall.Munmap(c.sharedHeader); err != nil { + Log.Warnf("unmapping shared memory failed: %v", err) + return fmt.Errorf("unmapping shared memory failed: %v", err) + } + + Log.Debugf("successfuly unmapped shared memory") + + return nil +} + +type sharedHeaderBase struct { + epoch int64 + inProgress int64 + directoryOffset int64 + errorOffset int64 + statsOffset int64 +} + +type statSegSharedHeader struct { + version uint64 + sharedHeaderBase +} + +func (c *statSegment) readHeaderOld() (header statSegSharedHeader) { + h := (*sharedHeaderBase)(unsafe.Pointer(&c.sharedHeader[0])) + header.version = 0 + header.epoch = atomic.LoadInt64(&h.epoch) + header.inProgress = atomic.LoadInt64(&h.inProgress) + header.directoryOffset = atomic.LoadInt64(&h.directoryOffset) + header.errorOffset = atomic.LoadInt64(&h.errorOffset) + header.statsOffset = atomic.LoadInt64(&h.statsOffset) + return +} + +func (c *statSegment) readHeader() (header statSegSharedHeader) { + h := (*statSegSharedHeader)(unsafe.Pointer(&c.sharedHeader[0])) + header.version = atomic.LoadUint64(&h.version) + header.epoch = atomic.LoadInt64(&h.epoch) + header.inProgress = atomic.LoadInt64(&h.inProgress) + header.directoryOffset = atomic.LoadInt64(&h.directoryOffset) + header.errorOffset = atomic.LoadInt64(&h.errorOffset) + header.statsOffset = atomic.LoadInt64(&h.statsOffset) + return +} + +func (c *statSegment) readVersion() uint64 { + if c.oldHeader { + return 0 + } + header := (*statSegSharedHeader)(unsafe.Pointer(&c.sharedHeader[0])) + version := atomic.LoadUint64(&header.version) + return version +} + +func (c *statSegment) readEpoch() (int64, bool) { + if c.oldHeader { + h := c.readHeaderOld() + return h.epoch, h.inProgress != 0 + } + header := (*statSegSharedHeader)(unsafe.Pointer(&c.sharedHeader[0])) + epoch := atomic.LoadInt64(&header.epoch) + inprog := atomic.LoadInt64(&header.inProgress) + return epoch, inprog != 0 +} + +func (c *statSegment) readOffsets() (dir, err, stat int64) { + if c.oldHeader { + h := c.readHeaderOld() + return h.directoryOffset, h.errorOffset, h.statsOffset + } + header := (*statSegSharedHeader)(unsafe.Pointer(&c.sharedHeader[0])) + dirOffset := atomic.LoadInt64(&header.directoryOffset) + errOffset := atomic.LoadInt64(&header.errorOffset) + statOffset := atomic.LoadInt64(&header.statsOffset) + return dirOffset, errOffset, statOffset +} + +type statSegAccess struct { + epoch int64 +} + +func (c *statSegment) accessStart() *statSegAccess { + epoch, inprog := c.readEpoch() + t := time.Now() + for inprog { + if time.Since(t) > maxWaitInProgress { + return nil + } + epoch, inprog = c.readEpoch() + } + return &statSegAccess{ + epoch: epoch, + } +} + +func (c *statSegment) accessEnd(acc *statSegAccess) bool { + epoch, inprog := c.readEpoch() + if acc.epoch != epoch || inprog { + return false + } + return true +} + +type vecHeader struct { + length uint64 + vectorData [0]uint8 +} + +func vectorLen(v unsafe.Pointer) uint64 { + vec := *(*vecHeader)(unsafe.Pointer(uintptr(v) - unsafe.Sizeof(uintptr(0)))) + return vec.length +} + +//go:nosplit +func add(p unsafe.Pointer, x uintptr) unsafe.Pointer { + return unsafe.Pointer(uintptr(p) + x) +} diff --git a/adapter/statsclient/statsclient.go b/adapter/statsclient/statsclient.go index 07fcc49..4022740 100644 --- a/adapter/statsclient/statsclient.go +++ b/adapter/statsclient/statsclient.go @@ -12,20 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Package statsclient is pure Go implementation of VPP stats API client. package statsclient import ( "bytes" "fmt" - "net" "os" "regexp" - "sync/atomic" - "syscall" - "time" "unsafe" - "github.com/ftrvxmtrx/fd" logger "github.com/sirupsen/logrus" "git.fd.io/govpp.git/adapter" @@ -52,130 +48,59 @@ func init() { type StatsClient struct { sockAddr string - currentEpoch int64 - sharedHeader []byte - directoryVector uintptr - memorySize int + currentEpoch int64 + statSegment } // NewStatsClient returns new VPP stats API client. -func NewStatsClient(socketName string) *StatsClient { - return &StatsClient{ - sockAddr: socketName, - } -} - -func (c *StatsClient) Connect() error { - var sockName string - if c.sockAddr == "" { - sockName = adapter.DefaultStatsSocket - } else { - sockName = c.sockAddr +func NewStatsClient(sockAddr string) *StatsClient { + if sockAddr == "" { + sockAddr = adapter.DefaultStatsSocket } - - if _, err := os.Stat(sockName); err != nil { - if os.IsNotExist(err) { - return fmt.Errorf("stats socket file %q does not exists, ensure that VPP is running with `statseg { ... }` section in config", sockName) - } - return fmt.Errorf("stats socket file error: %v", err) - } - - if err := c.statSegmentConnect(sockName); err != nil { - return err + return &StatsClient{ + sockAddr: sockAddr, } - - return nil } -const statshmFilename = "statshm" +const sockNotFoundWarn = `VPP stats socket not found at: %s! + Check if VPP is running with stats segment enabled. + To enable it include following section in VPP config: + statseg { + default + } +` -func (c *StatsClient) statSegmentConnect(sockName string) error { - addr := &net.UnixAddr{ - Net: "unixpacket", - Name: sockName, +func (c *StatsClient) Connect() error { + // check if socket exists + if _, err := os.Stat(c.sockAddr); os.IsNotExist(err) { + Log.Warnf(sockNotFoundWarn, c.sockAddr) + return fmt.Errorf("stats socket file %s does not exist", c.sockAddr) + } else if err != nil { + return fmt.Errorf("stats socket error: %v", err) } - Log.Debugf("connecting to: %v", addr) - - conn, err := net.DialUnix(addr.Net, nil, addr) - if err != nil { - Log.Warnf("connecting to socket %s failed: %s", addr, err) + if err := c.statSegment.connect(c.sockAddr); err != nil { return err } - defer func() { - if err := conn.Close(); err != nil { - Log.Warnf("closing socket failed: %v", err) - } - }() - - Log.Debugf("connected to socket: %v", addr) - - files, err := fd.Get(conn, 1, []string{statshmFilename}) - if err != nil { - return fmt.Errorf("getting file descriptor over socket failed: %v", err) - } else if len(files) == 0 { - return fmt.Errorf("no files received over socket") - } - defer func() { - for _, f := range files { - if err := f.Close(); err != nil { - Log.Warnf("closing file %s failed: %v", f.Name(), err) - } - } - }() - - Log.Debugf("received %d files over socket", len(files)) - f := files[0] + ver := c.readVersion() + Log.Debugf("stat segment version: %v", ver) - info, err := f.Stat() - if err != nil { + if err := checkVersion(ver); err != nil { return err } - size := int(info.Size()) - - Log.Debugf("fd: name=%v size=%v", info.Name(), size) - - data, err := syscall.Mmap(int(f.Fd()), 0, size, syscall.PROT_READ, syscall.MAP_SHARED) - if err != nil { - Log.Warnf("mapping shared memory failed: %v", err) - return fmt.Errorf("mapping shared memory failed: %v", err) - } - - Log.Debugf("successfuly mapped shared memory") - - c.sharedHeader = data - c.memorySize = size - return nil } func (c *StatsClient) Disconnect() error { - err := syscall.Munmap(c.sharedHeader) - if err != nil { - Log.Warnf("unmapping shared memory failed: %v", err) - return fmt.Errorf("unmapping shared memory failed: %v", err) + if err := c.statSegment.disconnect(); err != nil { + return err } - Log.Debugf("successfuly unmapped shared memory") - return nil } -func nameMatches(name string, patterns []string) bool { - if len(patterns) == 0 { - return true - } - for _, pattern := range patterns { - matched, err := regexp.MatchString(pattern, name) - if err == nil && matched { - return true - } - } - return false -} - func (c *StatsClient) ListStats(patterns ...string) (statNames []string, err error) { sa := c.accessStart() if sa == nil { @@ -344,11 +269,17 @@ func (c *StatsClient) copyData(dirEntry *statSegDirectoryEntry) (statEntry adapt nameVector := unsafe.Pointer(&c.sharedHeader[dirEntry.unionData]) // offset vecLen := vectorLen(nameVector) offsetVector := add(unsafe.Pointer(&c.sharedHeader[0]), uintptr(dirEntry.offsetVector)) + fmt.Printf("vecLen: %v\n", vecLen) data := make([]adapter.Name, vecLen) for i := uint64(0); i < vecLen; i++ { cb := *(*uint64)(add(offsetVector, uintptr(i)*unsafe.Sizeof(uint64(0)))) - nameVec := unsafe.Pointer(&c.sharedHeader[uintptr(cb)]) + if cb == 0 { + Log.Debugf("\tname vector cb out of range") + continue + } + nameVec := unsafe.Pointer(&c.sharedHeader[cb]) + fmt.Printf("offsetVector: %v, cb: %v\n", offsetVector, cb) vecLen2 := vectorLen(nameVec) var nameStr []byte @@ -372,89 +303,15 @@ func (c *StatsClient) copyData(dirEntry *statSegDirectoryEntry) (statEntry adapt return statEntry } -type statDirectoryType int32 - -func (t statDirectoryType) String() string { - return adapter.StatType(t).String() -} - -type statSegDirectoryEntry struct { - directoryType statDirectoryType - // unionData can represent: offset, index or value - unionData uint64 - offsetVector uint64 - name [128]byte -} - -type statSegSharedHeader struct { - version uint64 - epoch int64 - inProgress int64 - directoryOffset int64 - errorOffset int64 - statsOffset int64 -} - -func (c *StatsClient) readVersion() uint64 { - header := *(*statSegSharedHeader)(unsafe.Pointer(&c.sharedHeader[0])) - version := atomic.LoadUint64(&header.version) - return version -} - -func (c *StatsClient) readEpoch() (int64, bool) { - header := *(*statSegSharedHeader)(unsafe.Pointer(&c.sharedHeader[0])) - epoch := atomic.LoadInt64(&header.epoch) - inprog := atomic.LoadInt64(&header.inProgress) - return epoch, inprog != 0 -} - -func (c *StatsClient) readOffsets() (dir, err, stat int64) { - header := *(*statSegSharedHeader)(unsafe.Pointer(&c.sharedHeader[0])) - dirOffset := atomic.LoadInt64(&header.directoryOffset) - errOffset := atomic.LoadInt64(&header.errorOffset) - statOffset := atomic.LoadInt64(&header.statsOffset) - return dirOffset, errOffset, statOffset -} - -type statSegAccess struct { - epoch int64 -} - -var maxWaitInProgress = 1 * time.Second - -func (c *StatsClient) accessStart() *statSegAccess { - epoch, inprog := c.readEpoch() - t := time.Now() - for inprog { - if time.Since(t) > maxWaitInProgress { - return nil - } - epoch, inprog = c.readEpoch() - } - return &statSegAccess{ - epoch: epoch, +func nameMatches(name string, patterns []string) bool { + if len(patterns) == 0 { + return true } -} - -func (c *StatsClient) accessEnd(acc *statSegAccess) bool { - epoch, inprog := c.readEpoch() - if acc.epoch != epoch || inprog { - return false + for _, pattern := range patterns { + matched, err := regexp.MatchString(pattern, name) + if err == nil && matched { + return true + } } - return true -} - -type vecHeader struct { - length uint64 - vectorData [0]uint8 -} - -func vectorLen(v unsafe.Pointer) uint64 { - vec := *(*vecHeader)(unsafe.Pointer(uintptr(v) - unsafe.Sizeof(uintptr(0)))) - return vec.length -} - -//go:nosplit -func add(p unsafe.Pointer, x uintptr) unsafe.Pointer { - return unsafe.Pointer(uintptr(p) + x) + return false } diff --git a/adapter/statsclient/version.go b/adapter/statsclient/version.go new file mode 100644 index 0000000..a289faa --- /dev/null +++ b/adapter/statsclient/version.go @@ -0,0 +1,33 @@ +// Copyright (c) 2019 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package statsclient + +import ( + "fmt" +) + +const ( + MinVersion = 0 + MaxVersion = 1 +) + +func checkVersion(ver uint64) error { + if ver < MinVersion { + return fmt.Errorf("stat segment version is too old: %v (minimal version: %v)", ver, MinVersion) + } else if ver > MaxVersion { + return fmt.Errorf("stat segment version is not supported: %v (minimal version: %v)", ver, MaxVersion) + } + return nil +} |