summaryrefslogtreecommitdiffstats
path: root/adapter
diff options
context:
space:
mode:
authorOndrej Fabry <ofabry@cisco.com>2020-06-16 10:40:34 +0200
committerOndrej Fabry <ofabry@cisco.com>2020-06-16 10:40:34 +0200
commit280b1c6c83b676ef4e592f4ecf60cb5b54b6a753 (patch)
treebf9a35f020de061ba66a432411ee44866405fe76 /adapter
parentf049390060630c0085fe4ad683c83a4a14a47ffb (diff)
Optimize socketclient adapter and add various code improvements
This commit includes: Features - optimized [socketclient](adapter/socketclient) adapter and add method to set client name - added list of compatible messages to `CompatibilityError` Fixes - `MsgCodec` will recover panic occurring during a message decoding - calling `Unsubscibe` will close the notification channel Other - improved log messages to provide more relevant info Examples - added more code samples of working with unions in [union example](examples/union-example) - added profiling mode to [perf bench](examples/perf-bench) example - improved [simple client](examples/simple-client) example to work properly even with multiple runs Dependencies - updated `github.com/sirupsen/logrus` dep to `v1.6.0` - updated `github.com/lunixbochs/struc` dep to `v0.0.0-20200521075829-a4cb8d33dbbe` Change-Id: I136a3968ccf9e93760d7ee2b9902fc7e6390a09d Signed-off-by: Ondrej Fabry <ofabry@cisco.com>
Diffstat (limited to 'adapter')
-rw-r--r--adapter/socketclient/doc.go13
-rw-r--r--adapter/socketclient/socketclient.go437
2 files changed, 261 insertions, 189 deletions
diff --git a/adapter/socketclient/doc.go b/adapter/socketclient/doc.go
index 0f93c56..cbb00a2 100644
--- a/adapter/socketclient/doc.go
+++ b/adapter/socketclient/doc.go
@@ -21,13 +21,18 @@
//
// Requirements
//
-// The socketclient will connect to /run/vpp-api.sock by default. However this
-// is not enabled in VPP configuration by default.
+// The socketclient connects to unix domain socket defined in VPP configuration.
//
-// To enable the socket in VPP, add following section to VPP config.
+// It is enabled by default at /run/vpp/api.sock by the following config section:
//
// socksvr {
-// default
+// socket-name default
+// }
+//
+// If you want to use custom socket path:
+//
+// socksvr {
+// socket-name /run/vpp/api.sock
// }
//
package socketclient
diff --git a/adapter/socketclient/socketclient.go b/adapter/socketclient/socketclient.go
index 366163f..1ee067f 100644
--- a/adapter/socketclient/socketclient.go
+++ b/adapter/socketclient/socketclient.go
@@ -16,7 +16,8 @@ package socketclient
import (
"bufio"
- "bytes"
+ "encoding/binary"
+ "errors"
"fmt"
"io"
"net"
@@ -27,8 +28,7 @@ import (
"time"
"github.com/fsnotify/fsnotify"
- "github.com/lunixbochs/struc"
- logger "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus"
"git.fd.io/govpp.git/adapter"
"git.fd.io/govpp.git/codec"
@@ -36,38 +36,34 @@ import (
const (
// DefaultSocketName is default VPP API socket file path.
- DefaultSocketName = adapter.DefaultBinapiSocket
- legacySocketName = "/run/vpp-api.sock"
+ DefaultSocketName = "/run/vpp/api.sock"
+ // DefaultClientName is used for identifying client in socket registration
+ DefaultClientName = "govppsock"
)
var (
+
// DefaultConnectTimeout is default timeout for connecting
DefaultConnectTimeout = time.Second * 3
// DefaultDisconnectTimeout is default timeout for discconnecting
DefaultDisconnectTimeout = time.Millisecond * 100
- // MaxWaitReady defines maximum duration before waiting for socket file
- // times out
+ // MaxWaitReady defines maximum duration of waiting for socket file
MaxWaitReady = time.Second * 10
- // ClientName is used for identifying client in socket registration
- ClientName = "govppsock"
)
var (
- // Debug is global variable that determines debug mode
- Debug = os.Getenv("DEBUG_GOVPP_SOCK") != ""
- // DebugMsgIds is global variable that determines debug mode for msg ids
- DebugMsgIds = os.Getenv("DEBUG_GOVPP_SOCKMSG") != ""
+ debug = strings.Contains(os.Getenv("DEBUG_GOVPP"), "socketclient")
+ debugMsgIds = strings.Contains(os.Getenv("DEBUG_GOVPP"), "msgtable")
- // Log is global logger
- Log = logger.New()
+ logger = logrus.New()
+ log = logger.WithField("logger", "govpp/socketclient")
)
-// init initializes global logger, which logs debug level messages to stdout.
+// init initializes global logger
func init() {
- Log.Out = os.Stdout
- if Debug {
- Log.Level = logger.DebugLevel
- Log.Debug("govpp/socketclient: enabled debug mode")
+ if debug {
+ logger.Level = logrus.DebugLevel
+ log.Debug("govpp: debug level enabled for socketclient")
}
}
@@ -88,12 +84,13 @@ const socketMissing = `
var warnOnce sync.Once
-func (c *vppClient) printMissingSocketMsg() {
+func (c *socketClient) printMissingSocketMsg() {
fmt.Fprintf(os.Stderr, socketMissing, c.sockAddr)
}
-type vppClient struct {
- sockAddr string
+type socketClient struct {
+ sockAddr string
+ clientName string
conn *net.UnixConn
reader *bufio.Reader
@@ -102,50 +99,63 @@ type vppClient struct {
connectTimeout time.Duration
disconnectTimeout time.Duration
- cb adapter.MsgCallback
+ msgCallback adapter.MsgCallback
clientIndex uint32
msgTable map[string]uint16
sockDelMsgId uint16
writeMu sync.Mutex
+ headerPool *sync.Pool
+
quit chan struct{}
wg sync.WaitGroup
}
-func NewVppClient(sockAddr string) *vppClient {
+func NewVppClient(sockAddr string) *socketClient {
if sockAddr == "" {
sockAddr = DefaultSocketName
}
- return &vppClient{
+ return &socketClient{
sockAddr: sockAddr,
+ clientName: DefaultClientName,
connectTimeout: DefaultConnectTimeout,
disconnectTimeout: DefaultDisconnectTimeout,
- cb: func(msgID uint16, data []byte) {
- Log.Warnf("no callback set, dropping message: ID=%v len=%d", msgID, len(data))
+ headerPool: &sync.Pool{New: func() interface{} {
+ return make([]byte, 16)
+ }},
+ msgCallback: func(msgID uint16, data []byte) {
+ log.Debugf("no callback set, dropping message: ID=%v len=%d", msgID, len(data))
},
}
}
+// SetClientName sets a client name used for identification.
+func (c *socketClient) SetClientName(name string) {
+ c.clientName = name
+}
+
// SetConnectTimeout sets timeout used during connecting.
-func (c *vppClient) SetConnectTimeout(t time.Duration) {
+func (c *socketClient) SetConnectTimeout(t time.Duration) {
c.connectTimeout = t
}
// SetDisconnectTimeout sets timeout used during disconnecting.
-func (c *vppClient) SetDisconnectTimeout(t time.Duration) {
+func (c *socketClient) SetDisconnectTimeout(t time.Duration) {
c.disconnectTimeout = t
}
-func (c *vppClient) SetMsgCallback(cb adapter.MsgCallback) {
- Log.Debug("SetMsgCallback")
- c.cb = cb
+func (c *socketClient) SetMsgCallback(cb adapter.MsgCallback) {
+ log.Debug("SetMsgCallback")
+ c.msgCallback = cb
}
-func (c *vppClient) checkLegacySocket() bool {
+const legacySocketName = "/run/vpp-api.sock"
+
+func (c *socketClient) checkLegacySocket() bool {
if c.sockAddr == legacySocketName {
return false
}
- Log.Debugf("checking legacy socket: %s", legacySocketName)
+ log.Debugf("checking legacy socket: %s", legacySocketName)
// check if socket exists
if _, err := os.Stat(c.sockAddr); err == nil {
return false // socket exists
@@ -163,7 +173,7 @@ func (c *vppClient) checkLegacySocket() bool {
}
// WaitReady checks socket file existence and waits for it if necessary
-func (c *vppClient) WaitReady() error {
+func (c *socketClient) WaitReady() error {
// check if socket already exists
if _, err := os.Stat(c.sockAddr); err == nil {
return nil // socket exists, we are ready
@@ -182,7 +192,7 @@ func (c *vppClient) WaitReady() error {
}
defer func() {
if err := watcher.Close(); err != nil {
- Log.Warnf("failed to close file watcher: %v", err)
+ log.Debugf("failed to close file watcher: %v", err)
}
}()
@@ -204,7 +214,7 @@ func (c *vppClient) WaitReady() error {
return e
case ev := <-watcher.Events:
- Log.Debugf("watcher event: %+v", ev)
+ log.Debugf("watcher event: %+v", ev)
if ev.Name == c.sockAddr && (ev.Op&fsnotify.Create) == fsnotify.Create {
// socket created, we are ready
return nil
@@ -213,7 +223,7 @@ func (c *vppClient) WaitReady() error {
}
}
-func (c *vppClient) Connect() error {
+func (c *socketClient) Connect() error {
c.checkLegacySocket()
// check if socket exists
@@ -229,7 +239,7 @@ func (c *vppClient) Connect() error {
}
if err := c.open(); err != nil {
- c.disconnect()
+ _ = c.disconnect()
return err
}
@@ -240,23 +250,23 @@ func (c *vppClient) Connect() error {
return nil
}
-func (c *vppClient) Disconnect() error {
+func (c *socketClient) Disconnect() error {
if c.conn == nil {
return nil
}
- Log.Debugf("Disconnecting..")
+ log.Debugf("Disconnecting..")
close(c.quit)
if err := c.conn.CloseRead(); err != nil {
- Log.Debugf("closing read failed: %v", err)
+ log.Debugf("closing readMsg failed: %v", err)
}
// wait for readerLoop to return
c.wg.Wait()
if err := c.close(); err != nil {
- Log.Debugf("closing failed: %v", err)
+ log.Debugf("closing failed: %v", err)
}
if err := c.disconnect(); err != nil {
@@ -266,38 +276,40 @@ func (c *vppClient) Disconnect() error {
return nil
}
-func (c *vppClient) connect(sockAddr string) error {
+const defaultBufferSize = 4096
+
+func (c *socketClient) connect(sockAddr string) error {
addr := &net.UnixAddr{Name: sockAddr, Net: "unix"}
- Log.Debugf("Connecting to: %v", c.sockAddr)
+ log.Debugf("Connecting to: %v", c.sockAddr)
conn, err := net.DialUnix("unix", nil, addr)
if err != nil {
// we try different type of socket for backwards compatbility with VPP<=19.04
if strings.Contains(err.Error(), "wrong type for socket") {
addr.Net = "unixpacket"
- Log.Debugf("%s, retrying connect with type unixpacket", err)
+ log.Debugf("%s, retrying connect with type unixpacket", err)
conn, err = net.DialUnix("unixpacket", nil, addr)
}
if err != nil {
- Log.Debugf("Connecting to socket %s failed: %s", addr, err)
+ log.Debugf("Connecting to socket %s failed: %s", addr, err)
return err
}
}
c.conn = conn
- Log.Debugf("Connected to socket (local addr: %v)", c.conn.LocalAddr().(*net.UnixAddr))
+ log.Debugf("Connected to socket (local addr: %v)", c.conn.LocalAddr().(*net.UnixAddr))
- c.reader = bufio.NewReader(c.conn)
- c.writer = bufio.NewWriter(c.conn)
+ c.reader = bufio.NewReaderSize(c.conn, defaultBufferSize)
+ c.writer = bufio.NewWriterSize(c.conn, defaultBufferSize)
return nil
}
-func (c *vppClient) disconnect() error {
- Log.Debugf("Closing socket")
+func (c *socketClient) disconnect() error {
+ log.Debugf("Closing socket")
if err := c.conn.Close(); err != nil {
- Log.Debugln("Closing socket failed:", err)
+ log.Debugln("Closing socket failed:", err)
return err
}
return nil
@@ -309,44 +321,40 @@ const (
deleteMsgContext = byte(124)
)
-func (c *vppClient) open() error {
- msgCodec := new(codec.MsgCodec)
+func (c *socketClient) open() error {
+ var msgCodec codec.MsgCodec
- req := &SockclntCreate{Name: ClientName}
+ // Request socket client create
+ req := &SockclntCreate{
+ Name: c.clientName,
+ }
msg, err := msgCodec.EncodeMsg(req, sockCreateMsgId)
if err != nil {
- Log.Debugln("Encode error:", err)
+ log.Debugln("Encode error:", err)
return err
}
// set non-0 context
msg[5] = createMsgContext
- if err := c.write(msg); err != nil {
- Log.Debugln("Write error: ", err)
- return err
- }
-
- readDeadline := time.Now().Add(c.connectTimeout)
- if err := c.conn.SetReadDeadline(readDeadline); err != nil {
+ if err := c.writeMsg(msg); err != nil {
+ log.Debugln("Write error: ", err)
return err
}
- msgReply, err := c.read()
+ msgReply, err := c.readMsgTimeout(nil, c.connectTimeout)
if err != nil {
- Log.Println("Read error:", err)
- return err
- }
- // reset read deadline
- if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
+ log.Println("Read error:", err)
return err
}
reply := new(SockclntCreateReply)
if err := msgCodec.DecodeMsg(msgReply, reply); err != nil {
- Log.Println("Decode error:", err)
+ log.Println("Decoding sockclnt_create_reply failed:", err)
return err
+ } else if reply.Response != 0 {
+ return fmt.Errorf("sockclnt_create_reply: response error (%d)", reply.Response)
}
- Log.Debugf("SockclntCreateReply: Response=%v Index=%v Count=%v",
+ log.Debugf("SockclntCreateReply: Response=%v Index=%v Count=%v",
reply.Response, reply.Index, reply.Count)
c.clientIndex = reply.Index
@@ -358,15 +366,15 @@ func (c *vppClient) open() error {
if strings.HasPrefix(name, "sockclnt_delete_") {
c.sockDelMsgId = x.Index
}
- if DebugMsgIds {
- Log.Debugf(" - %4d: %q", x.Index, name)
+ if debugMsgIds {
+ log.Debugf(" - %4d: %q", x.Index, name)
}
}
return nil
}
-func (c *vppClient) close() error {
+func (c *socketClient) close() error {
msgCodec := new(codec.MsgCodec)
req := &SockclntDelete{
@@ -374,133 +382,148 @@ func (c *vppClient) close() error {
}
msg, err := msgCodec.EncodeMsg(req, c.sockDelMsgId)
if err != nil {
- Log.Debugln("Encode error:", err)
+ log.Debugln("Encode error:", err)
return err
}
// set non-0 context
msg[5] = deleteMsgContext
- Log.Debugf("sending socklntDel (%d byes): % 0X", len(msg), msg)
- if err := c.write(msg); err != nil {
- Log.Debugln("Write error: ", err)
- return err
- }
+ log.Debugf("sending socklntDel (%d bytes): % 0X", len(msg), msg)
- readDeadline := time.Now().Add(c.disconnectTimeout)
- if err := c.conn.SetReadDeadline(readDeadline); err != nil {
+ if err := c.writeMsg(msg); err != nil {
+ log.Debugln("Write error: ", err)
return err
}
- msgReply, err := c.read()
+
+ msgReply, err := c.readMsgTimeout(nil, c.disconnectTimeout)
if err != nil {
- Log.Debugln("Read error:", err)
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
// we accept timeout for reply
return nil
}
- return err
- }
- // reset read deadline
- if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
+ log.Debugln("Read error:", err)
return err
}
reply := new(SockclntDeleteReply)
if err := msgCodec.DecodeMsg(msgReply, reply); err != nil {
- Log.Debugln("Decode error:", err)
+ log.Debugln("Decoding sockclnt_delete_reply failed:", err)
return err
+ } else if reply.Response != 0 {
+ return fmt.Errorf("sockclnt_delete_reply: response error (%d)", reply.Response)
}
- Log.Debugf("SockclntDeleteReply: Response=%v", reply.Response)
-
return nil
}
-func (c *vppClient) GetMsgID(msgName string, msgCrc string) (uint16, error) {
- msg := msgName + "_" + msgCrc
- msgID, ok := c.msgTable[msg]
- if !ok {
- return 0, &adapter.UnknownMsgError{msgName, msgCrc}
+func (c *socketClient) GetMsgID(msgName string, msgCrc string) (uint16, error) {
+ if msgID, ok := c.msgTable[msgName+"_"+msgCrc]; ok {
+ return msgID, nil
+ }
+ return 0, &adapter.UnknownMsgError{
+ MsgName: msgName,
+ MsgCrc: msgCrc,
}
- return msgID, nil
-}
-
-type reqHeader struct {
- // MsgID uint16
- ClientIndex uint32
- Context uint32
}
-func (c *vppClient) SendMsg(context uint32, data []byte) error {
- h := &reqHeader{
- ClientIndex: c.clientIndex,
- Context: context,
- }
- buf := new(bytes.Buffer)
- if err := struc.Pack(buf, h); err != nil {
- return err
+func (c *socketClient) SendMsg(context uint32, data []byte) error {
+ if len(data) < 10 {
+ return fmt.Errorf("invalid message data, length must be at least 10 bytes")
}
- copy(data[2:], buf.Bytes())
+ setMsgRequestHeader(data, c.clientIndex, context)
- Log.Debugf("sendMsg (%d) context=%v client=%d: data: % 02X", len(data), context, c.clientIndex, data)
+ if debug {
+ log.Debugf("sendMsg (%d) context=%v client=%d: % 02X", len(data), context, c.clientIndex, data)
+ }
- if err := c.write(data); err != nil {
- Log.Debugln("write error: ", err)
+ if err := c.writeMsg(data); err != nil {
+ log.Debugln("writeMsg error: ", err)
return err
}
return nil
}
-func (c *vppClient) write(msg []byte) error {
- h := &msgheader{
- DataLen: uint32(len(msg)),
+// setMsgRequestHeader sets client index and context in the message request header
+//
+// Message request has following structure:
+//
+// type msgRequestHeader struct {
+// MsgID uint16
+// ClientIndex uint32
+// Context uint32
+// }
+//
+func setMsgRequestHeader(data []byte, clientIndex, context uint32) {
+ // message ID is already set
+ binary.BigEndian.PutUint32(data[2:6], clientIndex)
+ binary.BigEndian.PutUint32(data[6:10], context)
+}
+
+func (c *socketClient) writeMsg(msg []byte) error {
+ // we lock to prevent mixing multiple message writes
+ c.writeMu.Lock()
+ defer c.writeMu.Unlock()
+
+ header := c.headerPool.Get().([]byte)
+ err := writeMsgHeader(c.writer, header, len(msg))
+ if err != nil {
+ return err
}
- buf := new(bytes.Buffer)
- if err := struc.Pack(buf, h); err != nil {
+ c.headerPool.Put(header)
+
+ if err := writeMsgData(c.writer, msg, c.writer.Size()); err != nil {
return err
}
- header := buf.Bytes()
- // we lock to prevent mixing multiple message sends
- c.writeMu.Lock()
- defer c.writeMu.Unlock()
+ if err := c.writer.Flush(); err != nil {
+ return err
+ }
+
+ log.Debugf(" -- writeMsg done")
+
+ return nil
+}
+
+func writeMsgHeader(w io.Writer, header []byte, dataLen int) error {
+ binary.BigEndian.PutUint32(header[8:12], uint32(dataLen))
- if n, err := c.writer.Write(header); err != nil {
+ n, err := w.Write(header)
+ if err != nil {
return err
- } else {
- Log.Debugf(" - header sent (%d/%d): % 0X", n, len(header), header)
+ }
+ if debug {
+ log.Debugf(" - header sent (%d/%d): % 0X", n, len(header), header)
}
- writerSize := c.writer.Size()
+ return nil
+}
+
+func writeMsgData(w io.Writer, msg []byte, writerSize int) error {
for i := 0; i <= len(msg)/writerSize; i++ {
x := i*writerSize + writerSize
if x > len(msg) {
x = len(msg)
}
- Log.Debugf(" - x=%v i=%v len=%v mod=%v", x, i, len(msg), len(msg)/writerSize)
- if n, err := c.writer.Write(msg[i*writerSize : x]); err != nil {
+ if debug {
+ log.Debugf(" - x=%v i=%v len=%v mod=%v", x, i, len(msg), len(msg)/writerSize)
+ }
+ n, err := w.Write(msg[i*writerSize : x])
+ if err != nil {
return err
- } else {
- Log.Debugf(" - msg sent x=%d (%d/%d): % 0X", x, n, len(msg), msg)
+ }
+ if debug {
+ log.Debugf(" - data sent x=%d (%d/%d): % 0X", x, n, len(msg), msg)
}
}
- if err := c.writer.Flush(); err != nil {
- return err
- }
-
- Log.Debugf(" -- write done")
-
return nil
}
-type msgHeader struct {
- MsgID uint16
- Context uint32
-}
-
-func (c *vppClient) readerLoop() {
+func (c *socketClient) readerLoop() {
defer c.wg.Done()
- defer Log.Debugf("reader quit")
+ defer log.Debugf("reader loop done")
+
+ var buf [8192]byte
for {
select {
@@ -509,72 +532,118 @@ func (c *vppClient) readerLoop() {
default:
}
- msg, err := c.read()
+ msg, err := c.readMsg(buf[:])
if err != nil {
if isClosedError(err) {
return
}
- Log.Debugf("read failed: %v", err)
+ log.Debugf("readMsg error: %v", err)
continue
}
- h := new(msgHeader)
- if err := struc.Unpack(bytes.NewReader(msg), h); err != nil {
- Log.Debugf("unpacking header failed: %v", err)
- continue
+ msgID, context := getMsgReplyHeader(msg)
+ if debug {
+ log.Debugf("recvMsg (%d) msgID=%d context=%v", len(msg), msgID, context)
}
- Log.Debugf("recvMsg (%d) msgID=%d context=%v", len(msg), h.MsgID, h.Context)
- c.cb(h.MsgID, msg)
+ c.msgCallback(msgID, msg)
}
}
-type msgheader struct {
- Q int `struc:"uint64"`
- DataLen uint32 `struc:"uint32"`
- GcMarkTimestamp uint32 `struc:"uint32"`
+// getMsgReplyHeader gets message ID and context from the message reply header
+//
+// Message reply has following structure:
+//
+// type msgReplyHeader struct {
+// MsgID uint16
+// Context uint32
+// }
+//
+func getMsgReplyHeader(msg []byte) (msgID uint16, context uint32) {
+ msgID = binary.BigEndian.Uint16(msg[0:2])
+ context = binary.BigEndian.Uint32(msg[2:6])
+ return
}
-func (c *vppClient) read() ([]byte, error) {
- Log.Debug(" reading next msg..")
+func (c *socketClient) readMsgTimeout(buf []byte, timeout time.Duration) ([]byte, error) {
+ // set read deadline
+ readDeadline := time.Now().Add(timeout)
+ if err := c.conn.SetReadDeadline(readDeadline); err != nil {
+ return nil, err
+ }
- header := make([]byte, 16)
+ // read message
+ msgReply, err := c.readMsg(buf)
+ if err != nil {
+ return nil, err
+ }
- n, err := io.ReadAtLeast(c.reader, header, 16)
+ // reset read deadline
+ if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
+ return nil, err
+ }
+
+ return msgReply, nil
+}
+
+func (c *socketClient) readMsg(buf []byte) ([]byte, error) {
+ log.Debug("reading msg..")
+
+ header := c.headerPool.Get().([]byte)
+ msgLen, err := readMsgHeader(c.reader, header)
if err != nil {
return nil, err
}
+ c.headerPool.Put(header)
+
+ msg, err := readMsgData(c.reader, buf, msgLen)
+
+ log.Debugf(" -- readMsg done (buffered: %d)", c.reader.Buffered())
+
+ return msg, nil
+}
+
+func readMsgHeader(r io.Reader, header []byte) (int, error) {
+ n, err := io.ReadAtLeast(r, header, 16)
+ if err != nil {
+ return 0, err
+ }
if n == 0 {
- Log.Debugln("zero bytes header")
- return nil, nil
+ log.Debugln("zero bytes header")
+ return 0, nil
} else if n != 16 {
- Log.Debugf("invalid header data (%d): % 0X", n, header[:n])
- return nil, fmt.Errorf("invalid header (expected 16 bytes, got %d)", n)
+ log.Debugf("invalid header (%d bytes): % 0X", n, header[:n])
+ return 0, fmt.Errorf("invalid header (expected 16 bytes, got %d)", n)
}
- Log.Debugf(" read header %d bytes: % 0X", n, header)
- h := &msgheader{}
- if err := struc.Unpack(bytes.NewReader(header[:]), h); err != nil {
- return nil, err
- }
- Log.Debugf(" - decoded header: %+v", h)
+ dataLen := binary.BigEndian.Uint32(header[8:12])
- msgLen := int(h.DataLen)
- msg := make([]byte, msgLen)
+ return int(dataLen), nil
+}
+
+func readMsgData(r io.Reader, buf []byte, dataLen int) ([]byte, error) {
+ var msg []byte
+ if buf == nil || len(buf) < dataLen {
+ msg = make([]byte, dataLen)
+ } else {
+ msg = buf[0:dataLen]
+ }
- n, err = c.reader.Read(msg)
+ n, err := r.Read(msg)
if err != nil {
return nil, err
}
- Log.Debugf(" - read msg %d bytes (%d buffered) % 0X", n, c.reader.Buffered(), msg[:n])
+ if debug {
+ log.Debugf(" - read data (%d bytes): % 0X", n, msg[:n])
+ }
- if msgLen > n {
- remain := msgLen - n
- Log.Debugf("continue read for another %d bytes", remain)
+ if dataLen > n {
+ remain := dataLen - n
+ log.Debugf("continue reading remaining %d bytes", remain)
view := msg[n:]
for remain > 0 {
- nbytes, err := c.reader.Read(view)
+ nbytes, err := r.Read(view)
if err != nil {
return nil, err
} else if nbytes == 0 {
@@ -582,19 +651,17 @@ func (c *vppClient) read() ([]byte, error) {
}
remain -= nbytes
- Log.Debugf("another data received: %d bytes (remain: %d)", nbytes, remain)
+ log.Debugf("another data received: %d bytes (remain: %d)", nbytes, remain)
view = view[nbytes:]
}
}
- Log.Debugf(" -- read done (buffered: %d)", c.reader.Buffered())
-
return msg, nil
}
func isClosedError(err error) bool {
- if err == io.EOF {
+ if errors.Is(err, io.EOF) {
return true
}
return strings.HasSuffix(err.Error(), "use of closed network connection")