aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRastislav Szabo <raszabo@cisco.com>2017-05-25 13:47:43 +0200
committerRastislav Szabo <raszabo@cisco.com>2017-05-25 13:54:13 +0200
commit2d07847237e754d9050f06f565baa430c70ed937 (patch)
tree80588aeec912e95fa21b51520bbd527eb87f455b
parentc38cb25d746736f062ee16e87f553c8a4ec5fced (diff)
added async connect API, new structure of examples
Change-Id: Iab9bce174596c30998981e02b7030c248c423384 Signed-off-by: Rastislav Szabo <raszabo@cisco.com>
-rw-r--r--.gitignore3
-rw-r--r--Makefile8
-rw-r--r--README.md5
-rw-r--r--adapter/mock/binapi/binapi_reflect.go (renamed from adapter/mock/util/binapi_reflect.go)4
-rw-r--r--adapter/mock/mock_adapter.go4
-rw-r--r--api/api.go4
-rw-r--r--core/core.go365
-rw-r--r--core/core_test.go16
-rw-r--r--core/notification_handler.go (renamed from core/notifications.go)0
-rw-r--r--core/request_handler.go213
-rw-r--r--examples/cmd/simple-client/simple_client.go (renamed from examples/example_client.go)63
-rw-r--r--examples/cmd/stats-client/stats_client.go132
-rw-r--r--govpp.go13
13 files changed, 557 insertions, 273 deletions
diff --git a/.gitignore b/.gitignore
index f0ab2aa..bfa4a36 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
.idea
cmd/binapi-generator/binapi-generator
-examples/examples
+examples/cmd/simple-client/simple-client
+examples/cmd/stats-client/stats-client
diff --git a/Makefile b/Makefile
index 70f1f45..44a203b 100644
--- a/Makefile
+++ b/Makefile
@@ -1,6 +1,7 @@
build:
@cd cmd/binapi-generator && go build -v
- @cd examples && go build -v
+ @cd examples/cmd/simple-client && go build -v
+ @cd examples/cmd/stats-client && go build -v
test:
@cd cmd/binapi-generator && go test -cover .
@@ -11,8 +12,9 @@ install:
@cd cmd/binapi-generator && go install -v
clean:
- @rm cmd/binapi-generator/binapi-generator
- @rm examples/examples
+ @rm -f cmd/binapi-generator/binapi-generator
+ @rm -f examples/cmd/simple-client/simple-client
+ @rm -f examples/cmd/stats-client/stats-client
generate:
@cd core && go generate ./...
diff --git a/README.md b/README.md
index a7d80d9..a5088f6 100644
--- a/README.md
+++ b/README.md
@@ -84,7 +84,7 @@ func main() {
}
```
-The example above uses simple wrapper API over underlying go channels, see [example_client](examples/example_client.go)
+The example above uses simple wrapper API over underlying go channels, see [example client](examples/cmd/simple-client/simple_client.go)
for more examples, including the example on how to use the Go channels directly.
@@ -127,7 +127,8 @@ binapi-generator --input-dir=examples/bin_api --output-dir=examples/bin_api
In Go, [go generate](https://blog.golang.org/generate) tool can be leveraged to ease the code generation
process. It allows to specify generator instructions in any one of the regular (non-generated) `.go` files
-that are dependent on generated code using special comments, e.g. the one from [example_client](examples/example_client.go):
+that are dependent on generated code using special comments, e.g. the one from
+[example client](examples/cmd/simple-client/simple_client.go):
```go
// go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api
```
diff --git a/adapter/mock/util/binapi_reflect.go b/adapter/mock/binapi/binapi_reflect.go
index 0ab065d..ee89909 100644
--- a/adapter/mock/util/binapi_reflect.go
+++ b/adapter/mock/binapi/binapi_reflect.go
@@ -12,9 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// Package util is a helper package for generic handling of VPP binary
+// Package binapi is a helper package for generic handling of VPP binary
// API messages in the mock adapter and integration tests.
-package util
+package binapi
import (
"reflect"
diff --git a/adapter/mock/mock_adapter.go b/adapter/mock/mock_adapter.go
index 1076ec2..8c88030 100644
--- a/adapter/mock/mock_adapter.go
+++ b/adapter/mock/mock_adapter.go
@@ -25,7 +25,7 @@ import (
"github.com/lunixbochs/struc"
"git.fd.io/govpp.git/adapter"
- "git.fd.io/govpp.git/adapter/mock/util"
+ "git.fd.io/govpp.git/adapter/mock/binapi"
"git.fd.io/govpp.git/api"
)
@@ -137,7 +137,7 @@ func (a *VppAdapter) RegisterBinAPITypes(binAPITypes map[string]reflect.Type) {
// ReplyTypeFor returns reply message type for given request message name.
func (a *VppAdapter) ReplyTypeFor(requestMsgName string) (reflect.Type, uint16, bool) {
- replyName, foundName := util.ReplyNameFor(requestMsgName)
+ replyName, foundName := binapi.ReplyNameFor(requestMsgName)
if foundName {
if reply, found := a.binAPITypes[replyName]; found {
msgID, err := a.GetMsgID(replyName, "")
diff --git a/api/api.go b/api/api.go
index fe6a34a..afaa8ad 100644
--- a/api/api.go
+++ b/api/api.go
@@ -32,7 +32,7 @@ const (
OtherMessage
)
-// Message is an interface that is implemented by all VPP Binary API messages generated by the binapi-generator.
+// Message is an interface that is implemented by all VPP Binary API messages generated by the binapigenerator.
type Message interface {
// GetMessageName returns the original VPP name of the message, as defined in the VPP API.
GetMessageName() string
@@ -44,7 +44,7 @@ type Message interface {
GetCrcString() string
}
-// DataType is an interface that is implemented by all VPP Binary API data types by the binapi-generator.
+// DataType is an interface that is implemented by all VPP Binary API data types by the binapi_generator.
type DataType interface {
// GetTypeName returns the original VPP name of the data type, as defined in the VPP API.
GetTypeName() string
diff --git a/core/core.go b/core/core.go
index 5b14a42..2484c81 100644
--- a/core/core.go
+++ b/core/core.go
@@ -14,14 +14,12 @@
package core
-//go:generate binapi-generator --input-file=/usr/share/vpp/api/vpe.api.json --output-dir=./bin_api
-
import (
"errors"
- "fmt"
"os"
"sync"
"sync/atomic"
+ "time"
logger "github.com/Sirupsen/logrus"
@@ -31,14 +29,41 @@ import (
)
const (
- requestChannelBufSize = 100 // default size of the request channel buffers
- replyChannelBufSize = 100 // default size of the reply channel buffers
+ requestChannelBufSize = 100 // default size of the request channel buffers
+ replyChannelBufSize = 100 // default size of the reply channel buffers
+ notificationChannelBufSize = 100 // default size of the notification channel buffers
+)
+
+const (
+ healthCheckProbeInterval = time.Second * 1 // default health check probe interval
+ healthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe
+)
+
+// ConnectionState holds the current state of the connection to VPP.
+type ConnectionState int
+
+const (
+ // Connected connection state means that the connection to VPP has been successfully established.
+ Connected ConnectionState = iota
+
+ // Disconnected connection state means that the connection to VPP has been lost.
+ Disconnected = iota
)
+// ConnectionEvent is a notification about change in the VPP connection state.
+type ConnectionEvent struct {
+ // Timestamp holds the time when the event has been generated.
+ Timestamp time.Time
+
+ // State holds the new state of the connection to VPP at the time when the event has been generated.
+ State ConnectionState
+}
+
// Connection represents a shared memory connection to VPP via vppAdapter.
type Connection struct {
- vpp adapter.VppAdapter // VPP adapter
- codec *MsgCodec // message codec
+ vpp adapter.VppAdapter // VPP adapter
+ connected uint32 // non-zero if the adapter is connected to VPP
+ codec *MsgCodec // message codec
msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC
msgIDsLock sync.RWMutex // lock for the message IDs map
@@ -79,7 +104,57 @@ func SetLogger(l *logger.Logger) {
}
// Connect connects to VPP using specified VPP adapter and returns the connection handle.
+// This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
+ // create new connection handle
+ c, err := newConnection(vppAdapter)
+ if err != nil {
+ return nil, err
+ }
+
+ // blocking attempt to connect to VPP
+ err = c.connectVPP()
+ if err != nil {
+ return nil, err
+ }
+
+ return conn, nil
+}
+
+// AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
+// and ConnectionState channel. This call does not block until connection is established, it
+// returns immediately. The caller is supposed to watch the returned ConnectionState channel for
+// Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
+func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) {
+ // create new connection handle
+ c, err := newConnection(vppAdapter)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ // asynchronously attempt to connect to VPP
+ connChan := make(chan ConnectionEvent, notificationChannelBufSize)
+ go c.connectLoop(connChan)
+
+ return conn, connChan, nil
+}
+
+// Disconnect disconnects from VPP and releases all connection-related resources.
+func (c *Connection) Disconnect() {
+ if c == nil {
+ return
+ }
+ connLock.Lock()
+ defer connLock.Unlock()
+
+ if c != nil && c.vpp != nil {
+ c.disconnectVPP()
+ }
+ conn = nil
+}
+
+// newConnection returns new connection handle.
+func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
connLock.Lock()
defer connLock.Unlock()
@@ -93,35 +168,102 @@ func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
conn.notifSubscriptions = make(map[uint16][]*api.NotifSubscription)
conn.vpp.SetMsgCallback(msgCallback)
+ return conn, nil
+}
- logger.Debug("Connecting to VPP...")
+// connectVPP performs one blocking attempt to connect to VPP.
+func (c *Connection) connectVPP() error {
+ log.Debug("Connecting to VPP...")
- err := conn.vpp.Connect()
+ // blocking connect
+ err := c.vpp.Connect()
if err != nil {
- return nil, err
+ log.Warn(err)
+ return err
}
+ // store connected state
+ atomic.StoreUint32(&c.connected, 1)
+
// store control ping IDs
- conn.pingReqID, _ = conn.GetMessageID(&vpe.ControlPing{})
- conn.pingReplyID, _ = conn.GetMessageID(&vpe.ControlPingReply{})
+ c.pingReqID, _ = c.GetMessageID(&vpe.ControlPing{})
+ c.pingReplyID, _ = c.GetMessageID(&vpe.ControlPingReply{})
- logger.Debug("VPP connected.")
+ log.Info("Connected to VPP.")
+ return nil
+}
- return conn, nil
+// disconnectVPP disconnects from VPP in case it is connected.
+func (c *Connection) disconnectVPP() {
+ if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
+ c.vpp.Disconnect()
+ }
}
-// Disconnect disconnects from VPP.
-func (c *Connection) Disconnect() {
- if c == nil {
+// connectLoop attempts to connect to VPP until it succeeds.
+// Then it continues with healthCheckLoop.
+func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
+ // loop until connected
+ for {
+ err := c.connectVPP()
+ if err == nil {
+ // signal connected event
+ connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
+ break
+ }
+ }
+
+ // we are now connected, continue with health check loop
+ c.healthCheckLoop(connChan)
+}
+
+// healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
+// it continues with connectLoop and tries to reconnect.
+func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
+ // create a separate API channel for health check probes
+ ch, err := conn.NewAPIChannel()
+ if err != nil {
+ log.Error("Error by creating health check API channel, health check will be disabled:", err)
return
}
- connLock.Lock()
- defer connLock.Unlock()
- if c != nil && c.vpp != nil {
- c.vpp.Disconnect()
+ // send health check probes until an error occurs
+ for {
+ // wait for healthCheckProbeInterval
+ <-time.After(healthCheckProbeInterval)
+
+ if atomic.LoadUint32(&c.connected) == 0 {
+ // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
+ log.Debug("Disconnected on request, exiting health check loop.")
+ return
+ }
+
+ // send the control ping
+ ch.ReqChan <- &api.VppRequest{Message: &vpe.ControlPing{}}
+
+ // expect response within timeout period
+ select {
+ case vppReply := <-ch.ReplyChan:
+ err = vppReply.Error
+ case <-time.After(healthCheckReplyTimeout):
+ err = errors.New("probe reply not received within the timeout period")
+ }
+
+ // in case of error, break & disconnect
+ if err != nil {
+ log.Errorf("VPP health check failed: %v", err)
+ // signal disconnected event via channel
+ connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
+ break
+ }
}
- conn = nil
+
+ // cleanup
+ ch.Close()
+ c.disconnectVPP()
+
+ // we are now disconnected, start connect loop
+ c.connectLoop(connChan)
}
// NewAPIChannel returns a new API channel for communication with VPP via govpp core.
@@ -163,84 +305,6 @@ func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int)
return ch, nil
}
-// watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
-func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
- for {
- select {
- case req, ok := <-ch.ReqChan:
- // new request on the request channel
- if !ok {
- // after closing the request channel, release API channel and return
- c.releaseAPIChannel(ch, chMeta)
- return
- }
- c.processRequest(ch, chMeta, req)
-
- case req := <-ch.NotifSubsChan:
- // new request on the notification subscribe channel
- c.processNotifSubscribeRequest(ch, req)
- }
- }
-}
-
-// processRequest processes a single request received on the request channel.
-func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error {
- // retrieve message ID
- msgID, err := c.GetMessageID(req.Message)
- if err != nil {
- error := fmt.Errorf("unable to retrieve message ID: %v", err)
- log.WithFields(logger.Fields{
- "msg_name": req.Message.GetMessageName(),
- "msg_crc": req.Message.GetCrcString(),
- }).Errorf("unable to retrieve message ID: %v", err)
- sendReply(ch, &api.VppReply{Error: error})
- return error
- }
-
- // encode the message into binary
- data, err := c.codec.EncodeMsg(req.Message, msgID)
- if err != nil {
- error := fmt.Errorf("unable to encode the messge: %v", err)
- log.WithFields(logger.Fields{
- "context": chMeta.id,
- "msg_id": msgID,
- }).Errorf("%v", error)
- sendReply(ch, &api.VppReply{Error: error})
- return error
- }
-
- // send the message
- log.WithFields(logger.Fields{
- "context": chMeta.id,
- "msg_id": msgID,
- "msg_size": len(data),
- }).Debug("Sending a message to VPP.")
-
- if req.Multipart {
- // expect multipart response
- atomic.StoreUint32(&chMeta.multipart, 1)
- }
-
- // send the request to VPP
- c.vpp.SendMsg(chMeta.id, data)
-
- if req.Multipart {
- // send a control ping to determine end of the multipart response
- ping := &vpe.ControlPing{}
- pingData, _ := c.codec.EncodeMsg(ping, c.pingReqID)
-
- log.WithFields(logger.Fields{
- "context": chMeta.id,
- "msg_id": c.pingReqID,
- "msg_size": len(pingData),
- }).Debug("Sending a control ping to VPP.")
-
- c.vpp.SendMsg(chMeta.id, pingData)
- }
-
- return nil
-}
-
// releaseAPIChannel releases API channel that needs to be closed.
func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) {
log.WithFields(logger.Fields{
@@ -252,104 +316,3 @@ func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata)
delete(c.channels, chMeta.id)
c.channelsLock.Unlock()
}
-
-// msgCallback is called whenever any binary API message comes from VPP.
-func msgCallback(context uint32, msgID uint16, data []byte) {
- connLock.RLock()
- defer connLock.RUnlock()
-
- if conn == nil {
- log.Warn("Already disconnected, ignoring the message.")
- return
- }
-
- log.WithFields(logger.Fields{
- "context": context,
- "msg_id": msgID,
- "msg_size": len(data),
- }).Debug("Received a message from VPP.")
-
- if context == 0 || conn.isNotificationMessage(msgID) {
- // process the message as a notification
- conn.sendNotifications(msgID, data)
- return
- }
-
- // match ch according to the context
- conn.channelsLock.RLock()
- ch, ok := conn.channels[context]
- conn.channelsLock.RUnlock()
-
- if !ok {
- log.WithFields(logger.Fields{
- "context": context,
- "msg_id": msgID,
- }).Error("Context ID not known, ignoring the message.")
- return
- }
-
- chMeta := ch.Metadata().(*channelMetadata)
- lastReplyReceived := false
- // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply
- if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) {
- lastReplyReceived = true
- }
-
- // send the data to the channel
- sendReply(ch, &api.VppReply{
- MessageID: msgID,
- Data: data,
- LastReplyReceived: lastReplyReceived,
- })
-}
-
-// sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
-// it logs the error and do not send the message.
-func sendReply(ch *api.Channel, reply *api.VppReply) {
- select {
- case ch.ReplyChan <- reply:
- // reply sent successfully
- default:
- // unable to write into the channel without blocking
- log.WithFields(logger.Fields{
- "channel": ch,
- "msg_id": reply.MessageID,
- }).Warn("Unable to send the reply, reciever end not ready.")
- }
-}
-
-// GetMessageID returns message identifier of given API message.
-func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
- if c == nil {
- return 0, errors.New("nil connection passed in")
- }
- return c.messageNameToID(msg.GetMessageName(), msg.GetCrcString())
-}
-
-// messageNameToID returns message ID of a message identified by its name and CRC.
-func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) {
- // try to get the ID from the map
- c.msgIDsLock.RLock()
- id, ok := c.msgIDs[msgName+msgCrc]
- c.msgIDsLock.RUnlock()
- if ok {
- return id, nil
- }
-
- // get the ID using VPP API
- id, err := c.vpp.GetMsgID(msgName, msgCrc)
- if err != nil {
- error := fmt.Errorf("unable to retrieve message ID: %v", err)
- log.WithFields(logger.Fields{
- "msg_name": msgName,
- "msg_crc": msgCrc,
- }).Errorf("unable to retrieve message ID: %v", err)
- return id, error
- }
-
- c.msgIDsLock.Lock()
- c.msgIDs[msgName+msgCrc] = id
- c.msgIDsLock.Unlock()
-
- return id, nil
-}
diff --git a/core/core_test.go b/core/core_test.go
index d3c2e2c..3184ef5 100644
--- a/core/core_test.go
+++ b/core/core_test.go
@@ -145,6 +145,7 @@ func TestNotifications(t *testing.T) {
}
func TestNilConnection(t *testing.T) {
+ RegisterTestingT(t)
var conn *Connection
ch, err := conn.NewAPIChannel()
@@ -168,6 +169,21 @@ func TestDoubleConnection(t *testing.T) {
Expect(conn).Should(BeNil())
}
+func TestAsyncConnection(t *testing.T) {
+ ctx := setupTest(t)
+ defer ctx.teardownTest()
+
+ ctx.conn.Disconnect()
+ conn, ch, err := AsyncConnect(ctx.mockVpp)
+ ctx.conn = conn
+
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(conn).ShouldNot(BeNil())
+
+ ev := <-ch
+ Expect(ev.State).Should(BeEquivalentTo(Connected))
+}
+
func TestFullBuffer(t *testing.T) {
ctx := setupTest(t)
defer ctx.teardownTest()
diff --git a/core/notifications.go b/core/notification_handler.go
index 5cd132c..5cd132c 100644
--- a/core/notifications.go
+++ b/core/notification_handler.go
diff --git a/core/request_handler.go b/core/request_handler.go
new file mode 100644
index 0000000..f4f5e92
--- /dev/null
+++ b/core/request_handler.go
@@ -0,0 +1,213 @@
+// Copyright (c) 2017 Cisco and/or its affiliates.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package core
+
+import (
+ "errors"
+ "fmt"
+ "sync/atomic"
+
+ logger "github.com/Sirupsen/logrus"
+
+ "git.fd.io/govpp.git/api"
+ "git.fd.io/govpp.git/core/bin_api/vpe"
+)
+
+// watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
+func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
+ for {
+ select {
+ case req, ok := <-ch.ReqChan:
+ // new request on the request channel
+ if !ok {
+ // after closing the request channel, release API channel and return
+ c.releaseAPIChannel(ch, chMeta)
+ return
+ }
+ c.processRequest(ch, chMeta, req)
+
+ case req := <-ch.NotifSubsChan:
+ // new request on the notification subscribe channel
+ c.processNotifSubscribeRequest(ch, req)
+ }
+ }
+}
+
+// processRequest processes a single request received on the request channel.
+func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error {
+ // check whether we are connected to VPP
+ if atomic.LoadUint32(&c.connected) == 0 {
+ error := errors.New("not connected to VPP, ignoring the request")
+ log.Error(error)
+ sendReply(ch, &api.VppReply{Error: error})
+ return error
+ }
+
+ // retrieve message ID
+ msgID, err := c.GetMessageID(req.Message)
+ if err != nil {
+ error := fmt.Errorf("unable to retrieve message ID: %v", err)
+ log.WithFields(logger.Fields{
+ "msg_name": req.Message.GetMessageName(),
+ "msg_crc": req.Message.GetCrcString(),
+ }).Error(err)
+ sendReply(ch, &api.VppReply{Error: error})
+ return error
+ }
+
+ // encode the message into binary
+ data, err := c.codec.EncodeMsg(req.Message, msgID)
+ if err != nil {
+ error := fmt.Errorf("unable to encode the messge: %v", err)
+ log.WithFields(logger.Fields{
+ "context": chMeta.id,
+ "msg_id": msgID,
+ }).Error(error)
+ sendReply(ch, &api.VppReply{Error: error})
+ return error
+ }
+
+ // send the message
+ log.WithFields(logger.Fields{
+ "context": chMeta.id,
+ "msg_id": msgID,
+ "msg_size": len(data),
+ }).Debug("Sending a message to VPP.")
+
+ if req.Multipart {
+ // expect multipart response
+ atomic.StoreUint32(&chMeta.multipart, 1)
+ }
+
+ // send the request to VPP
+ c.vpp.SendMsg(chMeta.id, data)
+
+ if req.Multipart {
+ // send a control ping to determine end of the multipart response
+ ping := &vpe.ControlPing{}
+ pingData, _ := c.codec.EncodeMsg(ping, c.pingReqID)
+
+ log.WithFields(logger.Fields{
+ "context": chMeta.id,
+ "msg_id": c.pingReqID,
+ "msg_size": len(pingData),
+ }).Debug("Sending a control ping to VPP.")
+
+ c.vpp.SendMsg(chMeta.id, pingData)
+ }
+
+ return nil
+}
+
+// msgCallback is called whenever any binary API message comes from VPP.
+func msgCallback(context uint32, msgID uint16, data []byte) {
+ connLock.RLock()
+ defer connLock.RUnlock()
+
+ if conn == nil {
+ log.Warn("Already disconnected, ignoring the message.")
+ return
+ }
+
+ log.WithFields(logger.Fields{
+ "context": context,
+ "msg_id": msgID,
+ "msg_size": len(data),
+ }).Debug("Received a message from VPP.")
+
+ if context == 0 || conn.isNotificationMessage(msgID) {
+ // process the message as a notification
+ conn.sendNotifications(msgID, data)
+ return
+ }
+
+ // match ch according to the context
+ conn.channelsLock.RLock()
+ ch, ok := conn.channels[context]
+ conn.channelsLock.RUnlock()
+
+ if !ok {
+ log.WithFields(logger.Fields{
+ "context": context,
+ "msg_id": msgID,
+ }).Error("Context ID not known, ignoring the message.")
+ return
+ }
+
+ chMeta := ch.Metadata().(*channelMetadata)
+ lastReplyReceived := false
+ // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply
+ if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) {
+ lastReplyReceived = true
+ }
+
+ // send the data to the channel
+ sendReply(ch, &api.VppReply{
+ MessageID: msgID,
+ Data: data,
+ LastReplyReceived: lastReplyReceived,
+ })
+}
+
+// sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
+// it logs the error and do not send the message.
+func sendReply(ch *api.Channel, reply *api.VppReply) {
+ select {
+ case ch.ReplyChan <- reply:
+ // reply sent successfully
+ default:
+ // unable to write into the channel without blocking
+ log.WithFields(logger.Fields{
+ "channel": ch,
+ "msg_id": reply.MessageID,
+ }).Warn("Unable to send the reply, reciever end not ready.")
+ }
+}
+
+// GetMessageID returns message identifier of given API message.
+func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
+ if c == nil {
+ return 0, errors.New("nil connection passed in")
+ }
+ return c.messageNameToID(msg.GetMessageName(), msg.GetCrcString())
+}
+
+// messageNameToID returns message ID of a message identified by its name and CRC.
+func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) {
+ // try to get the ID from the map
+ c.msgIDsLock.RLock()
+ id, ok := c.msgIDs[msgName+msgCrc]
+ c.msgIDsLock.RUnlock()
+ if ok {
+ return id, nil
+ }
+
+ // get the ID using VPP API
+ id, err := c.vpp.GetMsgID(msgName, msgCrc)
+ if err != nil {
+ error := fmt.Errorf("unable to retrieve message ID: %v", err)
+ log.WithFields(logger.Fields{
+ "msg_name": msgName,
+ "msg_crc": msgCrc,
+ }).Errorf("unable to retrieve message ID: %v", err)
+ return id, error
+ }
+
+ c.msgIDsLock.Lock()
+ c.msgIDs[msgName+msgCrc] = id
+ c.msgIDsLock.Unlock()
+
+ return id, nil
+}
diff --git a/examples/example_client.go b/examples/cmd/simple-client/simple_client.go
index f2e5804..6e46d6b 100644
--- a/examples/example_client.go
+++ b/examples/cmd/simple-client/simple_client.go
@@ -12,7 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// Example VPP management application that exercises the govpp API on real-world use-cases.
+// Binary simple-client is an example VPP management application that exercises the
+// govpp API on real-world use-cases.
package main
// Generates Go bindings for all VPP APIs located in the json directory.
@@ -22,19 +23,16 @@ import (
"fmt"
"net"
"os"
- "os/signal"
"git.fd.io/govpp.git"
"git.fd.io/govpp.git/api"
- "git.fd.io/govpp.git/api/ifcounters"
- "git.fd.io/govpp.git/core/bin_api/vpe"
"git.fd.io/govpp.git/examples/bin_api/acl"
"git.fd.io/govpp.git/examples/bin_api/interfaces"
"git.fd.io/govpp.git/examples/bin_api/tap"
)
func main() {
- fmt.Println("Starting example VPP client...")
+ fmt.Println("Starting simple VPP client...")
// connect to VPP
conn, err := govpp.Connect()
@@ -64,8 +62,6 @@ func main() {
interfaceDump(ch)
interfaceNotifications(ch)
-
- //interfaceCounters(ch)
}
// compatibilityCheck shows how an management application can check whether generated API messages are
@@ -223,56 +219,3 @@ func interfaceNotifications(ch *api.Channel) {
// unsubscribe from delivery of the notifications
ch.UnsubscribeNotification(subs)
}
-
-// interfaceCounters is an example of using notification API to periodically retrieve interface statistics.
-// The ifcounters package contains the API that can be used to decode the strange VnetInterfaceCounters message.
-func interfaceCounters(ch *api.Channel) {
- // subscribe for interface counters notifications
- notifChan := make(chan api.Message, 100)
- subs, _ := ch.SubscribeNotification(notifChan, interfaces.NewVnetInterfaceCounters)
-
- // enable interface counters notifications from VPP
- ch.SendRequest(&vpe.WantStats{
- Pid: uint32(os.Getpid()),
- EnableDisable: 1,
- }).ReceiveReply(&vpe.WantStatsReply{})
-
- // create channel for Interrupt signal
- sigChan := make(chan os.Signal, 1)
- signal.Notify(sigChan, os.Interrupt)
-
- // loop until Interrupt signal is received
-loop:
- for {
- select {
- case <-sigChan:
- // interrupt received
- break loop
- case notifMsg := <-notifChan:
- notif := notifMsg.(*interfaces.VnetInterfaceCounters)
- // notification received
- fmt.Printf("%+v\n", notif)
-
- if notif.IsCombined == 0 {
- // simple counter
- counters, err := ifcounters.DecodeCounters(ifcounters.VnetInterfaceCounters(*notif))
- if err != nil {
- fmt.Println("Error:", err)
- } else {
- fmt.Printf("%+v\n", counters)
- }
- } else {
- // combined counter
- counters, err := ifcounters.DecodeCombinedCounters(ifcounters.VnetInterfaceCounters(*notif))
- if err != nil {
- fmt.Println("Error:", err)
- } else {
- fmt.Printf("%+v\n", counters)
- }
- }
- }
- }
-
- // unsubscribe from delivery of the notifications
- ch.UnsubscribeNotification(subs)
-}
diff --git a/examples/cmd/stats-client/stats_client.go b/examples/cmd/stats-client/stats_client.go
new file mode 100644
index 0000000..fc40b24
--- /dev/null
+++ b/examples/cmd/stats-client/stats_client.go
@@ -0,0 +1,132 @@
+// Copyright (c) 2017 Cisco and/or its affiliates.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Binary stats-client is an example VPP management application that exercises the
+// govpp API for interface counters together with asynchronous connection to VPP.
+package main
+
+// Generates Go bindings for all VPP APIs located in the json directory.
+//go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api
+
+import (
+ "fmt"
+ "os"
+ "os/signal"
+
+ "git.fd.io/govpp.git"
+ "git.fd.io/govpp.git/api"
+ "git.fd.io/govpp.git/api/ifcounters"
+ "git.fd.io/govpp.git/core"
+ "git.fd.io/govpp.git/core/bin_api/vpe"
+ "git.fd.io/govpp.git/examples/bin_api/interfaces"
+)
+
+func main() {
+ fmt.Println("Starting stats VPP client...")
+
+ // async connect to VPP
+ conn, statCh, err := govpp.AsyncConnect()
+ if err != nil {
+ fmt.Println("Error:", err)
+ os.Exit(1)
+ }
+ defer conn.Disconnect()
+
+ // create an API channel that will be used in the examples
+ ch, err := conn.NewAPIChannel()
+ if err != nil {
+ fmt.Println("Error:", err)
+ os.Exit(1)
+ }
+ defer ch.Close()
+
+ // create channel for Interrupt signal
+ sigChan := make(chan os.Signal, 1)
+ signal.Notify(sigChan, os.Interrupt)
+
+ var subs *api.NotifSubscription
+ var notifChan chan api.Message
+
+ // loop until Interrupt signal is received
+loop:
+ for {
+ select {
+
+ case connEvent := <-statCh:
+ // VPP connection state change
+ switch connEvent.State {
+ case core.Connected:
+ fmt.Println("VPP connected.")
+ if subs == nil {
+ subs, notifChan = subscribeNotification(ch)
+ }
+ requestStatistics(ch)
+
+ case core.Disconnected:
+ fmt.Println("VPP disconnected.")
+ }
+
+ case notifMsg := <-notifChan:
+ // counter notification received
+ processCounters(notifMsg.(*interfaces.VnetInterfaceCounters))
+
+ case <-sigChan:
+ // interrupt received
+ fmt.Println("Interrupt received, exiting.")
+ break loop
+ }
+ }
+
+ ch.UnsubscribeNotification(subs)
+}
+
+// subscribeNotification subscribes for interface counters notifications.
+func subscribeNotification(ch *api.Channel) (*api.NotifSubscription, chan api.Message) {
+
+ notifChan := make(chan api.Message, 100)
+ subs, _ := ch.SubscribeNotification(notifChan, interfaces.NewVnetInterfaceCounters)
+
+ return subs, notifChan
+}
+
+// requestStatistics requests interface counters notifications from VPP.
+func requestStatistics(ch *api.Channel) {
+ ch.SendRequest(&vpe.WantStats{
+ Pid: uint32(os.Getpid()),
+ EnableDisable: 1,
+ }).ReceiveReply(&vpe.WantStatsReply{})
+}
+
+// processCounters processes a counter message received from VPP.
+func processCounters(msg *interfaces.VnetInterfaceCounters) {
+ fmt.Printf("%+v\n", msg)
+
+ if msg.IsCombined == 0 {
+ // simple counter
+ counters, err := ifcounters.DecodeCounters(ifcounters.VnetInterfaceCounters(*msg))
+ if err != nil {
+ fmt.Println("Error:", err)
+ } else {
+ fmt.Printf("%+v\n", counters)
+ }
+ } else {
+ // combined counter
+ counters, err := ifcounters.DecodeCombinedCounters(ifcounters.VnetInterfaceCounters(*msg))
+ if err != nil {
+ fmt.Println("Error:", err)
+ } else {
+ fmt.Printf("%+v\n", counters)
+ }
+ }
+}
diff --git a/govpp.go b/govpp.go
index 5d3ed7f..6f0cc2e 100644
--- a/govpp.go
+++ b/govpp.go
@@ -24,6 +24,7 @@ var vppAdapter adapter.VppAdapter // VPP Adapter that will be used in the subseq
// Connect connects the govpp core to VPP either using the default VPP Adapter, or using the adapter previously
// set by SetAdapter (useful mostly just for unit/integration tests with mocked VPP adapter).
+// This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
func Connect() (*core.Connection, error) {
if vppAdapter == nil {
vppAdapter = vppapiclient.NewVppAdapter()
@@ -31,6 +32,18 @@ func Connect() (*core.Connection, error) {
return core.Connect(vppAdapter)
}
+// AsyncConnect asynchronously connects the govpp core to VPP either using the default VPP Adapter,
+// or using the adapter previously set by SetAdapter.
+// This call does not block until connection is established, it returns immediately. The caller is
+// supposed to watch the returned ConnectionState channel for Connected/Disconnected events.
+// In case of disconnect, the library will asynchronously try to reconnect.
+func AsyncConnect() (*core.Connection, chan core.ConnectionEvent, error) {
+ if vppAdapter == nil {
+ vppAdapter = vppapiclient.NewVppAdapter()
+ }
+ return core.AsyncConnect(vppAdapter)
+}
+
// SetAdapter sets the adapter that will be used for connections to VPP in the subsequent `Connect` calls.
func SetAdapter(ad adapter.VppAdapter) {
vppAdapter = ad