diff options
-rw-r--r-- | adapter/socketclient/socketclient.go | 146 | ||||
-rw-r--r-- | adapter/vppapiclient/vppapiclient.go | 24 | ||||
-rw-r--r-- | core/channel.go | 14 | ||||
-rw-r--r-- | core/connection.go | 8 | ||||
-rw-r--r-- | core/request_handler.go | 6 |
5 files changed, 109 insertions, 89 deletions
diff --git a/adapter/socketclient/socketclient.go b/adapter/socketclient/socketclient.go index 2e6d0af..96f23e6 100644 --- a/adapter/socketclient/socketclient.go +++ b/adapter/socketclient/socketclient.go @@ -47,7 +47,7 @@ var ( DefaultDisconnectTimeout = time.Millisecond * 100 // MaxWaitReady defines maximum duration before waiting for socket file // times out - MaxWaitReady = time.Second * 15 + MaxWaitReady = time.Second * 10 // ClientName is used for identifying client in socket registration ClientName = "govppsock" ) @@ -73,9 +73,10 @@ func init() { type vppClient struct { sockAddr string - conn *net.UnixConn - reader *bufio.Reader - writer *bufio.Writer + + conn *net.UnixConn + reader *bufio.Reader + writer *bufio.Writer connectTimeout time.Duration disconnectTimeout time.Duration @@ -116,42 +117,43 @@ func (c *vppClient) SetDisconnectTimeout(t time.Duration) { // WaitReady checks socket file existence and waits for it if necessary func (c *vppClient) WaitReady() error { - // check if file at the path already exists + // check if socket already exists if _, err := os.Stat(c.sockAddr); err == nil { - return nil - } else if os.IsExist(err) { - return err + return nil // socket exists, we are ready + } else if !os.IsNotExist(err) { + return err // some other error occurred } - // if not, watch for it + // socket does not exist, watch for it watcher, err := fsnotify.NewWatcher() if err != nil { return err } defer func() { if err := watcher.Close(); err != nil { - Log.Errorf("failed to close file watcher: %v", err) + Log.Warnf("failed to close file watcher: %v", err) } }() - // start watching directory + // start directory watcher if err := watcher.Add(filepath.Dir(c.sockAddr)); err != nil { return err } + timeout := time.NewTimer(MaxWaitReady) for { select { - case <-time.After(MaxWaitReady): - return fmt.Errorf("waiting for socket file timed out (%s)", MaxWaitReady) + case <-timeout.C: + return fmt.Errorf("timeout waiting (%s) for socket file: %s", MaxWaitReady, c.sockAddr) + case e := <-watcher.Errors: return e + case ev := <-watcher.Events: Log.Debugf("watcher event: %+v", ev) - if ev.Name == c.sockAddr { - if (ev.Op & fsnotify.Create) == fsnotify.Create { - // socket was created, we are ready - return nil - } + if ev.Name == c.sockAddr && (ev.Op&fsnotify.Create) == fsnotify.Create { + // socket created, we are ready + return nil } } } @@ -170,6 +172,7 @@ func (c *vppClient) Connect() error { } if err := c.open(); err != nil { + c.disconnect() return err } @@ -180,6 +183,32 @@ func (c *vppClient) Connect() error { return nil } +func (c *vppClient) Disconnect() error { + if c.conn == nil { + return nil + } + Log.Debugf("Disconnecting..") + + close(c.quit) + + if err := c.conn.CloseRead(); err != nil { + Log.Debugf("closing read failed: %v", err) + } + + // wait for readerLoop to return + c.wg.Wait() + + if err := c.close(); err != nil { + Log.Debugf("closing failed: %v", err) + } + + if err := c.disconnect(); err != nil { + return err + } + + return nil +} + func (c *vppClient) connect(sockAddr string) error { addr := &net.UnixAddr{Name: sockAddr, Net: "unix"} @@ -188,7 +217,7 @@ func (c *vppClient) connect(sockAddr string) error { // we try different type of socket for backwards compatbility with VPP<=19.04 if strings.Contains(err.Error(), "wrong type for socket") { addr.Net = "unixpacket" - Log.Debugf("%s, retrying connect with type unixpacket", err) + Log.Warnf("%s, retrying connect with type unixpacket", err) conn, err = net.DialUnix("unixpacket", nil, addr) } if err != nil { @@ -198,11 +227,20 @@ func (c *vppClient) connect(sockAddr string) error { } c.conn = conn + Log.Debugf("Connected to socket (local addr: %v)", c.conn.LocalAddr().(*net.UnixAddr)) + c.reader = bufio.NewReader(c.conn) c.writer = bufio.NewWriter(c.conn) - Log.Debugf("Connected to socket: %v", addr) + return nil +} +func (c *vppClient) disconnect() error { + Log.Debugf("Closing socket") + if err := c.conn.Close(); err != nil { + Log.Debugln("Closing socket failed:", err) + return err + } return nil } @@ -270,34 +308,6 @@ func (c *vppClient) open() error { return nil } -func (c *vppClient) Disconnect() error { - if c.conn == nil { - return nil - } - Log.Debugf("Disconnecting..") - - close(c.quit) - - // force readerLoop to timeout - if err := c.conn.SetReadDeadline(time.Now()); err != nil { - return err - } - - // wait for readerLoop to return - c.wg.Wait() - - if err := c.close(); err != nil { - return err - } - - if err := c.conn.Close(); err != nil { - Log.Debugln("Closing socket failed:", err) - return err - } - - return nil -} - func (c *vppClient) close() error { msgCodec := new(codec.MsgCodec) @@ -373,7 +383,7 @@ func (c *vppClient) SendMsg(context uint32, data []byte) error { } copy(data[2:], buf.Bytes()) - Log.Debugf("SendMsg (%d) context=%v client=%d: data: % 02X", len(data), context, c.clientIndex, data) + Log.Debugf("sendMsg (%d) context=%v client=%d: data: % 02X", len(data), context, c.clientIndex, data) if err := c.write(data); err != nil { Log.Debugln("write error: ", err) @@ -403,27 +413,25 @@ func (c *vppClient) write(msg []byte) error { Log.Debugf(" - header sent (%d/%d): % 0X", n, len(header), header) } - if err := c.writer.Flush(); err != nil { - return err - } - - for i := 0; i <= len(msg)/c.writer.Size(); i++ { - x := i*c.writer.Size() + c.writer.Size() + writerSize := c.writer.Size() + for i := 0; i <= len(msg)/writerSize; i++ { + x := i*writerSize + writerSize if x > len(msg) { x = len(msg) } - Log.Debugf("x=%v i=%v len=%v mod=%v", x, i, len(msg), len(msg)/c.writer.Size()) - if n, err := c.writer.Write(msg[i*c.writer.Size() : x]); err != nil { + Log.Debugf(" - x=%v i=%v len=%v mod=%v", x, i, len(msg), len(msg)/writerSize) + if n, err := c.writer.Write(msg[i*writerSize : x]); err != nil { return err } else { Log.Debugf(" - msg sent x=%d (%d/%d): % 0X", x, n, len(msg), msg) } - if err := c.writer.Flush(); err != nil { - return err - } - + } + if err := c.writer.Flush(); err != nil { + return err } + Log.Debugf(" -- write done") + return nil } @@ -435,6 +443,7 @@ type msgHeader struct { func (c *vppClient) readerLoop() { defer c.wg.Done() defer Log.Debugf("reader quit") + for { select { case <-c.quit: @@ -450,6 +459,7 @@ func (c *vppClient) readerLoop() { Log.Debugf("read failed: %v", err) continue } + h := new(msgHeader) if err := struc.Unpack(bytes.NewReader(msg), h); err != nil { Log.Debugf("unpacking header failed: %v", err) @@ -468,22 +478,22 @@ type msgheader struct { } func (c *vppClient) read() ([]byte, error) { - Log.Debug("reading next msg..") + Log.Debug(" reading next msg..") header := make([]byte, 16) n, err := io.ReadAtLeast(c.reader, header, 16) if err != nil { return nil, err - } else if n == 0 { + } + if n == 0 { Log.Debugln("zero bytes header") return nil, nil - } - if n != 16 { + } else if n != 16 { Log.Debugf("invalid header data (%d): % 0X", n, header[:n]) return nil, fmt.Errorf("invalid header (expected 16 bytes, got %d)", n) } - Log.Debugf(" - read header %d bytes: % 0X", n, header) + Log.Debugf(" read header %d bytes: % 0X", n, header) h := &msgheader{} if err := struc.Unpack(bytes.NewReader(header[:]), h); err != nil { @@ -498,7 +508,7 @@ func (c *vppClient) read() ([]byte, error) { if err != nil { return nil, err } - Log.Debugf(" - read msg %d bytes (%d buffered)", n, c.reader.Buffered()) + Log.Debugf(" - read msg %d bytes (%d buffered) % 0X", n, c.reader.Buffered(), msg[:n]) if msgLen > n { remain := msgLen - n @@ -520,6 +530,8 @@ func (c *vppClient) read() ([]byte, error) { } } + Log.Debugf(" -- read done (buffered: %d)", c.reader.Buffered()) + return msg, nil } diff --git a/adapter/vppapiclient/vppapiclient.go b/adapter/vppapiclient/vppapiclient.go index d8465d7..244be43 100644 --- a/adapter/vppapiclient/vppapiclient.go +++ b/adapter/vppapiclient/vppapiclient.go @@ -40,7 +40,7 @@ import ( var ( // MaxWaitReady defines maximum duration before waiting for shared memory // segment times out - MaxWaitReady = time.Second * 15 + MaxWaitReady = time.Second * 10 ) const ( @@ -148,12 +148,11 @@ func (a *vppClient) WaitReady() error { path = filepath.Join(shmDir, a.shmPrefix+"-"+vppShmFile) } - // check if file at the path already exists + // check if file already exists if _, err := os.Stat(path); err == nil { - // file exists, we are ready - return nil + return nil // file exists, we are ready } else if !os.IsNotExist(err) { - return err + return err // some other error occurred } // file does not exist, start watching folder @@ -168,18 +167,19 @@ func (a *vppClient) WaitReady() error { return err } + timeout := time.NewTimer(MaxWaitReady) for { select { - case <-time.After(MaxWaitReady): - return fmt.Errorf("waiting for shared memory segment timed out (%s)", MaxWaitReady) + case <-timeout.C: + return fmt.Errorf("timeout waiting (%s) for shm file: %s", MaxWaitReady, path) + case e := <-watcher.Errors: return e + case ev := <-watcher.Events: - if ev.Name == path { - if (ev.Op & fsnotify.Create) == fsnotify.Create { - // file was created, we are ready - return nil - } + if ev.Name == path && (ev.Op&fsnotify.Create) == fsnotify.Create { + // file created, we are ready + return nil } } } diff --git a/core/channel.go b/core/channel.go index 2e73917..c464708 100644 --- a/core/channel.go +++ b/core/channel.go @@ -21,8 +21,9 @@ import ( "strings" "time" - "git.fd.io/govpp.git/api" "github.com/sirupsen/logrus" + + "git.fd.io/govpp.git/api" ) var ( @@ -144,6 +145,7 @@ func (ch *Channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx { func (ch *Channel) CheckCompatiblity(msgs ...api.Message) error { for _, msg := range msgs { + // TODO: collect all incompatible messages and return summarized error _, err := ch.msgIdentifier.GetMessageID(msg) if err != nil { return err @@ -255,17 +257,23 @@ func (ch *Channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (last case vppReply := <-ch.replyChan: ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg) if ignore { - logrus.Warnf("ignoring reply: %+v", vppReply) + logrus.WithFields(logrus.Fields{ + "expSeqNum": expSeqNum, + "channel": ch.id, + }).Warnf("ignoring received reply: %+v (expecting: %s)", vppReply, msg.GetMessageName()) continue } return lastReplyReceived, err case <-timer.C: + logrus.WithFields(logrus.Fields{ + "expSeqNum": expSeqNum, + "channel": ch.id, + }).Debugf("timeout (%v) waiting for reply: %s", ch.replyTimeout, msg.GetMessageName()) err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout) return false, err } } - return } func (ch *Channel) processReply(reply *vppReply, expSeqNum uint16, msg api.Message) (ignore bool, lastReplyReceived bool, err error) { diff --git a/core/connection.go b/core/connection.go index 02f980a..605e1ef 100644 --- a/core/connection.go +++ b/core/connection.go @@ -30,8 +30,8 @@ import ( ) const ( - DefaultReconnectInterval = time.Second // default interval between reconnect attempts - DefaultMaxReconnectAttempts = 3 // default maximum number of reconnect attempts + DefaultReconnectInterval = time.Second / 2 // default interval between reconnect attempts + DefaultMaxReconnectAttempts = 3 // default maximum number of reconnect attempts ) var ( @@ -255,7 +255,7 @@ func (c *Connection) releaseAPIChannel(ch *Channel) { // connectLoop attempts to connect to VPP until it succeeds. // Then it continues with healthCheckLoop. func (c *Connection) connectLoop(connChan chan ConnectionEvent) { - reconnectAttempts := 0 + var reconnectAttempts int // loop until connected for { @@ -268,7 +268,7 @@ func (c *Connection) connectLoop(connChan chan ConnectionEvent) { break } else if reconnectAttempts < c.maxAttempts { reconnectAttempts++ - log.Errorf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err) + log.Warnf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err) time.Sleep(c.recInterval) } else { connChan <- ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err} diff --git a/core/request_handler.go b/core/request_handler.go index fd8aa59..d3f7bdc 100644 --- a/core/request_handler.go +++ b/core/request_handler.go @@ -93,7 +93,7 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error { "msg_size": len(data), "seq_num": req.seqNum, "msg_crc": req.msg.GetCrcString(), - }).Debugf("--> govpp send: %s: %+v", req.msg.GetMessageName(), req.msg) + }).Debugf("==> govpp send: %s: %+v", req.msg.GetMessageName(), req.msg) } // send the request to VPP @@ -118,7 +118,7 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error { "msg_id": c.pingReqID, "msg_size": len(pingData), "seq_num": req.seqNum, - }).Debug(" -> sending control ping") + }).Debug("--> sending control ping") if err := c.vppClient.SendMsg(context, pingData); err != nil { log.WithFields(logger.Fields{ @@ -165,7 +165,7 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) { "is_multi": isMulti, "seq_num": seqNum, "msg_crc": msg.GetCrcString(), - }).Debugf("<-- govpp recv: %s", msg.GetMessageName()) + }).Debugf("<== govpp recv: %s", msg.GetMessageName()) } if context == 0 || c.isNotificationMessage(msgID) { |