aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorOndrej Fabry <ofabry@cisco.com>2019-07-10 07:14:20 +0200
committerOndrej Fabry <ofabry@cisco.com>2019-07-10 07:14:20 +0200
commit4dca07c803308611275f78b490ac0352c1052fe2 (patch)
treeefb52ff663ef3b16f4bd6957379a572c5fd49de0
parentb1006dced4cc0c23d9dc754e97d89500aeb55170 (diff)
Fix socketclient for VPP 19.08
- in VPP 19.08 the socket type has changed to STREAM and data has to be writtento VPP with single flush, otherwise msg might get mixed with next header and cause VPP to stop responding - this also fixes WaitReady for socketclient and vppapiclient Change-Id: I022724c0c09c9b92d4c695d1cf2be15994fff717 Signed-off-by: Ondrej Fabry <ofabry@cisco.com>
-rw-r--r--adapter/socketclient/socketclient.go146
-rw-r--r--adapter/vppapiclient/vppapiclient.go24
-rw-r--r--core/channel.go14
-rw-r--r--core/connection.go8
-rw-r--r--core/request_handler.go6
5 files changed, 109 insertions, 89 deletions
diff --git a/adapter/socketclient/socketclient.go b/adapter/socketclient/socketclient.go
index 2e6d0af..96f23e6 100644
--- a/adapter/socketclient/socketclient.go
+++ b/adapter/socketclient/socketclient.go
@@ -47,7 +47,7 @@ var (
DefaultDisconnectTimeout = time.Millisecond * 100
// MaxWaitReady defines maximum duration before waiting for socket file
// times out
- MaxWaitReady = time.Second * 15
+ MaxWaitReady = time.Second * 10
// ClientName is used for identifying client in socket registration
ClientName = "govppsock"
)
@@ -73,9 +73,10 @@ func init() {
type vppClient struct {
sockAddr string
- conn *net.UnixConn
- reader *bufio.Reader
- writer *bufio.Writer
+
+ conn *net.UnixConn
+ reader *bufio.Reader
+ writer *bufio.Writer
connectTimeout time.Duration
disconnectTimeout time.Duration
@@ -116,42 +117,43 @@ func (c *vppClient) SetDisconnectTimeout(t time.Duration) {
// WaitReady checks socket file existence and waits for it if necessary
func (c *vppClient) WaitReady() error {
- // check if file at the path already exists
+ // check if socket already exists
if _, err := os.Stat(c.sockAddr); err == nil {
- return nil
- } else if os.IsExist(err) {
- return err
+ return nil // socket exists, we are ready
+ } else if !os.IsNotExist(err) {
+ return err // some other error occurred
}
- // if not, watch for it
+ // socket does not exist, watch for it
watcher, err := fsnotify.NewWatcher()
if err != nil {
return err
}
defer func() {
if err := watcher.Close(); err != nil {
- Log.Errorf("failed to close file watcher: %v", err)
+ Log.Warnf("failed to close file watcher: %v", err)
}
}()
- // start watching directory
+ // start directory watcher
if err := watcher.Add(filepath.Dir(c.sockAddr)); err != nil {
return err
}
+ timeout := time.NewTimer(MaxWaitReady)
for {
select {
- case <-time.After(MaxWaitReady):
- return fmt.Errorf("waiting for socket file timed out (%s)", MaxWaitReady)
+ case <-timeout.C:
+ return fmt.Errorf("timeout waiting (%s) for socket file: %s", MaxWaitReady, c.sockAddr)
+
case e := <-watcher.Errors:
return e
+
case ev := <-watcher.Events:
Log.Debugf("watcher event: %+v", ev)
- if ev.Name == c.sockAddr {
- if (ev.Op & fsnotify.Create) == fsnotify.Create {
- // socket was created, we are ready
- return nil
- }
+ if ev.Name == c.sockAddr && (ev.Op&fsnotify.Create) == fsnotify.Create {
+ // socket created, we are ready
+ return nil
}
}
}
@@ -170,6 +172,7 @@ func (c *vppClient) Connect() error {
}
if err := c.open(); err != nil {
+ c.disconnect()
return err
}
@@ -180,6 +183,32 @@ func (c *vppClient) Connect() error {
return nil
}
+func (c *vppClient) Disconnect() error {
+ if c.conn == nil {
+ return nil
+ }
+ Log.Debugf("Disconnecting..")
+
+ close(c.quit)
+
+ if err := c.conn.CloseRead(); err != nil {
+ Log.Debugf("closing read failed: %v", err)
+ }
+
+ // wait for readerLoop to return
+ c.wg.Wait()
+
+ if err := c.close(); err != nil {
+ Log.Debugf("closing failed: %v", err)
+ }
+
+ if err := c.disconnect(); err != nil {
+ return err
+ }
+
+ return nil
+}
+
func (c *vppClient) connect(sockAddr string) error {
addr := &net.UnixAddr{Name: sockAddr, Net: "unix"}
@@ -188,7 +217,7 @@ func (c *vppClient) connect(sockAddr string) error {
// we try different type of socket for backwards compatbility with VPP<=19.04
if strings.Contains(err.Error(), "wrong type for socket") {
addr.Net = "unixpacket"
- Log.Debugf("%s, retrying connect with type unixpacket", err)
+ Log.Warnf("%s, retrying connect with type unixpacket", err)
conn, err = net.DialUnix("unixpacket", nil, addr)
}
if err != nil {
@@ -198,11 +227,20 @@ func (c *vppClient) connect(sockAddr string) error {
}
c.conn = conn
+ Log.Debugf("Connected to socket (local addr: %v)", c.conn.LocalAddr().(*net.UnixAddr))
+
c.reader = bufio.NewReader(c.conn)
c.writer = bufio.NewWriter(c.conn)
- Log.Debugf("Connected to socket: %v", addr)
+ return nil
+}
+func (c *vppClient) disconnect() error {
+ Log.Debugf("Closing socket")
+ if err := c.conn.Close(); err != nil {
+ Log.Debugln("Closing socket failed:", err)
+ return err
+ }
return nil
}
@@ -270,34 +308,6 @@ func (c *vppClient) open() error {
return nil
}
-func (c *vppClient) Disconnect() error {
- if c.conn == nil {
- return nil
- }
- Log.Debugf("Disconnecting..")
-
- close(c.quit)
-
- // force readerLoop to timeout
- if err := c.conn.SetReadDeadline(time.Now()); err != nil {
- return err
- }
-
- // wait for readerLoop to return
- c.wg.Wait()
-
- if err := c.close(); err != nil {
- return err
- }
-
- if err := c.conn.Close(); err != nil {
- Log.Debugln("Closing socket failed:", err)
- return err
- }
-
- return nil
-}
-
func (c *vppClient) close() error {
msgCodec := new(codec.MsgCodec)
@@ -373,7 +383,7 @@ func (c *vppClient) SendMsg(context uint32, data []byte) error {
}
copy(data[2:], buf.Bytes())
- Log.Debugf("SendMsg (%d) context=%v client=%d: data: % 02X", len(data), context, c.clientIndex, data)
+ Log.Debugf("sendMsg (%d) context=%v client=%d: data: % 02X", len(data), context, c.clientIndex, data)
if err := c.write(data); err != nil {
Log.Debugln("write error: ", err)
@@ -403,27 +413,25 @@ func (c *vppClient) write(msg []byte) error {
Log.Debugf(" - header sent (%d/%d): % 0X", n, len(header), header)
}
- if err := c.writer.Flush(); err != nil {
- return err
- }
-
- for i := 0; i <= len(msg)/c.writer.Size(); i++ {
- x := i*c.writer.Size() + c.writer.Size()
+ writerSize := c.writer.Size()
+ for i := 0; i <= len(msg)/writerSize; i++ {
+ x := i*writerSize + writerSize
if x > len(msg) {
x = len(msg)
}
- Log.Debugf("x=%v i=%v len=%v mod=%v", x, i, len(msg), len(msg)/c.writer.Size())
- if n, err := c.writer.Write(msg[i*c.writer.Size() : x]); err != nil {
+ Log.Debugf(" - x=%v i=%v len=%v mod=%v", x, i, len(msg), len(msg)/writerSize)
+ if n, err := c.writer.Write(msg[i*writerSize : x]); err != nil {
return err
} else {
Log.Debugf(" - msg sent x=%d (%d/%d): % 0X", x, n, len(msg), msg)
}
- if err := c.writer.Flush(); err != nil {
- return err
- }
-
+ }
+ if err := c.writer.Flush(); err != nil {
+ return err
}
+ Log.Debugf(" -- write done")
+
return nil
}
@@ -435,6 +443,7 @@ type msgHeader struct {
func (c *vppClient) readerLoop() {
defer c.wg.Done()
defer Log.Debugf("reader quit")
+
for {
select {
case <-c.quit:
@@ -450,6 +459,7 @@ func (c *vppClient) readerLoop() {
Log.Debugf("read failed: %v", err)
continue
}
+
h := new(msgHeader)
if err := struc.Unpack(bytes.NewReader(msg), h); err != nil {
Log.Debugf("unpacking header failed: %v", err)
@@ -468,22 +478,22 @@ type msgheader struct {
}
func (c *vppClient) read() ([]byte, error) {
- Log.Debug("reading next msg..")
+ Log.Debug(" reading next msg..")
header := make([]byte, 16)
n, err := io.ReadAtLeast(c.reader, header, 16)
if err != nil {
return nil, err
- } else if n == 0 {
+ }
+ if n == 0 {
Log.Debugln("zero bytes header")
return nil, nil
- }
- if n != 16 {
+ } else if n != 16 {
Log.Debugf("invalid header data (%d): % 0X", n, header[:n])
return nil, fmt.Errorf("invalid header (expected 16 bytes, got %d)", n)
}
- Log.Debugf(" - read header %d bytes: % 0X", n, header)
+ Log.Debugf(" read header %d bytes: % 0X", n, header)
h := &msgheader{}
if err := struc.Unpack(bytes.NewReader(header[:]), h); err != nil {
@@ -498,7 +508,7 @@ func (c *vppClient) read() ([]byte, error) {
if err != nil {
return nil, err
}
- Log.Debugf(" - read msg %d bytes (%d buffered)", n, c.reader.Buffered())
+ Log.Debugf(" - read msg %d bytes (%d buffered) % 0X", n, c.reader.Buffered(), msg[:n])
if msgLen > n {
remain := msgLen - n
@@ -520,6 +530,8 @@ func (c *vppClient) read() ([]byte, error) {
}
}
+ Log.Debugf(" -- read done (buffered: %d)", c.reader.Buffered())
+
return msg, nil
}
diff --git a/adapter/vppapiclient/vppapiclient.go b/adapter/vppapiclient/vppapiclient.go
index d8465d7..244be43 100644
--- a/adapter/vppapiclient/vppapiclient.go
+++ b/adapter/vppapiclient/vppapiclient.go
@@ -40,7 +40,7 @@ import (
var (
// MaxWaitReady defines maximum duration before waiting for shared memory
// segment times out
- MaxWaitReady = time.Second * 15
+ MaxWaitReady = time.Second * 10
)
const (
@@ -148,12 +148,11 @@ func (a *vppClient) WaitReady() error {
path = filepath.Join(shmDir, a.shmPrefix+"-"+vppShmFile)
}
- // check if file at the path already exists
+ // check if file already exists
if _, err := os.Stat(path); err == nil {
- // file exists, we are ready
- return nil
+ return nil // file exists, we are ready
} else if !os.IsNotExist(err) {
- return err
+ return err // some other error occurred
}
// file does not exist, start watching folder
@@ -168,18 +167,19 @@ func (a *vppClient) WaitReady() error {
return err
}
+ timeout := time.NewTimer(MaxWaitReady)
for {
select {
- case <-time.After(MaxWaitReady):
- return fmt.Errorf("waiting for shared memory segment timed out (%s)", MaxWaitReady)
+ case <-timeout.C:
+ return fmt.Errorf("timeout waiting (%s) for shm file: %s", MaxWaitReady, path)
+
case e := <-watcher.Errors:
return e
+
case ev := <-watcher.Events:
- if ev.Name == path {
- if (ev.Op & fsnotify.Create) == fsnotify.Create {
- // file was created, we are ready
- return nil
- }
+ if ev.Name == path && (ev.Op&fsnotify.Create) == fsnotify.Create {
+ // file created, we are ready
+ return nil
}
}
}
diff --git a/core/channel.go b/core/channel.go
index 2e73917..c464708 100644
--- a/core/channel.go
+++ b/core/channel.go
@@ -21,8 +21,9 @@ import (
"strings"
"time"
- "git.fd.io/govpp.git/api"
"github.com/sirupsen/logrus"
+
+ "git.fd.io/govpp.git/api"
)
var (
@@ -144,6 +145,7 @@ func (ch *Channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx {
func (ch *Channel) CheckCompatiblity(msgs ...api.Message) error {
for _, msg := range msgs {
+ // TODO: collect all incompatible messages and return summarized error
_, err := ch.msgIdentifier.GetMessageID(msg)
if err != nil {
return err
@@ -255,17 +257,23 @@ func (ch *Channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (last
case vppReply := <-ch.replyChan:
ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
if ignore {
- logrus.Warnf("ignoring reply: %+v", vppReply)
+ logrus.WithFields(logrus.Fields{
+ "expSeqNum": expSeqNum,
+ "channel": ch.id,
+ }).Warnf("ignoring received reply: %+v (expecting: %s)", vppReply, msg.GetMessageName())
continue
}
return lastReplyReceived, err
case <-timer.C:
+ logrus.WithFields(logrus.Fields{
+ "expSeqNum": expSeqNum,
+ "channel": ch.id,
+ }).Debugf("timeout (%v) waiting for reply: %s", ch.replyTimeout, msg.GetMessageName())
err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout)
return false, err
}
}
- return
}
func (ch *Channel) processReply(reply *vppReply, expSeqNum uint16, msg api.Message) (ignore bool, lastReplyReceived bool, err error) {
diff --git a/core/connection.go b/core/connection.go
index 02f980a..605e1ef 100644
--- a/core/connection.go
+++ b/core/connection.go
@@ -30,8 +30,8 @@ import (
)
const (
- DefaultReconnectInterval = time.Second // default interval between reconnect attempts
- DefaultMaxReconnectAttempts = 3 // default maximum number of reconnect attempts
+ DefaultReconnectInterval = time.Second / 2 // default interval between reconnect attempts
+ DefaultMaxReconnectAttempts = 3 // default maximum number of reconnect attempts
)
var (
@@ -255,7 +255,7 @@ func (c *Connection) releaseAPIChannel(ch *Channel) {
// connectLoop attempts to connect to VPP until it succeeds.
// Then it continues with healthCheckLoop.
func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
- reconnectAttempts := 0
+ var reconnectAttempts int
// loop until connected
for {
@@ -268,7 +268,7 @@ func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
break
} else if reconnectAttempts < c.maxAttempts {
reconnectAttempts++
- log.Errorf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
+ log.Warnf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
time.Sleep(c.recInterval)
} else {
connChan <- ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err}
diff --git a/core/request_handler.go b/core/request_handler.go
index fd8aa59..d3f7bdc 100644
--- a/core/request_handler.go
+++ b/core/request_handler.go
@@ -93,7 +93,7 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
"msg_size": len(data),
"seq_num": req.seqNum,
"msg_crc": req.msg.GetCrcString(),
- }).Debugf("--> govpp send: %s: %+v", req.msg.GetMessageName(), req.msg)
+ }).Debugf("==> govpp send: %s: %+v", req.msg.GetMessageName(), req.msg)
}
// send the request to VPP
@@ -118,7 +118,7 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
"msg_id": c.pingReqID,
"msg_size": len(pingData),
"seq_num": req.seqNum,
- }).Debug(" -> sending control ping")
+ }).Debug("--> sending control ping")
if err := c.vppClient.SendMsg(context, pingData); err != nil {
log.WithFields(logger.Fields{
@@ -165,7 +165,7 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) {
"is_multi": isMulti,
"seq_num": seqNum,
"msg_crc": msg.GetCrcString(),
- }).Debugf("<-- govpp recv: %s", msg.GetMessageName())
+ }).Debugf("<== govpp recv: %s", msg.GetMessageName())
}
if context == 0 || c.isNotificationMessage(msgID) {