diff options
-rw-r--r-- | Makefile | 2 | ||||
-rw-r--r-- | adapter/socketclient/socketclient.go | 51 | ||||
-rw-r--r-- | adapter/vppapiclient/vppapiclient.go | 31 | ||||
-rw-r--r-- | core/connection.go | 37 | ||||
-rw-r--r-- | core/connection_test.go | 2 | ||||
-rw-r--r-- | examples/simple-client/simple_client.go | 2 | ||||
-rw-r--r-- | govpp.go | 5 |
7 files changed, 101 insertions, 29 deletions
@@ -23,7 +23,7 @@ examples: test: @echo "=> running tests" go test -cover ./cmd/... - go test -cover ./core ./api ./codec + go test -cover ./ ./adapter ./core ./api ./codec extras: @echo "=> building extras" diff --git a/adapter/socketclient/socketclient.go b/adapter/socketclient/socketclient.go index eec4fd0..4c576c3 100644 --- a/adapter/socketclient/socketclient.go +++ b/adapter/socketclient/socketclient.go @@ -7,10 +7,13 @@ import ( "io" "net" "os" + "path/filepath" "strings" "sync" "time" + "github.com/fsnotify/fsnotify" + "github.com/lunixbochs/struc" logger "github.com/sirupsen/logrus" @@ -31,6 +34,10 @@ var ( ConnectTimeout = time.Second * 3 DisconnectTimeout = time.Second + // MaxWaitReady defines maximum duration before waiting for socket file + // times out + MaxWaitReady = time.Second * 15 + Debug = os.Getenv("DEBUG_GOVPP_SOCK") != "" DebugMsgIds = os.Getenv("DEBUG_GOVPP_SOCKMSG") != "" @@ -72,9 +79,47 @@ func nilCallback(msgID uint16, data []byte) { Log.Warnf("no callback set, dropping message: ID=%v len=%d", msgID, len(data)) } -func (*vppClient) WaitReady() error { - // TODO: add watcher for socket file? - return nil +// WaitReady checks socket file existence and waits for it if necessary +func (c *vppClient) WaitReady() error { + // check if file at the path already exists + if _, err := os.Stat(c.sockAddr); err == nil { + return nil + } else if os.IsExist(err) { + return err + } + + // if not, watch for it + watcher, err := fsnotify.NewWatcher() + if err != nil { + return err + } + defer func() { + if err := watcher.Close(); err != nil { + Log.Errorf("failed to close file watcher: %v", err) + } + }() + + // start watching directory + if err := watcher.Add(filepath.Dir(c.sockAddr)); err != nil { + return err + } + + for { + select { + case <-time.After(MaxWaitReady): + return fmt.Errorf("waiting for socket file timed out (%s)", MaxWaitReady) + case e := <-watcher.Errors: + return e + case ev := <-watcher.Events: + Log.Debugf("watcher event: %+v", ev) + if ev.Name == c.sockAddr { + if (ev.Op & fsnotify.Create) == fsnotify.Create { + // socket was created, we are ready + return nil + } + } + } + } } func (c *vppClient) SetMsgCallback(cb adapter.MsgCallback) { diff --git a/adapter/vppapiclient/vppapiclient.go b/adapter/vppapiclient/vppapiclient.go index cac8c71..f637060 100644 --- a/adapter/vppapiclient/vppapiclient.go +++ b/adapter/vppapiclient/vppapiclient.go @@ -78,12 +78,19 @@ import ( "os" "path/filepath" "reflect" + "time" "unsafe" "git.fd.io/govpp.git/adapter" "github.com/fsnotify/fsnotify" ) +var ( + // MaxWaitReady defines maximum duration before waiting for shared memory + // segment times out + MaxWaitReady = time.Second * 15 +) + const ( // shmDir is a directory where shared memory is supposed to be created. shmDir = "/dev/shm/" @@ -181,16 +188,15 @@ func (a *vppClient) SetMsgCallback(cb adapter.MsgCallback) { // WaitReady blocks until shared memory for sending // binary api calls is present on the file system. func (a *vppClient) WaitReady() error { - var path string - // join the path to the shared memory segment + var path string if a.shmPrefix == "" { path = filepath.Join(shmDir, vppShmFile) } else { path = filepath.Join(shmDir, a.shmPrefix+"-"+vppShmFile) } - // check if file at the path exists + // check if file at the path already exists if _, err := os.Stat(path); err == nil { // file exists, we are ready return nil @@ -205,21 +211,26 @@ func (a *vppClient) WaitReady() error { } defer watcher.Close() + // start watching directory if err := watcher.Add(shmDir); err != nil { return err } for { - ev := <-watcher.Events - if ev.Name == path { - if (ev.Op & fsnotify.Create) == fsnotify.Create { - // file was created, we are ready - break + select { + case <-time.After(MaxWaitReady): + return fmt.Errorf("waiting for shared memory segment timed out (%s)", MaxWaitReady) + case e := <-watcher.Errors: + return e + case ev := <-watcher.Events: + if ev.Name == path { + if (ev.Op & fsnotify.Create) == fsnotify.Create { + // file was created, we are ready + return nil + } } } } - - return nil } //export go_msg_callback diff --git a/core/connection.go b/core/connection.go index a21cc28..67c7e1d 100644 --- a/core/connection.go +++ b/core/connection.go @@ -29,6 +29,11 @@ import ( "git.fd.io/govpp.git/codec" ) +const ( + DefaultReconnectInterval = time.Second // default interval between reconnect attempts + DefaultMaxReconnectAttempts = 3 // default maximum number of reconnect attempts +) + var ( RequestChanBufSize = 100 // default size of the request channel buffer ReplyChanBufSize = 100 // default size of the reply channel buffer @@ -36,12 +41,10 @@ var ( ) var ( - HealthCheckProbeInterval = time.Second * 1 // default health check probe interval + HealthCheckProbeInterval = time.Second // default health check probe interval HealthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe HealthCheckThreshold = 1 // number of failed health checks until the error is reported - DefaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP - ReconnectInterval = time.Second * 1 // default interval for reconnect attempts - MaxReconnectAttempts = 3 // maximum number of reconnect attempts + DefaultReplyTimeout = time.Second // default timeout for replies from VPP ) // ConnectionState represents the current state of the connection to VPP. @@ -88,6 +91,9 @@ type Connection struct { vppClient adapter.VppAPI // VPP binary API client //statsClient adapter.StatsAPI // VPP stats API client + maxAttempts int // interval for reconnect attempts + recInterval time.Duration // maximum number of reconnect attempts + vppConnected uint32 // non-zero if the adapter is connected to VPP codec *codec.MsgCodec // message codec @@ -108,9 +114,18 @@ type Connection struct { lastReply time.Time // time of the last received reply from VPP } -func newConnection(binapi adapter.VppAPI) *Connection { +func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) *Connection { + if attempts == 0 { + attempts = DefaultMaxReconnectAttempts + } + if interval == 0 { + interval = DefaultReconnectInterval + } + c := &Connection{ vppClient: binapi, + maxAttempts: attempts, + recInterval: interval, codec: &codec.MsgCodec{}, msgIDs: make(map[string]uint16), msgMap: make(map[uint16]api.Message), @@ -126,7 +141,7 @@ func newConnection(binapi adapter.VppAPI) *Connection { // Only one connection attempt will be performed. func Connect(binapi adapter.VppAPI) (*Connection, error) { // create new connection handle - c := newConnection(binapi) + c := newConnection(binapi, DefaultMaxReconnectAttempts, DefaultReconnectInterval) // blocking attempt to connect to VPP if err := c.connectVPP(); err != nil { @@ -140,9 +155,9 @@ func Connect(binapi adapter.VppAPI) (*Connection, error) { // and ConnectionState channel. This call does not block until connection is established, it // returns immediately. The caller is supposed to watch the returned ConnectionState channel for // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect. -func AsyncConnect(binapi adapter.VppAPI) (*Connection, chan ConnectionEvent, error) { +func AsyncConnect(binapi adapter.VppAPI, attempts int, interval time.Duration) (*Connection, chan ConnectionEvent, error) { // create new connection handle - c := newConnection(binapi) + c := newConnection(binapi, attempts, interval) // asynchronously attempt to connect to VPP connChan := make(chan ConnectionEvent, NotificationChanBufSize) @@ -247,10 +262,10 @@ func (c *Connection) connectLoop(connChan chan ConnectionEvent) { // signal connected event connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected} break - } else if reconnectAttempts < MaxReconnectAttempts { + } else if reconnectAttempts < c.maxAttempts { reconnectAttempts++ - log.Errorf("connecting failed (attempt %d/%d): %v", reconnectAttempts, MaxReconnectAttempts, err) - time.Sleep(ReconnectInterval) + log.Errorf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err) + time.Sleep(c.recInterval) } else { connChan <- ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err} return diff --git a/core/connection_test.go b/core/connection_test.go index 929f468..843c5ea 100644 --- a/core/connection_test.go +++ b/core/connection_test.go @@ -78,7 +78,7 @@ func TestAsyncConnection(t *testing.T) { defer ctx.teardownTest() ctx.conn.Disconnect() - conn, statusChan, err := core.AsyncConnect(ctx.mockVpp) + conn, statusChan, err := core.AsyncConnect(ctx.mockVpp, core.DefaultMaxReconnectAttempts, core.DefaultReconnectInterval) ctx.conn = conn Expect(err).ShouldNot(HaveOccurred()) diff --git a/examples/simple-client/simple_client.go b/examples/simple-client/simple_client.go index 6429c35..a494e81 100644 --- a/examples/simple-client/simple_client.go +++ b/examples/simple-client/simple_client.go @@ -35,7 +35,7 @@ func main() { fmt.Println("Starting simple VPP client...") // connect to VPP - conn, conev, err := govpp.AsyncConnect("") + conn, conev, err := govpp.AsyncConnect("", core.DefaultMaxReconnectAttempts, core.DefaultReconnectInterval) if err != nil { log.Fatalln("ERROR:", err) } @@ -18,6 +18,7 @@ import ( "git.fd.io/govpp.git/adapter" "git.fd.io/govpp.git/adapter/vppapiclient" "git.fd.io/govpp.git/core" + "time" ) var ( @@ -49,6 +50,6 @@ func Connect(shm string) (*core.Connection, error) { // This call does not block until connection is established, it returns immediately. The caller is // supposed to watch the returned ConnectionState channel for Connected/Disconnected events. // In case of disconnect, the library will asynchronously try to reconnect. -func AsyncConnect(shm string) (*core.Connection, chan core.ConnectionEvent, error) { - return core.AsyncConnect(getVppAdapter(shm)) +func AsyncConnect(shm string, attempts int, interval time.Duration) (*core.Connection, chan core.ConnectionEvent, error) { + return core.AsyncConnect(getVppAdapter(shm), attempts, interval) } |