summaryrefslogtreecommitdiffstats
path: root/adapter/socketclient/socketclient.go
diff options
context:
space:
mode:
Diffstat (limited to 'adapter/socketclient/socketclient.go')
-rw-r--r--adapter/socketclient/socketclient.go146
1 files changed, 79 insertions, 67 deletions
diff --git a/adapter/socketclient/socketclient.go b/adapter/socketclient/socketclient.go
index 2e6d0af..96f23e6 100644
--- a/adapter/socketclient/socketclient.go
+++ b/adapter/socketclient/socketclient.go
@@ -47,7 +47,7 @@ var (
DefaultDisconnectTimeout = time.Millisecond * 100
// MaxWaitReady defines maximum duration before waiting for socket file
// times out
- MaxWaitReady = time.Second * 15
+ MaxWaitReady = time.Second * 10
// ClientName is used for identifying client in socket registration
ClientName = "govppsock"
)
@@ -73,9 +73,10 @@ func init() {
type vppClient struct {
sockAddr string
- conn *net.UnixConn
- reader *bufio.Reader
- writer *bufio.Writer
+
+ conn *net.UnixConn
+ reader *bufio.Reader
+ writer *bufio.Writer
connectTimeout time.Duration
disconnectTimeout time.Duration
@@ -116,42 +117,43 @@ func (c *vppClient) SetDisconnectTimeout(t time.Duration) {
// WaitReady checks socket file existence and waits for it if necessary
func (c *vppClient) WaitReady() error {
- // check if file at the path already exists
+ // check if socket already exists
if _, err := os.Stat(c.sockAddr); err == nil {
- return nil
- } else if os.IsExist(err) {
- return err
+ return nil // socket exists, we are ready
+ } else if !os.IsNotExist(err) {
+ return err // some other error occurred
}
- // if not, watch for it
+ // socket does not exist, watch for it
watcher, err := fsnotify.NewWatcher()
if err != nil {
return err
}
defer func() {
if err := watcher.Close(); err != nil {
- Log.Errorf("failed to close file watcher: %v", err)
+ Log.Warnf("failed to close file watcher: %v", err)
}
}()
- // start watching directory
+ // start directory watcher
if err := watcher.Add(filepath.Dir(c.sockAddr)); err != nil {
return err
}
+ timeout := time.NewTimer(MaxWaitReady)
for {
select {
- case <-time.After(MaxWaitReady):
- return fmt.Errorf("waiting for socket file timed out (%s)", MaxWaitReady)
+ case <-timeout.C:
+ return fmt.Errorf("timeout waiting (%s) for socket file: %s", MaxWaitReady, c.sockAddr)
+
case e := <-watcher.Errors:
return e
+
case ev := <-watcher.Events:
Log.Debugf("watcher event: %+v", ev)
- if ev.Name == c.sockAddr {
- if (ev.Op & fsnotify.Create) == fsnotify.Create {
- // socket was created, we are ready
- return nil
- }
+ if ev.Name == c.sockAddr && (ev.Op&fsnotify.Create) == fsnotify.Create {
+ // socket created, we are ready
+ return nil
}
}
}
@@ -170,6 +172,7 @@ func (c *vppClient) Connect() error {
}
if err := c.open(); err != nil {
+ c.disconnect()
return err
}
@@ -180,6 +183,32 @@ func (c *vppClient) Connect() error {
return nil
}
+func (c *vppClient) Disconnect() error {
+ if c.conn == nil {
+ return nil
+ }
+ Log.Debugf("Disconnecting..")
+
+ close(c.quit)
+
+ if err := c.conn.CloseRead(); err != nil {
+ Log.Debugf("closing read failed: %v", err)
+ }
+
+ // wait for readerLoop to return
+ c.wg.Wait()
+
+ if err := c.close(); err != nil {
+ Log.Debugf("closing failed: %v", err)
+ }
+
+ if err := c.disconnect(); err != nil {
+ return err
+ }
+
+ return nil
+}
+
func (c *vppClient) connect(sockAddr string) error {
addr := &net.UnixAddr{Name: sockAddr, Net: "unix"}
@@ -188,7 +217,7 @@ func (c *vppClient) connect(sockAddr string) error {
// we try different type of socket for backwards compatbility with VPP<=19.04
if strings.Contains(err.Error(), "wrong type for socket") {
addr.Net = "unixpacket"
- Log.Debugf("%s, retrying connect with type unixpacket", err)
+ Log.Warnf("%s, retrying connect with type unixpacket", err)
conn, err = net.DialUnix("unixpacket", nil, addr)
}
if err != nil {
@@ -198,11 +227,20 @@ func (c *vppClient) connect(sockAddr string) error {
}
c.conn = conn
+ Log.Debugf("Connected to socket (local addr: %v)", c.conn.LocalAddr().(*net.UnixAddr))
+
c.reader = bufio.NewReader(c.conn)
c.writer = bufio.NewWriter(c.conn)
- Log.Debugf("Connected to socket: %v", addr)
+ return nil
+}
+func (c *vppClient) disconnect() error {
+ Log.Debugf("Closing socket")
+ if err := c.conn.Close(); err != nil {
+ Log.Debugln("Closing socket failed:", err)
+ return err
+ }
return nil
}
@@ -270,34 +308,6 @@ func (c *vppClient) open() error {
return nil
}
-func (c *vppClient) Disconnect() error {
- if c.conn == nil {
- return nil
- }
- Log.Debugf("Disconnecting..")
-
- close(c.quit)
-
- // force readerLoop to timeout
- if err := c.conn.SetReadDeadline(time.Now()); err != nil {
- return err
- }
-
- // wait for readerLoop to return
- c.wg.Wait()
-
- if err := c.close(); err != nil {
- return err
- }
-
- if err := c.conn.Close(); err != nil {
- Log.Debugln("Closing socket failed:", err)
- return err
- }
-
- return nil
-}
-
func (c *vppClient) close() error {
msgCodec := new(codec.MsgCodec)
@@ -373,7 +383,7 @@ func (c *vppClient) SendMsg(context uint32, data []byte) error {
}
copy(data[2:], buf.Bytes())
- Log.Debugf("SendMsg (%d) context=%v client=%d: data: % 02X", len(data), context, c.clientIndex, data)
+ Log.Debugf("sendMsg (%d) context=%v client=%d: data: % 02X", len(data), context, c.clientIndex, data)
if err := c.write(data); err != nil {
Log.Debugln("write error: ", err)
@@ -403,27 +413,25 @@ func (c *vppClient) write(msg []byte) error {
Log.Debugf(" - header sent (%d/%d): % 0X", n, len(header), header)
}
- if err := c.writer.Flush(); err != nil {
- return err
- }
-
- for i := 0; i <= len(msg)/c.writer.Size(); i++ {
- x := i*c.writer.Size() + c.writer.Size()
+ writerSize := c.writer.Size()
+ for i := 0; i <= len(msg)/writerSize; i++ {
+ x := i*writerSize + writerSize
if x > len(msg) {
x = len(msg)
}
- Log.Debugf("x=%v i=%v len=%v mod=%v", x, i, len(msg), len(msg)/c.writer.Size())
- if n, err := c.writer.Write(msg[i*c.writer.Size() : x]); err != nil {
+ Log.Debugf(" - x=%v i=%v len=%v mod=%v", x, i, len(msg), len(msg)/writerSize)
+ if n, err := c.writer.Write(msg[i*writerSize : x]); err != nil {
return err
} else {
Log.Debugf(" - msg sent x=%d (%d/%d): % 0X", x, n, len(msg), msg)
}
- if err := c.writer.Flush(); err != nil {
- return err
- }
-
+ }
+ if err := c.writer.Flush(); err != nil {
+ return err
}
+ Log.Debugf(" -- write done")
+
return nil
}
@@ -435,6 +443,7 @@ type msgHeader struct {
func (c *vppClient) readerLoop() {
defer c.wg.Done()
defer Log.Debugf("reader quit")
+
for {
select {
case <-c.quit:
@@ -450,6 +459,7 @@ func (c *vppClient) readerLoop() {
Log.Debugf("read failed: %v", err)
continue
}
+
h := new(msgHeader)
if err := struc.Unpack(bytes.NewReader(msg), h); err != nil {
Log.Debugf("unpacking header failed: %v", err)
@@ -468,22 +478,22 @@ type msgheader struct {
}
func (c *vppClient) read() ([]byte, error) {
- Log.Debug("reading next msg..")
+ Log.Debug(" reading next msg..")
header := make([]byte, 16)
n, err := io.ReadAtLeast(c.reader, header, 16)
if err != nil {
return nil, err
- } else if n == 0 {
+ }
+ if n == 0 {
Log.Debugln("zero bytes header")
return nil, nil
- }
- if n != 16 {
+ } else if n != 16 {
Log.Debugf("invalid header data (%d): % 0X", n, header[:n])
return nil, fmt.Errorf("invalid header (expected 16 bytes, got %d)", n)
}
- Log.Debugf(" - read header %d bytes: % 0X", n, header)
+ Log.Debugf(" read header %d bytes: % 0X", n, header)
h := &msgheader{}
if err := struc.Unpack(bytes.NewReader(header[:]), h); err != nil {
@@ -498,7 +508,7 @@ func (c *vppClient) read() ([]byte, error) {
if err != nil {
return nil, err
}
- Log.Debugf(" - read msg %d bytes (%d buffered)", n, c.reader.Buffered())
+ Log.Debugf(" - read msg %d bytes (%d buffered) % 0X", n, c.reader.Buffered(), msg[:n])
if msgLen > n {
remain := msgLen - n
@@ -520,6 +530,8 @@ func (c *vppClient) read() ([]byte, error) {
}
}
+ Log.Debugf(" -- read done (buffered: %d)", c.reader.Buffered())
+
return msg, nil
}