aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Makefile2
-rw-r--r--adapter/socketclient/socketclient.go51
-rw-r--r--adapter/vppapiclient/vppapiclient.go31
-rw-r--r--core/connection.go37
-rw-r--r--core/connection_test.go2
-rw-r--r--examples/simple-client/simple_client.go2
-rw-r--r--govpp.go5
7 files changed, 101 insertions, 29 deletions
diff --git a/Makefile b/Makefile
index e5ae92e..c4b742e 100644
--- a/Makefile
+++ b/Makefile
@@ -23,7 +23,7 @@ examples:
test:
@echo "=> running tests"
go test -cover ./cmd/...
- go test -cover ./core ./api ./codec
+ go test -cover ./ ./adapter ./core ./api ./codec
extras:
@echo "=> building extras"
diff --git a/adapter/socketclient/socketclient.go b/adapter/socketclient/socketclient.go
index eec4fd0..4c576c3 100644
--- a/adapter/socketclient/socketclient.go
+++ b/adapter/socketclient/socketclient.go
@@ -7,10 +7,13 @@ import (
"io"
"net"
"os"
+ "path/filepath"
"strings"
"sync"
"time"
+ "github.com/fsnotify/fsnotify"
+
"github.com/lunixbochs/struc"
logger "github.com/sirupsen/logrus"
@@ -31,6 +34,10 @@ var (
ConnectTimeout = time.Second * 3
DisconnectTimeout = time.Second
+ // MaxWaitReady defines maximum duration before waiting for socket file
+ // times out
+ MaxWaitReady = time.Second * 15
+
Debug = os.Getenv("DEBUG_GOVPP_SOCK") != ""
DebugMsgIds = os.Getenv("DEBUG_GOVPP_SOCKMSG") != ""
@@ -72,9 +79,47 @@ func nilCallback(msgID uint16, data []byte) {
Log.Warnf("no callback set, dropping message: ID=%v len=%d", msgID, len(data))
}
-func (*vppClient) WaitReady() error {
- // TODO: add watcher for socket file?
- return nil
+// WaitReady checks socket file existence and waits for it if necessary
+func (c *vppClient) WaitReady() error {
+ // check if file at the path already exists
+ if _, err := os.Stat(c.sockAddr); err == nil {
+ return nil
+ } else if os.IsExist(err) {
+ return err
+ }
+
+ // if not, watch for it
+ watcher, err := fsnotify.NewWatcher()
+ if err != nil {
+ return err
+ }
+ defer func() {
+ if err := watcher.Close(); err != nil {
+ Log.Errorf("failed to close file watcher: %v", err)
+ }
+ }()
+
+ // start watching directory
+ if err := watcher.Add(filepath.Dir(c.sockAddr)); err != nil {
+ return err
+ }
+
+ for {
+ select {
+ case <-time.After(MaxWaitReady):
+ return fmt.Errorf("waiting for socket file timed out (%s)", MaxWaitReady)
+ case e := <-watcher.Errors:
+ return e
+ case ev := <-watcher.Events:
+ Log.Debugf("watcher event: %+v", ev)
+ if ev.Name == c.sockAddr {
+ if (ev.Op & fsnotify.Create) == fsnotify.Create {
+ // socket was created, we are ready
+ return nil
+ }
+ }
+ }
+ }
}
func (c *vppClient) SetMsgCallback(cb adapter.MsgCallback) {
diff --git a/adapter/vppapiclient/vppapiclient.go b/adapter/vppapiclient/vppapiclient.go
index cac8c71..f637060 100644
--- a/adapter/vppapiclient/vppapiclient.go
+++ b/adapter/vppapiclient/vppapiclient.go
@@ -78,12 +78,19 @@ import (
"os"
"path/filepath"
"reflect"
+ "time"
"unsafe"
"git.fd.io/govpp.git/adapter"
"github.com/fsnotify/fsnotify"
)
+var (
+ // MaxWaitReady defines maximum duration before waiting for shared memory
+ // segment times out
+ MaxWaitReady = time.Second * 15
+)
+
const (
// shmDir is a directory where shared memory is supposed to be created.
shmDir = "/dev/shm/"
@@ -181,16 +188,15 @@ func (a *vppClient) SetMsgCallback(cb adapter.MsgCallback) {
// WaitReady blocks until shared memory for sending
// binary api calls is present on the file system.
func (a *vppClient) WaitReady() error {
- var path string
-
// join the path to the shared memory segment
+ var path string
if a.shmPrefix == "" {
path = filepath.Join(shmDir, vppShmFile)
} else {
path = filepath.Join(shmDir, a.shmPrefix+"-"+vppShmFile)
}
- // check if file at the path exists
+ // check if file at the path already exists
if _, err := os.Stat(path); err == nil {
// file exists, we are ready
return nil
@@ -205,21 +211,26 @@ func (a *vppClient) WaitReady() error {
}
defer watcher.Close()
+ // start watching directory
if err := watcher.Add(shmDir); err != nil {
return err
}
for {
- ev := <-watcher.Events
- if ev.Name == path {
- if (ev.Op & fsnotify.Create) == fsnotify.Create {
- // file was created, we are ready
- break
+ select {
+ case <-time.After(MaxWaitReady):
+ return fmt.Errorf("waiting for shared memory segment timed out (%s)", MaxWaitReady)
+ case e := <-watcher.Errors:
+ return e
+ case ev := <-watcher.Events:
+ if ev.Name == path {
+ if (ev.Op & fsnotify.Create) == fsnotify.Create {
+ // file was created, we are ready
+ return nil
+ }
}
}
}
-
- return nil
}
//export go_msg_callback
diff --git a/core/connection.go b/core/connection.go
index a21cc28..67c7e1d 100644
--- a/core/connection.go
+++ b/core/connection.go
@@ -29,6 +29,11 @@ import (
"git.fd.io/govpp.git/codec"
)
+const (
+ DefaultReconnectInterval = time.Second // default interval between reconnect attempts
+ DefaultMaxReconnectAttempts = 3 // default maximum number of reconnect attempts
+)
+
var (
RequestChanBufSize = 100 // default size of the request channel buffer
ReplyChanBufSize = 100 // default size of the reply channel buffer
@@ -36,12 +41,10 @@ var (
)
var (
- HealthCheckProbeInterval = time.Second * 1 // default health check probe interval
+ HealthCheckProbeInterval = time.Second // default health check probe interval
HealthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe
HealthCheckThreshold = 1 // number of failed health checks until the error is reported
- DefaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP
- ReconnectInterval = time.Second * 1 // default interval for reconnect attempts
- MaxReconnectAttempts = 3 // maximum number of reconnect attempts
+ DefaultReplyTimeout = time.Second // default timeout for replies from VPP
)
// ConnectionState represents the current state of the connection to VPP.
@@ -88,6 +91,9 @@ type Connection struct {
vppClient adapter.VppAPI // VPP binary API client
//statsClient adapter.StatsAPI // VPP stats API client
+ maxAttempts int // interval for reconnect attempts
+ recInterval time.Duration // maximum number of reconnect attempts
+
vppConnected uint32 // non-zero if the adapter is connected to VPP
codec *codec.MsgCodec // message codec
@@ -108,9 +114,18 @@ type Connection struct {
lastReply time.Time // time of the last received reply from VPP
}
-func newConnection(binapi adapter.VppAPI) *Connection {
+func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) *Connection {
+ if attempts == 0 {
+ attempts = DefaultMaxReconnectAttempts
+ }
+ if interval == 0 {
+ interval = DefaultReconnectInterval
+ }
+
c := &Connection{
vppClient: binapi,
+ maxAttempts: attempts,
+ recInterval: interval,
codec: &codec.MsgCodec{},
msgIDs: make(map[string]uint16),
msgMap: make(map[uint16]api.Message),
@@ -126,7 +141,7 @@ func newConnection(binapi adapter.VppAPI) *Connection {
// Only one connection attempt will be performed.
func Connect(binapi adapter.VppAPI) (*Connection, error) {
// create new connection handle
- c := newConnection(binapi)
+ c := newConnection(binapi, DefaultMaxReconnectAttempts, DefaultReconnectInterval)
// blocking attempt to connect to VPP
if err := c.connectVPP(); err != nil {
@@ -140,9 +155,9 @@ func Connect(binapi adapter.VppAPI) (*Connection, error) {
// and ConnectionState channel. This call does not block until connection is established, it
// returns immediately. The caller is supposed to watch the returned ConnectionState channel for
// Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
-func AsyncConnect(binapi adapter.VppAPI) (*Connection, chan ConnectionEvent, error) {
+func AsyncConnect(binapi adapter.VppAPI, attempts int, interval time.Duration) (*Connection, chan ConnectionEvent, error) {
// create new connection handle
- c := newConnection(binapi)
+ c := newConnection(binapi, attempts, interval)
// asynchronously attempt to connect to VPP
connChan := make(chan ConnectionEvent, NotificationChanBufSize)
@@ -247,10 +262,10 @@ func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
// signal connected event
connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
break
- } else if reconnectAttempts < MaxReconnectAttempts {
+ } else if reconnectAttempts < c.maxAttempts {
reconnectAttempts++
- log.Errorf("connecting failed (attempt %d/%d): %v", reconnectAttempts, MaxReconnectAttempts, err)
- time.Sleep(ReconnectInterval)
+ log.Errorf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
+ time.Sleep(c.recInterval)
} else {
connChan <- ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err}
return
diff --git a/core/connection_test.go b/core/connection_test.go
index 929f468..843c5ea 100644
--- a/core/connection_test.go
+++ b/core/connection_test.go
@@ -78,7 +78,7 @@ func TestAsyncConnection(t *testing.T) {
defer ctx.teardownTest()
ctx.conn.Disconnect()
- conn, statusChan, err := core.AsyncConnect(ctx.mockVpp)
+ conn, statusChan, err := core.AsyncConnect(ctx.mockVpp, core.DefaultMaxReconnectAttempts, core.DefaultReconnectInterval)
ctx.conn = conn
Expect(err).ShouldNot(HaveOccurred())
diff --git a/examples/simple-client/simple_client.go b/examples/simple-client/simple_client.go
index 6429c35..a494e81 100644
--- a/examples/simple-client/simple_client.go
+++ b/examples/simple-client/simple_client.go
@@ -35,7 +35,7 @@ func main() {
fmt.Println("Starting simple VPP client...")
// connect to VPP
- conn, conev, err := govpp.AsyncConnect("")
+ conn, conev, err := govpp.AsyncConnect("", core.DefaultMaxReconnectAttempts, core.DefaultReconnectInterval)
if err != nil {
log.Fatalln("ERROR:", err)
}
diff --git a/govpp.go b/govpp.go
index f679242..d66d5dc 100644
--- a/govpp.go
+++ b/govpp.go
@@ -18,6 +18,7 @@ import (
"git.fd.io/govpp.git/adapter"
"git.fd.io/govpp.git/adapter/vppapiclient"
"git.fd.io/govpp.git/core"
+ "time"
)
var (
@@ -49,6 +50,6 @@ func Connect(shm string) (*core.Connection, error) {
// This call does not block until connection is established, it returns immediately. The caller is
// supposed to watch the returned ConnectionState channel for Connected/Disconnected events.
// In case of disconnect, the library will asynchronously try to reconnect.
-func AsyncConnect(shm string) (*core.Connection, chan core.ConnectionEvent, error) {
- return core.AsyncConnect(getVppAdapter(shm))
+func AsyncConnect(shm string, attempts int, interval time.Duration) (*core.Connection, chan core.ConnectionEvent, error) {
+ return core.AsyncConnect(getVppAdapter(shm), attempts, interval)
}