aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--adapter/socketclient/socketclient.go146
-rw-r--r--adapter/vppapiclient/vppapiclient.go24
-rw-r--r--core/channel.go14
-rw-r--r--core/connection.go8
-rw-r--r--core/request_handler.go6
5 files changed, 109 insertions, 89 deletions
diff --git a/adapter/socketclient/socketclient.go b/adapter/socketclient/socketclient.go
index 2e6d0af..96f23e6 100644
--- a/adapter/socketclient/socketclient.go
+++ b/adapter/socketclient/socketclient.go
@@ -47,7 +47,7 @@ var (
DefaultDisconnectTimeout = time.Millisecond * 100
// MaxWaitReady defines maximum duration before waiting for socket file
// times out
- MaxWaitReady = time.Second * 15
+ MaxWaitReady = time.Second * 10
// ClientName is used for identifying client in socket registration
ClientName = "govppsock"
)
@@ -73,9 +73,10 @@ func init() {
type vppClient struct {
sockAddr string
- conn *net.UnixConn
- reader *bufio.Reader
- writer *bufio.Writer
+
+ conn *net.UnixConn
+ reader *bufio.Reader
+ writer *bufio.Writer
connectTimeout time.Duration
disconnectTimeout time.Duration
@@ -116,42 +117,43 @@ func (c *vppClient) SetDisconnectTimeout(t time.Duration) {
// WaitReady checks socket file existence and waits for it if necessary
func (c *vppClient) WaitReady() error {
- // check if file at the path already exists
+ // check if socket already exists
if _, err := os.Stat(c.sockAddr); err == nil {
- return nil
- } else if os.IsExist(err) {
- return err
+ return nil // socket exists, we are ready
+ } else if !os.IsNotExist(err) {
+ return err // some other error occurred
}
- // if not, watch for it
+ // socket does not exist, watch for it
watcher, err := fsnotify.NewWatcher()
if err != nil {
return err
}
defer func() {
if err := watcher.Close(); err != nil {
- Log.Errorf("failed to close file watcher: %v", err)
+ Log.Warnf("failed to close file watcher: %v", err)
}
}()
- // start watching directory
+ // start directory watcher
if err := watcher.Add(filepath.Dir(c.sockAddr)); err != nil {
return err
}
+ timeout := time.NewTimer(MaxWaitReady)
for {
select {
- case <-time.After(MaxWaitReady):
- return fmt.Errorf("waiting for socket file timed out (%s)", MaxWaitReady)
+ case <-timeout.C:
+ return fmt.Errorf("timeout waiting (%s) for socket file: %s", MaxWaitReady, c.sockAddr)
+
case e := <-watcher.Errors:
return e
+
case ev := <-watcher.Events:
Log.Debugf("watcher event: %+v", ev)
- if ev.Name == c.sockAddr {
- if (ev.Op & fsnotify.Create) == fsnotify.Create {
- // socket was created, we are ready
- return nil
- }
+ if ev.Name == c.sockAddr && (ev.Op&fsnotify.Create) == fsnotify.Create {
+ // socket created, we are ready
+ return nil
}
}
}
@@ -170,6 +172,7 @@ func (c *vppClient) Connect() error {
}
if err := c.open(); err != nil {
+ c.disconnect()
return err
}
@@ -180,6 +183,32 @@ func (c *vppClient) Connect() error {
return nil
}
+func (c *vppClient) Disconnect() error {
+ if c.conn == nil {
+ return nil
+ }
+ Log.Debugf("Disconnecting..")
+
+ close(c.quit)
+
+ if err := c.conn.CloseRead(); err != nil {
+ Log.Debugf("closing read failed: %v", err)
+ }
+
+ // wait for readerLoop to return
+ c.wg.Wait()
+
+ if err := c.close(); err != nil {
+ Log.Debugf("closing failed: %v", err)
+ }
+
+ if err := c.disconnect(); err != nil {
+ return err
+ }
+
+ return nil
+}
+
func (c *vppClient) connect(sockAddr string) error {
addr := &net.UnixAddr{Name: sockAddr, Net: "unix"}
@@ -188,7 +217,7 @@ func (c *vppClient) connect(sockAddr string) error {
// we try different type of socket for backwards compatbility with VPP<=19.04
if strings.Contains(err.Error(), "wrong type for socket") {
addr.Net = "unixpacket"
- Log.Debugf("%s, retrying connect with type unixpacket", err)
+ Log.Warnf("%s, retrying connect with type unixpacket", err)
conn, err = net.DialUnix("unixpacket", nil, addr)
}
if err != nil {
@@ -198,11 +227,20 @@ func (c *vppClient) connect(sockAddr string) error {
}
c.conn = conn
+ Log.Debugf("Connected to socket (local addr: %v)", c.conn.LocalAddr().(*net.UnixAddr))
+
c.reader = bufio.NewReader(c.conn)
c.writer = bufio.NewWriter(c.conn)
- Log.Debugf("Connected to socket: %v", addr)
+ return nil
+}
+func (c *vppClient) disconnect() error {
+ Log.Debugf("Closing socket")
+ if err := c.conn.Close(); err != nil {
+ Log.Debugln("Closing socket failed:", err)
+ return err
+ }
return nil
}
@@ -270,34 +308,6 @@ func (c *vppClient) open() error {
return nil
}
-func (c *vppClient) Disconnect() error {
- if c.conn == nil {
- return nil
- }
- Log.Debugf("Disconnecting..")
-
- close(c.quit)
-
- // force readerLoop to timeout
- if err := c.conn.SetReadDeadline(time.Now()); err != nil {
- return err
- }
-
- // wait for readerLoop to return
- c.wg.Wait()
-
- if err := c.close(); err != nil {
- return err
- }
-
- if err := c.conn.Close(); err != nil {
- Log.Debugln("Closing socket failed:", err)
- return err
- }
-
- return nil
-}
-
func (c *vppClient) close() error {
msgCodec := new(codec.MsgCodec)
@@ -373,7 +383,7 @@ func (c *vppClient) SendMsg(context uint32, data []byte) error {
}
copy(data[2:], buf.Bytes())
- Log.Debugf("SendMsg (%d) context=%v client=%d: data: % 02X", len(data), context, c.clientIndex, data)
+ Log.Debugf("sendMsg (%d) context=%v client=%d: data: % 02X", len(data), context, c.clientIndex, data)
if err := c.write(data); err != nil {
Log.Debugln("write error: ", err)
@@ -403,27 +413,25 @@ func (c *vppClient) write(msg []byte) error {
Log.Debugf(" - header sent (%d/%d): % 0X", n, len(header), header)
}
- if err := c.writer.Flush(); err != nil {
- return err
- }
-
- for i := 0; i <= len(msg)/c.writer.Size(); i++ {
- x := i*c.writer.Size() + c.writer.Size()
+ writerSize := c.writer.Size()
+ for i := 0; i <= len(msg)/writerSize; i++ {
+ x := i*writerSize + writerSize
if x > len(msg) {
x = len(msg)
}
- Log.Debugf("x=%v i=%v len=%v mod=%v", x, i, len(msg), len(msg)/c.writer.Size())
- if n, err := c.writer.Write(msg[i*c.writer.Size() : x]); err != nil {
+ Log.Debugf(" - x=%v i=%v len=%v mod=%v", x, i, len(msg), len(msg)/writerSize)
+ if n, err := c.writer.Write(msg[i*writerSize : x]); err != nil {
return err
} else {
Log.Debugf(" - msg sent x=%d (%d/%d): % 0X", x, n, len(msg), msg)
}
- if err := c.writer.Flush(); err != nil {
- return err
- }
-
+ }
+ if err := c.writer.Flush(); err != nil {
+ return err
}
+ Log.Debugf(" -- write done")
+
return nil
}
@@ -435,6 +443,7 @@ type msgHeader struct {
func (c *vppClient) readerLoop() {
defer c.wg.Done()
defer Log.Debugf("reader quit")
+
for {
select {
case <-c.quit:
@@ -450,6 +459,7 @@ func (c *vppClient) readerLoop() {
Log.Debugf("read failed: %v", err)
continue
}
+
h := new(msgHeader)
if err := struc.Unpack(bytes.NewReader(msg), h); err != nil {
Log.Debugf("unpacking header failed: %v", err)
@@ -468,22 +478,22 @@ type msgheader struct {
}
func (c *vppClient) read() ([]byte, error) {
- Log.Debug("reading next msg..")
+ Log.Debug(" reading next msg..")
header := make([]byte, 16)
n, err := io.ReadAtLeast(c.reader, header, 16)
if err != nil {
return nil, err
- } else if n == 0 {
+ }
+ if n == 0 {
Log.Debugln("zero bytes header")
return nil, nil
- }
- if n != 16 {
+ } else if n != 16 {
Log.Debugf("invalid header data (%d): % 0X", n, header[:n])
return nil, fmt.Errorf("invalid header (expected 16 bytes, got %d)", n)
}
- Log.Debugf(" - read header %d bytes: % 0X", n, header)
+ Log.Debugf(" read header %d bytes: % 0X", n, header)
h := &msgheader{}
if err := struc.Unpack(bytes.NewReader(header[:]), h); err != nil {
@@ -498,7 +508,7 @@ func (c *vppClient) read() ([]byte, error) {
if err != nil {
return nil, err
}
- Log.Debugf(" - read msg %d bytes (%d buffered)", n, c.reader.Buffered())
+ Log.Debugf(" - read msg %d bytes (%d buffered) % 0X", n, c.reader.Buffered(), msg[:n])
if msgLen > n {
remain := msgLen - n
@@ -520,6 +530,8 @@ func (c *vppClient) read() ([]byte, error) {
}
}
+ Log.Debugf(" -- read done (buffered: %d)", c.reader.Buffered())
+
return msg, nil
}
diff --git a/adapter/vppapiclient/vppapiclient.go b/adapter/vppapiclient/vppapiclient.go
index d8465d7..244be43 100644
--- a/adapter/vppapiclient/vppapiclient.go
+++ b/adapter/vppapiclient/vppapiclient.go
@@ -40,7 +40,7 @@ import (
var (
// MaxWaitReady defines maximum duration before waiting for shared memory
// segment times out
- MaxWaitReady = time.Second * 15
+ MaxWaitReady = time.Second * 10
)
const (
@@ -148,12 +148,11 @@ func (a *vppClient) WaitReady() error {
path = filepath.Join(shmDir, a.shmPrefix+"-"+vppShmFile)
}
- // check if file at the path already exists
+ // check if file already exists
if _, err := os.Stat(path); err == nil {
- // file exists, we are ready
- return nil
+ return nil // file exists, we are ready
} else if !os.IsNotExist(err) {
- return err
+ return err // some other error occurred
}
// file does not exist, start watching folder
@@ -168,18 +167,19 @@ func (a *vppClient) WaitReady() error {
return err
}
+ timeout := time.NewTimer(MaxWaitReady)
for {
select {
- case <-time.After(MaxWaitReady):
- return fmt.Errorf("waiting for shared memory segment timed out (%s)", MaxWaitReady)
+ case <-timeout.C:
+ return fmt.Errorf("timeout waiting (%s) for shm file: %s", MaxWaitReady, path)
+
case e := <-watcher.Errors:
return e
+
case ev := <-watcher.Events:
- if ev.Name == path {
- if (ev.Op & fsnotify.Create) == fsnotify.Create {
- // file was created, we are ready
- return nil
- }
+ if ev.Name == path && (ev.Op&fsnotify.Create) == fsnotify.Create {
+ // file created, we are ready
+ return nil
}
}
}
diff --git a/core/channel.go b/core/channel.go
index 2e73917..c464708 100644
--- a/core/channel.go
+++ b/core/channel.go
@@ -21,8 +21,9 @@ import (
"strings"
"time"
- "git.fd.io/govpp.git/api"
"github.com/sirupsen/logrus"
+
+ "git.fd.io/govpp.git/api"
)
var (
@@ -144,6 +145,7 @@ func (ch *Channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx {
func (ch *Channel) CheckCompatiblity(msgs ...api.Message) error {
for _, msg := range msgs {
+ // TODO: collect all incompatible messages and return summarized error
_, err := ch.msgIdentifier.GetMessageID(msg)
if err != nil {
return err
@@ -255,17 +257,23 @@ func (ch *Channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (last
case vppReply := <-ch.replyChan:
ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
if ignore {
- logrus.Warnf("ignoring reply: %+v", vppReply)
+ logrus.WithFields(logrus.Fields{
+ "expSeqNum": expSeqNum,
+ "channel": ch.id,
+ }).Warnf("ignoring received reply: %+v (expecting: %s)", vppReply, msg.GetMessageName())
continue
}
return lastReplyReceived, err
case <-timer.C:
+ logrus.WithFields(logrus.Fields{
+ "expSeqNum": expSeqNum,
+ "channel": ch.id,
+ }).Debugf("timeout (%v) waiting for reply: %s", ch.replyTimeout, msg.GetMessageName())
err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout)
return false, err
}
}
- return
}
func (ch *Channel) processReply(reply *vppReply, expSeqNum uint16, msg api.Message) (ignore bool, lastReplyReceived bool, err error) {
diff --git a/core/connection.go b/core/connection.go
index 02f980a..605e1ef 100644
--- a/core/connection.go
+++ b/core/connection.go
@@ -30,8 +30,8 @@ import (
)
const (
- DefaultReconnectInterval = time.Second // default interval between reconnect attempts
- DefaultMaxReconnectAttempts = 3 // default maximum number of reconnect attempts
+ DefaultReconnectInterval = time.Second / 2 // default interval between reconnect attempts
+ DefaultMaxReconnectAttempts = 3 // default maximum number of reconnect attempts
)
var (
@@ -255,7 +255,7 @@ func (c *Connection) releaseAPIChannel(ch *Channel) {
// connectLoop attempts to connect to VPP until it succeeds.
// Then it continues with healthCheckLoop.
func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
- reconnectAttempts := 0
+ var reconnectAttempts int
// loop until connected
for {
@@ -268,7 +268,7 @@ func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
break
} else if reconnectAttempts < c.maxAttempts {
reconnectAttempts++
- log.Errorf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
+ log.Warnf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
time.Sleep(c.recInterval)
} else {
connChan <- ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err}
diff --git a/core/request_handler.go b/core/request_handler.go
index fd8aa59..d3f7bdc 100644
--- a/core/request_handler.go
+++ b/core/request_handler.go
@@ -93,7 +93,7 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
"msg_size": len(data),
"seq_num": req.seqNum,
"msg_crc": req.msg.GetCrcString(),
- }).Debugf("--> govpp send: %s: %+v", req.msg.GetMessageName(), req.msg)
+ }).Debugf("==> govpp send: %s: %+v", req.msg.GetMessageName(), req.msg)
}
// send the request to VPP
@@ -118,7 +118,7 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
"msg_id": c.pingReqID,
"msg_size": len(pingData),
"seq_num": req.seqNum,
- }).Debug(" -> sending control ping")
+ }).Debug("--> sending control ping")
if err := c.vppClient.SendMsg(context, pingData); err != nil {
log.WithFields(logger.Fields{
@@ -165,7 +165,7 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) {
"is_multi": isMulti,
"seq_num": seqNum,
"msg_crc": msg.GetCrcString(),
- }).Debugf("<-- govpp recv: %s", msg.GetMessageName())
+ }).Debugf("<== govpp recv: %s", msg.GetMessageName())
}
if context == 0 || c.isNotificationMessage(msgID) {