From 2d07847237e754d9050f06f565baa430c70ed937 Mon Sep 17 00:00:00 2001 From: Rastislav Szabo Date: Thu, 25 May 2017 13:47:43 +0200 Subject: added async connect API, new structure of examples Change-Id: Iab9bce174596c30998981e02b7030c248c423384 Signed-off-by: Rastislav Szabo --- .gitignore | 3 +- Makefile | 8 +- README.md | 5 +- adapter/mock/binapi/binapi_reflect.go | 74 ++++++ adapter/mock/mock_adapter.go | 4 +- adapter/mock/util/binapi_reflect.go | 74 ------ api/api.go | 4 +- core/core.go | 365 +++++++++++++--------------- core/core_test.go | 16 ++ core/notification_handler.go | 182 ++++++++++++++ core/notifications.go | 182 -------------- core/request_handler.go | 213 ++++++++++++++++ examples/cmd/simple-client/simple_client.go | 221 +++++++++++++++++ examples/cmd/stats-client/stats_client.go | 132 ++++++++++ examples/example_client.go | 278 --------------------- govpp.go | 13 + 16 files changed, 1029 insertions(+), 745 deletions(-) create mode 100644 adapter/mock/binapi/binapi_reflect.go delete mode 100644 adapter/mock/util/binapi_reflect.go create mode 100644 core/notification_handler.go delete mode 100644 core/notifications.go create mode 100644 core/request_handler.go create mode 100644 examples/cmd/simple-client/simple_client.go create mode 100644 examples/cmd/stats-client/stats_client.go delete mode 100644 examples/example_client.go diff --git a/.gitignore b/.gitignore index f0ab2aa..bfa4a36 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ .idea cmd/binapi-generator/binapi-generator -examples/examples +examples/cmd/simple-client/simple-client +examples/cmd/stats-client/stats-client diff --git a/Makefile b/Makefile index 70f1f45..44a203b 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,7 @@ build: @cd cmd/binapi-generator && go build -v - @cd examples && go build -v + @cd examples/cmd/simple-client && go build -v + @cd examples/cmd/stats-client && go build -v test: @cd cmd/binapi-generator && go test -cover . @@ -11,8 +12,9 @@ install: @cd cmd/binapi-generator && go install -v clean: - @rm cmd/binapi-generator/binapi-generator - @rm examples/examples + @rm -f cmd/binapi-generator/binapi-generator + @rm -f examples/cmd/simple-client/simple-client + @rm -f examples/cmd/stats-client/stats-client generate: @cd core && go generate ./... diff --git a/README.md b/README.md index a7d80d9..a5088f6 100644 --- a/README.md +++ b/README.md @@ -84,7 +84,7 @@ func main() { } ``` -The example above uses simple wrapper API over underlying go channels, see [example_client](examples/example_client.go) +The example above uses simple wrapper API over underlying go channels, see [example client](examples/cmd/simple-client/simple_client.go) for more examples, including the example on how to use the Go channels directly. @@ -127,7 +127,8 @@ binapi-generator --input-dir=examples/bin_api --output-dir=examples/bin_api In Go, [go generate](https://blog.golang.org/generate) tool can be leveraged to ease the code generation process. It allows to specify generator instructions in any one of the regular (non-generated) `.go` files -that are dependent on generated code using special comments, e.g. the one from [example_client](examples/example_client.go): +that are dependent on generated code using special comments, e.g. the one from +[example client](examples/cmd/simple-client/simple_client.go): ```go // go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api ``` diff --git a/adapter/mock/binapi/binapi_reflect.go b/adapter/mock/binapi/binapi_reflect.go new file mode 100644 index 0000000..ee89909 --- /dev/null +++ b/adapter/mock/binapi/binapi_reflect.go @@ -0,0 +1,74 @@ +// Copyright (c) 2017 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package binapi is a helper package for generic handling of VPP binary +// API messages in the mock adapter and integration tests. +package binapi + +import ( + "reflect" +) + +const swIfIndexName = "swIfIndex" +const retvalName = "retval" +const replySuffix = "_reply" + +// findFieldOfType finds the field specified by its name in provided message defined as reflect.Type data type. +func findFieldOfType(reply reflect.Type, fieldName string) (reflect.StructField, bool) { + if reply.Kind() == reflect.Struct { + field, found := reply.FieldByName(fieldName) + return field, found + } else if reply.Kind() == reflect.Ptr && reply.Elem().Kind() == reflect.Struct { + field, found := reply.Elem().FieldByName(fieldName) + return field, found + } + return reflect.StructField{}, false +} + +// findFieldOfValue finds the field specified by its name in provided message defined as reflect.Value data type. +func findFieldOfValue(reply reflect.Value, fieldName string) (reflect.Value, bool) { + if reply.Kind() == reflect.Struct { + field := reply.FieldByName(fieldName) + return field, field.IsValid() + } else if reply.Kind() == reflect.Ptr && reply.Elem().Kind() == reflect.Struct { + field := reply.Elem().FieldByName(fieldName) + return field, field.IsValid() + } + return reflect.Value{}, false +} + +// HasSwIfIdx checks whether provided message has the swIfIndex field. +func HasSwIfIdx(msg reflect.Type) bool { + _, found := findFieldOfType(msg, swIfIndexName) + return found +} + +// SetSwIfIdx sets the swIfIndex field of provided message to provided value. +func SetSwIfIdx(msg reflect.Value, swIfIndex uint32) { + if field, found := findFieldOfValue(msg, swIfIndexName); found { + field.Set(reflect.ValueOf(swIfIndex)) + } +} + +// SetRetval sets the retval field of provided message to provided value. +func SetRetval(msg reflect.Value, retVal int32) { + if field, found := findFieldOfValue(msg, retvalName); found { + field.Set(reflect.ValueOf(retVal)) + } +} + +// ReplyNameFor returns reply message name to the given request message name. +func ReplyNameFor(requestName string) (string, bool) { + return requestName + replySuffix, true +} diff --git a/adapter/mock/mock_adapter.go b/adapter/mock/mock_adapter.go index 1076ec2..8c88030 100644 --- a/adapter/mock/mock_adapter.go +++ b/adapter/mock/mock_adapter.go @@ -25,7 +25,7 @@ import ( "github.com/lunixbochs/struc" "git.fd.io/govpp.git/adapter" - "git.fd.io/govpp.git/adapter/mock/util" + "git.fd.io/govpp.git/adapter/mock/binapi" "git.fd.io/govpp.git/api" ) @@ -137,7 +137,7 @@ func (a *VppAdapter) RegisterBinAPITypes(binAPITypes map[string]reflect.Type) { // ReplyTypeFor returns reply message type for given request message name. func (a *VppAdapter) ReplyTypeFor(requestMsgName string) (reflect.Type, uint16, bool) { - replyName, foundName := util.ReplyNameFor(requestMsgName) + replyName, foundName := binapi.ReplyNameFor(requestMsgName) if foundName { if reply, found := a.binAPITypes[replyName]; found { msgID, err := a.GetMsgID(replyName, "") diff --git a/adapter/mock/util/binapi_reflect.go b/adapter/mock/util/binapi_reflect.go deleted file mode 100644 index 0ab065d..0000000 --- a/adapter/mock/util/binapi_reflect.go +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright (c) 2017 Cisco and/or its affiliates. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package util is a helper package for generic handling of VPP binary -// API messages in the mock adapter and integration tests. -package util - -import ( - "reflect" -) - -const swIfIndexName = "swIfIndex" -const retvalName = "retval" -const replySuffix = "_reply" - -// findFieldOfType finds the field specified by its name in provided message defined as reflect.Type data type. -func findFieldOfType(reply reflect.Type, fieldName string) (reflect.StructField, bool) { - if reply.Kind() == reflect.Struct { - field, found := reply.FieldByName(fieldName) - return field, found - } else if reply.Kind() == reflect.Ptr && reply.Elem().Kind() == reflect.Struct { - field, found := reply.Elem().FieldByName(fieldName) - return field, found - } - return reflect.StructField{}, false -} - -// findFieldOfValue finds the field specified by its name in provided message defined as reflect.Value data type. -func findFieldOfValue(reply reflect.Value, fieldName string) (reflect.Value, bool) { - if reply.Kind() == reflect.Struct { - field := reply.FieldByName(fieldName) - return field, field.IsValid() - } else if reply.Kind() == reflect.Ptr && reply.Elem().Kind() == reflect.Struct { - field := reply.Elem().FieldByName(fieldName) - return field, field.IsValid() - } - return reflect.Value{}, false -} - -// HasSwIfIdx checks whether provided message has the swIfIndex field. -func HasSwIfIdx(msg reflect.Type) bool { - _, found := findFieldOfType(msg, swIfIndexName) - return found -} - -// SetSwIfIdx sets the swIfIndex field of provided message to provided value. -func SetSwIfIdx(msg reflect.Value, swIfIndex uint32) { - if field, found := findFieldOfValue(msg, swIfIndexName); found { - field.Set(reflect.ValueOf(swIfIndex)) - } -} - -// SetRetval sets the retval field of provided message to provided value. -func SetRetval(msg reflect.Value, retVal int32) { - if field, found := findFieldOfValue(msg, retvalName); found { - field.Set(reflect.ValueOf(retVal)) - } -} - -// ReplyNameFor returns reply message name to the given request message name. -func ReplyNameFor(requestName string) (string, bool) { - return requestName + replySuffix, true -} diff --git a/api/api.go b/api/api.go index fe6a34a..afaa8ad 100644 --- a/api/api.go +++ b/api/api.go @@ -32,7 +32,7 @@ const ( OtherMessage ) -// Message is an interface that is implemented by all VPP Binary API messages generated by the binapi-generator. +// Message is an interface that is implemented by all VPP Binary API messages generated by the binapigenerator. type Message interface { // GetMessageName returns the original VPP name of the message, as defined in the VPP API. GetMessageName() string @@ -44,7 +44,7 @@ type Message interface { GetCrcString() string } -// DataType is an interface that is implemented by all VPP Binary API data types by the binapi-generator. +// DataType is an interface that is implemented by all VPP Binary API data types by the binapi_generator. type DataType interface { // GetTypeName returns the original VPP name of the data type, as defined in the VPP API. GetTypeName() string diff --git a/core/core.go b/core/core.go index 5b14a42..2484c81 100644 --- a/core/core.go +++ b/core/core.go @@ -14,14 +14,12 @@ package core -//go:generate binapi-generator --input-file=/usr/share/vpp/api/vpe.api.json --output-dir=./bin_api - import ( "errors" - "fmt" "os" "sync" "sync/atomic" + "time" logger "github.com/Sirupsen/logrus" @@ -31,14 +29,41 @@ import ( ) const ( - requestChannelBufSize = 100 // default size of the request channel buffers - replyChannelBufSize = 100 // default size of the reply channel buffers + requestChannelBufSize = 100 // default size of the request channel buffers + replyChannelBufSize = 100 // default size of the reply channel buffers + notificationChannelBufSize = 100 // default size of the notification channel buffers +) + +const ( + healthCheckProbeInterval = time.Second * 1 // default health check probe interval + healthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe +) + +// ConnectionState holds the current state of the connection to VPP. +type ConnectionState int + +const ( + // Connected connection state means that the connection to VPP has been successfully established. + Connected ConnectionState = iota + + // Disconnected connection state means that the connection to VPP has been lost. + Disconnected = iota ) +// ConnectionEvent is a notification about change in the VPP connection state. +type ConnectionEvent struct { + // Timestamp holds the time when the event has been generated. + Timestamp time.Time + + // State holds the new state of the connection to VPP at the time when the event has been generated. + State ConnectionState +} + // Connection represents a shared memory connection to VPP via vppAdapter. type Connection struct { - vpp adapter.VppAdapter // VPP adapter - codec *MsgCodec // message codec + vpp adapter.VppAdapter // VPP adapter + connected uint32 // non-zero if the adapter is connected to VPP + codec *MsgCodec // message codec msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC msgIDsLock sync.RWMutex // lock for the message IDs map @@ -79,7 +104,57 @@ func SetLogger(l *logger.Logger) { } // Connect connects to VPP using specified VPP adapter and returns the connection handle. +// This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed. func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) { + // create new connection handle + c, err := newConnection(vppAdapter) + if err != nil { + return nil, err + } + + // blocking attempt to connect to VPP + err = c.connectVPP() + if err != nil { + return nil, err + } + + return conn, nil +} + +// AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle +// and ConnectionState channel. This call does not block until connection is established, it +// returns immediately. The caller is supposed to watch the returned ConnectionState channel for +// Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect. +func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) { + // create new connection handle + c, err := newConnection(vppAdapter) + if err != nil { + return nil, nil, err + } + + // asynchronously attempt to connect to VPP + connChan := make(chan ConnectionEvent, notificationChannelBufSize) + go c.connectLoop(connChan) + + return conn, connChan, nil +} + +// Disconnect disconnects from VPP and releases all connection-related resources. +func (c *Connection) Disconnect() { + if c == nil { + return + } + connLock.Lock() + defer connLock.Unlock() + + if c != nil && c.vpp != nil { + c.disconnectVPP() + } + conn = nil +} + +// newConnection returns new connection handle. +func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) { connLock.Lock() defer connLock.Unlock() @@ -93,35 +168,102 @@ func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) { conn.notifSubscriptions = make(map[uint16][]*api.NotifSubscription) conn.vpp.SetMsgCallback(msgCallback) + return conn, nil +} - logger.Debug("Connecting to VPP...") +// connectVPP performs one blocking attempt to connect to VPP. +func (c *Connection) connectVPP() error { + log.Debug("Connecting to VPP...") - err := conn.vpp.Connect() + // blocking connect + err := c.vpp.Connect() if err != nil { - return nil, err + log.Warn(err) + return err } + // store connected state + atomic.StoreUint32(&c.connected, 1) + // store control ping IDs - conn.pingReqID, _ = conn.GetMessageID(&vpe.ControlPing{}) - conn.pingReplyID, _ = conn.GetMessageID(&vpe.ControlPingReply{}) + c.pingReqID, _ = c.GetMessageID(&vpe.ControlPing{}) + c.pingReplyID, _ = c.GetMessageID(&vpe.ControlPingReply{}) - logger.Debug("VPP connected.") + log.Info("Connected to VPP.") + return nil +} - return conn, nil +// disconnectVPP disconnects from VPP in case it is connected. +func (c *Connection) disconnectVPP() { + if atomic.CompareAndSwapUint32(&c.connected, 1, 0) { + c.vpp.Disconnect() + } } -// Disconnect disconnects from VPP. -func (c *Connection) Disconnect() { - if c == nil { +// connectLoop attempts to connect to VPP until it succeeds. +// Then it continues with healthCheckLoop. +func (c *Connection) connectLoop(connChan chan ConnectionEvent) { + // loop until connected + for { + err := c.connectVPP() + if err == nil { + // signal connected event + connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected} + break + } + } + + // we are now connected, continue with health check loop + c.healthCheckLoop(connChan) +} + +// healthCheckLoop checks whether connection to VPP is alive. In case of disconnect, +// it continues with connectLoop and tries to reconnect. +func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { + // create a separate API channel for health check probes + ch, err := conn.NewAPIChannel() + if err != nil { + log.Error("Error by creating health check API channel, health check will be disabled:", err) return } - connLock.Lock() - defer connLock.Unlock() - if c != nil && c.vpp != nil { - c.vpp.Disconnect() + // send health check probes until an error occurs + for { + // wait for healthCheckProbeInterval + <-time.After(healthCheckProbeInterval) + + if atomic.LoadUint32(&c.connected) == 0 { + // Disconnect has been called in the meantime, return the healthcheck - reconnect loop + log.Debug("Disconnected on request, exiting health check loop.") + return + } + + // send the control ping + ch.ReqChan <- &api.VppRequest{Message: &vpe.ControlPing{}} + + // expect response within timeout period + select { + case vppReply := <-ch.ReplyChan: + err = vppReply.Error + case <-time.After(healthCheckReplyTimeout): + err = errors.New("probe reply not received within the timeout period") + } + + // in case of error, break & disconnect + if err != nil { + log.Errorf("VPP health check failed: %v", err) + // signal disconnected event via channel + connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected} + break + } } - conn = nil + + // cleanup + ch.Close() + c.disconnectVPP() + + // we are now disconnected, start connect loop + c.connectLoop(connChan) } // NewAPIChannel returns a new API channel for communication with VPP via govpp core. @@ -163,84 +305,6 @@ func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) return ch, nil } -// watchRequests watches for requests on the request API channel and forwards them as messages to VPP. -func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) { - for { - select { - case req, ok := <-ch.ReqChan: - // new request on the request channel - if !ok { - // after closing the request channel, release API channel and return - c.releaseAPIChannel(ch, chMeta) - return - } - c.processRequest(ch, chMeta, req) - - case req := <-ch.NotifSubsChan: - // new request on the notification subscribe channel - c.processNotifSubscribeRequest(ch, req) - } - } -} - -// processRequest processes a single request received on the request channel. -func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error { - // retrieve message ID - msgID, err := c.GetMessageID(req.Message) - if err != nil { - error := fmt.Errorf("unable to retrieve message ID: %v", err) - log.WithFields(logger.Fields{ - "msg_name": req.Message.GetMessageName(), - "msg_crc": req.Message.GetCrcString(), - }).Errorf("unable to retrieve message ID: %v", err) - sendReply(ch, &api.VppReply{Error: error}) - return error - } - - // encode the message into binary - data, err := c.codec.EncodeMsg(req.Message, msgID) - if err != nil { - error := fmt.Errorf("unable to encode the messge: %v", err) - log.WithFields(logger.Fields{ - "context": chMeta.id, - "msg_id": msgID, - }).Errorf("%v", error) - sendReply(ch, &api.VppReply{Error: error}) - return error - } - - // send the message - log.WithFields(logger.Fields{ - "context": chMeta.id, - "msg_id": msgID, - "msg_size": len(data), - }).Debug("Sending a message to VPP.") - - if req.Multipart { - // expect multipart response - atomic.StoreUint32(&chMeta.multipart, 1) - } - - // send the request to VPP - c.vpp.SendMsg(chMeta.id, data) - - if req.Multipart { - // send a control ping to determine end of the multipart response - ping := &vpe.ControlPing{} - pingData, _ := c.codec.EncodeMsg(ping, c.pingReqID) - - log.WithFields(logger.Fields{ - "context": chMeta.id, - "msg_id": c.pingReqID, - "msg_size": len(pingData), - }).Debug("Sending a control ping to VPP.") - - c.vpp.SendMsg(chMeta.id, pingData) - } - - return nil -} - // releaseAPIChannel releases API channel that needs to be closed. func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) { log.WithFields(logger.Fields{ @@ -252,104 +316,3 @@ func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) delete(c.channels, chMeta.id) c.channelsLock.Unlock() } - -// msgCallback is called whenever any binary API message comes from VPP. -func msgCallback(context uint32, msgID uint16, data []byte) { - connLock.RLock() - defer connLock.RUnlock() - - if conn == nil { - log.Warn("Already disconnected, ignoring the message.") - return - } - - log.WithFields(logger.Fields{ - "context": context, - "msg_id": msgID, - "msg_size": len(data), - }).Debug("Received a message from VPP.") - - if context == 0 || conn.isNotificationMessage(msgID) { - // process the message as a notification - conn.sendNotifications(msgID, data) - return - } - - // match ch according to the context - conn.channelsLock.RLock() - ch, ok := conn.channels[context] - conn.channelsLock.RUnlock() - - if !ok { - log.WithFields(logger.Fields{ - "context": context, - "msg_id": msgID, - }).Error("Context ID not known, ignoring the message.") - return - } - - chMeta := ch.Metadata().(*channelMetadata) - lastReplyReceived := false - // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply - if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) { - lastReplyReceived = true - } - - // send the data to the channel - sendReply(ch, &api.VppReply{ - MessageID: msgID, - Data: data, - LastReplyReceived: lastReplyReceived, - }) -} - -// sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise -// it logs the error and do not send the message. -func sendReply(ch *api.Channel, reply *api.VppReply) { - select { - case ch.ReplyChan <- reply: - // reply sent successfully - default: - // unable to write into the channel without blocking - log.WithFields(logger.Fields{ - "channel": ch, - "msg_id": reply.MessageID, - }).Warn("Unable to send the reply, reciever end not ready.") - } -} - -// GetMessageID returns message identifier of given API message. -func (c *Connection) GetMessageID(msg api.Message) (uint16, error) { - if c == nil { - return 0, errors.New("nil connection passed in") - } - return c.messageNameToID(msg.GetMessageName(), msg.GetCrcString()) -} - -// messageNameToID returns message ID of a message identified by its name and CRC. -func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) { - // try to get the ID from the map - c.msgIDsLock.RLock() - id, ok := c.msgIDs[msgName+msgCrc] - c.msgIDsLock.RUnlock() - if ok { - return id, nil - } - - // get the ID using VPP API - id, err := c.vpp.GetMsgID(msgName, msgCrc) - if err != nil { - error := fmt.Errorf("unable to retrieve message ID: %v", err) - log.WithFields(logger.Fields{ - "msg_name": msgName, - "msg_crc": msgCrc, - }).Errorf("unable to retrieve message ID: %v", err) - return id, error - } - - c.msgIDsLock.Lock() - c.msgIDs[msgName+msgCrc] = id - c.msgIDsLock.Unlock() - - return id, nil -} diff --git a/core/core_test.go b/core/core_test.go index d3c2e2c..3184ef5 100644 --- a/core/core_test.go +++ b/core/core_test.go @@ -145,6 +145,7 @@ func TestNotifications(t *testing.T) { } func TestNilConnection(t *testing.T) { + RegisterTestingT(t) var conn *Connection ch, err := conn.NewAPIChannel() @@ -168,6 +169,21 @@ func TestDoubleConnection(t *testing.T) { Expect(conn).Should(BeNil()) } +func TestAsyncConnection(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + ctx.conn.Disconnect() + conn, ch, err := AsyncConnect(ctx.mockVpp) + ctx.conn = conn + + Expect(err).ShouldNot(HaveOccurred()) + Expect(conn).ShouldNot(BeNil()) + + ev := <-ch + Expect(ev.State).Should(BeEquivalentTo(Connected)) +} + func TestFullBuffer(t *testing.T) { ctx := setupTest(t) defer ctx.teardownTest() diff --git a/core/notification_handler.go b/core/notification_handler.go new file mode 100644 index 0000000..5cd132c --- /dev/null +++ b/core/notification_handler.go @@ -0,0 +1,182 @@ +// Copyright (c) 2017 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + "fmt" + "reflect" + + logger "github.com/Sirupsen/logrus" + + "git.fd.io/govpp.git/api" +) + +// processNotifSubscribeRequest processes a notification subscribe request. +func (c *Connection) processNotifSubscribeRequest(ch *api.Channel, req *api.NotifSubscribeRequest) error { + var err error + + // subscribe / unsubscribe + if req.Subscribe { + err = c.addNotifSubscription(req.Subscription) + } else { + err = c.removeNotifSubscription(req.Subscription) + } + + // send the reply into the go channel + select { + case ch.NotifSubsReplyChan <- err: + // reply sent successfully + default: + // unable to write into the channel without blocking + log.WithFields(logger.Fields{ + "channel": ch, + }).Warn("Unable to deliver the subscribe reply, reciever end not ready.") + } + + return err +} + +// addNotifSubscription adds the notification subscription into the subscriptions map of the connection. +func (c *Connection) addNotifSubscription(subs *api.NotifSubscription) error { + // get message ID of the notification message + msgID, err := c.getSubscriptionMessageID(subs) + if err != nil { + return err + } + + log.WithFields(logger.Fields{ + "msg_id": msgID, + "subscription": subs, + }).Debug("Adding new notification subscription.") + + // add the subscription into map + c.notifSubscriptionsLock.Lock() + defer c.notifSubscriptionsLock.Unlock() + + c.notifSubscriptions[msgID] = append(c.notifSubscriptions[msgID], subs) + + return nil +} + +// removeNotifSubscription removes the notification subscription from the subscriptions map of the connection. +func (c *Connection) removeNotifSubscription(subs *api.NotifSubscription) error { + // get message ID of the notification message + msgID, err := c.getSubscriptionMessageID(subs) + if err != nil { + return err + } + + log.WithFields(logger.Fields{ + "msg_id": msgID, + "subscription": subs, + }).Debug("Removing notification subscription.") + + // remove the subscription from the map + c.notifSubscriptionsLock.Lock() + defer c.notifSubscriptionsLock.Unlock() + + for i, item := range c.notifSubscriptions[msgID] { + if item == subs { + // remove i-th item in the slice + c.notifSubscriptions[msgID] = append(c.notifSubscriptions[msgID][:i], c.notifSubscriptions[msgID][i+1:]...) + break + } + } + + return nil +} + +// isNotificationMessage returns true if someone has subscribed to provided message ID. +func (c *Connection) isNotificationMessage(msgID uint16) bool { + c.notifSubscriptionsLock.RLock() + defer c.notifSubscriptionsLock.RUnlock() + + _, exists := c.notifSubscriptions[msgID] + return exists +} + +// sendNotifications send a notification message to all subscribers subscribed for that message. +func (c *Connection) sendNotifications(msgID uint16, data []byte) { + c.notifSubscriptionsLock.RLock() + defer c.notifSubscriptionsLock.RUnlock() + + matched := false + + // send to notification to each subscriber + for _, subs := range c.notifSubscriptions[msgID] { + log.WithFields(logger.Fields{ + "msg_id": msgID, + "msg_size": len(data), + "subscription": subs, + }).Debug("Sending a notification to the subscription channel.") + + msg := subs.MsgFactory() + err := c.codec.DecodeMsg(data, msg) + if err != nil { + log.WithFields(logger.Fields{ + "msg_id": msgID, + "msg_size": len(data), + "subscription": subs, + }).Error("Unable to decode the notification message.") + continue + } + + // special case for the strange interface counters message + if msg.GetMessageName() == "vnet_interface_counters" { + v := reflect.ValueOf(msg).Elem().FieldByName("Data") + if v.IsValid() { + v.SetBytes(data[8:]) // include the Count and Data fields in the data + } + } + + // send the message into the go channel of the subscription + select { + case subs.NotifChan <- msg: + // message sent successfully + default: + // unable to write into the channel without blocking + log.WithFields(logger.Fields{ + "msg_id": msgID, + "msg_size": len(data), + "subscription": subs, + }).Warn("Unable to deliver the notification, reciever end not ready.") + } + + matched = true + } + + if !matched { + log.WithFields(logger.Fields{ + "msg_id": msgID, + "msg_size": len(data), + }).Debug("No subscription found for the notification message.") + } +} + +// getSubscriptionMessageID returns ID of the message the subscription is tied to. +func (c *Connection) getSubscriptionMessageID(subs *api.NotifSubscription) (uint16, error) { + msg := subs.MsgFactory() + msgID, err := c.GetMessageID(msg) + + if err != nil { + log.WithFields(logger.Fields{ + "msg_name": msg.GetMessageName(), + "msg_crc": msg.GetCrcString(), + }).Errorf("unable to retrieve message ID: %v", err) + return 0, fmt.Errorf("unable to retrieve message ID: %v", err) + } + + return msgID, nil +} diff --git a/core/notifications.go b/core/notifications.go deleted file mode 100644 index 5cd132c..0000000 --- a/core/notifications.go +++ /dev/null @@ -1,182 +0,0 @@ -// Copyright (c) 2017 Cisco and/or its affiliates. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package core - -import ( - "fmt" - "reflect" - - logger "github.com/Sirupsen/logrus" - - "git.fd.io/govpp.git/api" -) - -// processNotifSubscribeRequest processes a notification subscribe request. -func (c *Connection) processNotifSubscribeRequest(ch *api.Channel, req *api.NotifSubscribeRequest) error { - var err error - - // subscribe / unsubscribe - if req.Subscribe { - err = c.addNotifSubscription(req.Subscription) - } else { - err = c.removeNotifSubscription(req.Subscription) - } - - // send the reply into the go channel - select { - case ch.NotifSubsReplyChan <- err: - // reply sent successfully - default: - // unable to write into the channel without blocking - log.WithFields(logger.Fields{ - "channel": ch, - }).Warn("Unable to deliver the subscribe reply, reciever end not ready.") - } - - return err -} - -// addNotifSubscription adds the notification subscription into the subscriptions map of the connection. -func (c *Connection) addNotifSubscription(subs *api.NotifSubscription) error { - // get message ID of the notification message - msgID, err := c.getSubscriptionMessageID(subs) - if err != nil { - return err - } - - log.WithFields(logger.Fields{ - "msg_id": msgID, - "subscription": subs, - }).Debug("Adding new notification subscription.") - - // add the subscription into map - c.notifSubscriptionsLock.Lock() - defer c.notifSubscriptionsLock.Unlock() - - c.notifSubscriptions[msgID] = append(c.notifSubscriptions[msgID], subs) - - return nil -} - -// removeNotifSubscription removes the notification subscription from the subscriptions map of the connection. -func (c *Connection) removeNotifSubscription(subs *api.NotifSubscription) error { - // get message ID of the notification message - msgID, err := c.getSubscriptionMessageID(subs) - if err != nil { - return err - } - - log.WithFields(logger.Fields{ - "msg_id": msgID, - "subscription": subs, - }).Debug("Removing notification subscription.") - - // remove the subscription from the map - c.notifSubscriptionsLock.Lock() - defer c.notifSubscriptionsLock.Unlock() - - for i, item := range c.notifSubscriptions[msgID] { - if item == subs { - // remove i-th item in the slice - c.notifSubscriptions[msgID] = append(c.notifSubscriptions[msgID][:i], c.notifSubscriptions[msgID][i+1:]...) - break - } - } - - return nil -} - -// isNotificationMessage returns true if someone has subscribed to provided message ID. -func (c *Connection) isNotificationMessage(msgID uint16) bool { - c.notifSubscriptionsLock.RLock() - defer c.notifSubscriptionsLock.RUnlock() - - _, exists := c.notifSubscriptions[msgID] - return exists -} - -// sendNotifications send a notification message to all subscribers subscribed for that message. -func (c *Connection) sendNotifications(msgID uint16, data []byte) { - c.notifSubscriptionsLock.RLock() - defer c.notifSubscriptionsLock.RUnlock() - - matched := false - - // send to notification to each subscriber - for _, subs := range c.notifSubscriptions[msgID] { - log.WithFields(logger.Fields{ - "msg_id": msgID, - "msg_size": len(data), - "subscription": subs, - }).Debug("Sending a notification to the subscription channel.") - - msg := subs.MsgFactory() - err := c.codec.DecodeMsg(data, msg) - if err != nil { - log.WithFields(logger.Fields{ - "msg_id": msgID, - "msg_size": len(data), - "subscription": subs, - }).Error("Unable to decode the notification message.") - continue - } - - // special case for the strange interface counters message - if msg.GetMessageName() == "vnet_interface_counters" { - v := reflect.ValueOf(msg).Elem().FieldByName("Data") - if v.IsValid() { - v.SetBytes(data[8:]) // include the Count and Data fields in the data - } - } - - // send the message into the go channel of the subscription - select { - case subs.NotifChan <- msg: - // message sent successfully - default: - // unable to write into the channel without blocking - log.WithFields(logger.Fields{ - "msg_id": msgID, - "msg_size": len(data), - "subscription": subs, - }).Warn("Unable to deliver the notification, reciever end not ready.") - } - - matched = true - } - - if !matched { - log.WithFields(logger.Fields{ - "msg_id": msgID, - "msg_size": len(data), - }).Debug("No subscription found for the notification message.") - } -} - -// getSubscriptionMessageID returns ID of the message the subscription is tied to. -func (c *Connection) getSubscriptionMessageID(subs *api.NotifSubscription) (uint16, error) { - msg := subs.MsgFactory() - msgID, err := c.GetMessageID(msg) - - if err != nil { - log.WithFields(logger.Fields{ - "msg_name": msg.GetMessageName(), - "msg_crc": msg.GetCrcString(), - }).Errorf("unable to retrieve message ID: %v", err) - return 0, fmt.Errorf("unable to retrieve message ID: %v", err) - } - - return msgID, nil -} diff --git a/core/request_handler.go b/core/request_handler.go new file mode 100644 index 0000000..f4f5e92 --- /dev/null +++ b/core/request_handler.go @@ -0,0 +1,213 @@ +// Copyright (c) 2017 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + "errors" + "fmt" + "sync/atomic" + + logger "github.com/Sirupsen/logrus" + + "git.fd.io/govpp.git/api" + "git.fd.io/govpp.git/core/bin_api/vpe" +) + +// watchRequests watches for requests on the request API channel and forwards them as messages to VPP. +func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) { + for { + select { + case req, ok := <-ch.ReqChan: + // new request on the request channel + if !ok { + // after closing the request channel, release API channel and return + c.releaseAPIChannel(ch, chMeta) + return + } + c.processRequest(ch, chMeta, req) + + case req := <-ch.NotifSubsChan: + // new request on the notification subscribe channel + c.processNotifSubscribeRequest(ch, req) + } + } +} + +// processRequest processes a single request received on the request channel. +func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error { + // check whether we are connected to VPP + if atomic.LoadUint32(&c.connected) == 0 { + error := errors.New("not connected to VPP, ignoring the request") + log.Error(error) + sendReply(ch, &api.VppReply{Error: error}) + return error + } + + // retrieve message ID + msgID, err := c.GetMessageID(req.Message) + if err != nil { + error := fmt.Errorf("unable to retrieve message ID: %v", err) + log.WithFields(logger.Fields{ + "msg_name": req.Message.GetMessageName(), + "msg_crc": req.Message.GetCrcString(), + }).Error(err) + sendReply(ch, &api.VppReply{Error: error}) + return error + } + + // encode the message into binary + data, err := c.codec.EncodeMsg(req.Message, msgID) + if err != nil { + error := fmt.Errorf("unable to encode the messge: %v", err) + log.WithFields(logger.Fields{ + "context": chMeta.id, + "msg_id": msgID, + }).Error(error) + sendReply(ch, &api.VppReply{Error: error}) + return error + } + + // send the message + log.WithFields(logger.Fields{ + "context": chMeta.id, + "msg_id": msgID, + "msg_size": len(data), + }).Debug("Sending a message to VPP.") + + if req.Multipart { + // expect multipart response + atomic.StoreUint32(&chMeta.multipart, 1) + } + + // send the request to VPP + c.vpp.SendMsg(chMeta.id, data) + + if req.Multipart { + // send a control ping to determine end of the multipart response + ping := &vpe.ControlPing{} + pingData, _ := c.codec.EncodeMsg(ping, c.pingReqID) + + log.WithFields(logger.Fields{ + "context": chMeta.id, + "msg_id": c.pingReqID, + "msg_size": len(pingData), + }).Debug("Sending a control ping to VPP.") + + c.vpp.SendMsg(chMeta.id, pingData) + } + + return nil +} + +// msgCallback is called whenever any binary API message comes from VPP. +func msgCallback(context uint32, msgID uint16, data []byte) { + connLock.RLock() + defer connLock.RUnlock() + + if conn == nil { + log.Warn("Already disconnected, ignoring the message.") + return + } + + log.WithFields(logger.Fields{ + "context": context, + "msg_id": msgID, + "msg_size": len(data), + }).Debug("Received a message from VPP.") + + if context == 0 || conn.isNotificationMessage(msgID) { + // process the message as a notification + conn.sendNotifications(msgID, data) + return + } + + // match ch according to the context + conn.channelsLock.RLock() + ch, ok := conn.channels[context] + conn.channelsLock.RUnlock() + + if !ok { + log.WithFields(logger.Fields{ + "context": context, + "msg_id": msgID, + }).Error("Context ID not known, ignoring the message.") + return + } + + chMeta := ch.Metadata().(*channelMetadata) + lastReplyReceived := false + // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply + if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) { + lastReplyReceived = true + } + + // send the data to the channel + sendReply(ch, &api.VppReply{ + MessageID: msgID, + Data: data, + LastReplyReceived: lastReplyReceived, + }) +} + +// sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise +// it logs the error and do not send the message. +func sendReply(ch *api.Channel, reply *api.VppReply) { + select { + case ch.ReplyChan <- reply: + // reply sent successfully + default: + // unable to write into the channel without blocking + log.WithFields(logger.Fields{ + "channel": ch, + "msg_id": reply.MessageID, + }).Warn("Unable to send the reply, reciever end not ready.") + } +} + +// GetMessageID returns message identifier of given API message. +func (c *Connection) GetMessageID(msg api.Message) (uint16, error) { + if c == nil { + return 0, errors.New("nil connection passed in") + } + return c.messageNameToID(msg.GetMessageName(), msg.GetCrcString()) +} + +// messageNameToID returns message ID of a message identified by its name and CRC. +func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) { + // try to get the ID from the map + c.msgIDsLock.RLock() + id, ok := c.msgIDs[msgName+msgCrc] + c.msgIDsLock.RUnlock() + if ok { + return id, nil + } + + // get the ID using VPP API + id, err := c.vpp.GetMsgID(msgName, msgCrc) + if err != nil { + error := fmt.Errorf("unable to retrieve message ID: %v", err) + log.WithFields(logger.Fields{ + "msg_name": msgName, + "msg_crc": msgCrc, + }).Errorf("unable to retrieve message ID: %v", err) + return id, error + } + + c.msgIDsLock.Lock() + c.msgIDs[msgName+msgCrc] = id + c.msgIDsLock.Unlock() + + return id, nil +} diff --git a/examples/cmd/simple-client/simple_client.go b/examples/cmd/simple-client/simple_client.go new file mode 100644 index 0000000..6e46d6b --- /dev/null +++ b/examples/cmd/simple-client/simple_client.go @@ -0,0 +1,221 @@ +// Copyright (c) 2017 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Binary simple-client is an example VPP management application that exercises the +// govpp API on real-world use-cases. +package main + +// Generates Go bindings for all VPP APIs located in the json directory. +//go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api + +import ( + "fmt" + "net" + "os" + + "git.fd.io/govpp.git" + "git.fd.io/govpp.git/api" + "git.fd.io/govpp.git/examples/bin_api/acl" + "git.fd.io/govpp.git/examples/bin_api/interfaces" + "git.fd.io/govpp.git/examples/bin_api/tap" +) + +func main() { + fmt.Println("Starting simple VPP client...") + + // connect to VPP + conn, err := govpp.Connect() + if err != nil { + fmt.Println("Error:", err) + os.Exit(1) + } + defer conn.Disconnect() + + // create an API channel that will be used in the examples + ch, err := conn.NewAPIChannel() + if err != nil { + fmt.Println("Error:", err) + os.Exit(1) + } + defer ch.Close() + + // check whether the VPP supports our version of some messages + compatibilityCheck(ch) + + // individual examples + aclVersion(ch) + aclConfig(ch) + aclDump(ch) + + tapConnect(ch) + + interfaceDump(ch) + interfaceNotifications(ch) +} + +// compatibilityCheck shows how an management application can check whether generated API messages are +// compatible with the version of VPP which the library is connected to. +func compatibilityCheck(ch *api.Channel) { + err := ch.CheckMessageCompatibility( + &interfaces.SwInterfaceDump{}, + &interfaces.SwInterfaceDetails{}, + ) + if err != nil { + fmt.Println(err) + os.Exit(1) + } +} + +// aclVersion is the simplest API example - one empty request message and one reply message. +func aclVersion(ch *api.Channel) { + req := &acl.ACLPluginGetVersion{} + reply := &acl.ACLPluginGetVersionReply{} + + err := ch.SendRequest(req).ReceiveReply(reply) + + if err != nil { + fmt.Println("Error:", err) + } else { + fmt.Printf("%+v\n", reply) + } +} + +// aclConfig is another simple API example - in this case, the request contains structured data. +func aclConfig(ch *api.Channel) { + req := &acl.ACLAddReplace{ + ACLIndex: ^uint32(0), + Tag: []byte("access list 1"), + R: []acl.ACLRule{ + { + IsPermit: 1, + SrcIPAddr: net.ParseIP("10.0.0.0").To4(), + SrcIPPrefixLen: 8, + DstIPAddr: net.ParseIP("192.168.1.0").To4(), + DstIPPrefixLen: 24, + Proto: 6, + }, + { + IsPermit: 1, + SrcIPAddr: net.ParseIP("8.8.8.8").To4(), + SrcIPPrefixLen: 32, + DstIPAddr: net.ParseIP("172.16.0.0").To4(), + DstIPPrefixLen: 16, + Proto: 6, + }, + }, + } + reply := &acl.ACLAddReplaceReply{} + + err := ch.SendRequest(req).ReceiveReply(reply) + + if err != nil { + fmt.Println("Error:", err) + } else { + fmt.Printf("%+v\n", reply) + } +} + +// aclDump shows an example where SendRequest and ReceiveReply are not chained together. +func aclDump(ch *api.Channel) { + req := &acl.ACLDump{} + reply := &acl.ACLDetails{} + + reqCtx := ch.SendRequest(req) + err := reqCtx.ReceiveReply(reply) + + if err != nil { + fmt.Println("Error:", err) + } else { + fmt.Printf("%+v\n", reply) + } +} + +// tapConnect example shows how the Go channels in the API channel can be accessed directly instead +// of using SendRequest and ReceiveReply wrappers. +func tapConnect(ch *api.Channel) { + req := &tap.TapConnect{ + TapName: []byte("testtap"), + UseRandomMac: 1, + } + + // send the request to the request go channel + ch.ReqChan <- &api.VppRequest{Message: req} + + // receive a reply from the reply go channel + vppReply := <-ch.ReplyChan + if vppReply.Error != nil { + fmt.Println("Error:", vppReply.Error) + return + } + + // decode the message + reply := &tap.TapConnectReply{} + err := ch.MsgDecoder.DecodeMsg(vppReply.Data, reply) + + if err != nil { + fmt.Println("Error:", err) + } else { + fmt.Printf("%+v\n", reply) + } +} + +// interfaceDump shows an example of multipart request (multiple replies are expected). +func interfaceDump(ch *api.Channel) { + req := &interfaces.SwInterfaceDump{} + reqCtx := ch.SendMultiRequest(req) + + for { + msg := &interfaces.SwInterfaceDetails{} + stop, err := reqCtx.ReceiveReply(msg) + if stop { + break // break out of the loop + } + if err != nil { + fmt.Println("Error:", err) + } + fmt.Printf("%+v\n", msg) + } +} + +// interfaceNotifications shows the usage of notification API. Note that for notifications, +// you are supposed to create your own Go channel with your preferred buffer size. If the channel's +// buffer is full, the notifications will not be delivered into it. +func interfaceNotifications(ch *api.Channel) { + // subscribe for specific notification message + notifChan := make(chan api.Message, 100) + subs, _ := ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceSetFlags) + + // enable interface events in VPP + ch.SendRequest(&interfaces.WantInterfaceEvents{ + Pid: uint32(os.Getpid()), + EnableDisable: 1, + }).ReceiveReply(&interfaces.WantInterfaceEventsReply{}) + + // generate some events in VPP + ch.SendRequest(&interfaces.SwInterfaceSetFlags{ + SwIfIndex: 0, + AdminUpDown: 0, + }).ReceiveReply(&interfaces.SwInterfaceSetFlagsReply{}) + ch.SendRequest(&interfaces.SwInterfaceSetFlags{ + SwIfIndex: 0, + AdminUpDown: 1, + }).ReceiveReply(&interfaces.SwInterfaceSetFlagsReply{}) + + // receive one notification + notif := (<-notifChan).(*interfaces.SwInterfaceSetFlags) + fmt.Printf("%+v\n", notif) + + // unsubscribe from delivery of the notifications + ch.UnsubscribeNotification(subs) +} diff --git a/examples/cmd/stats-client/stats_client.go b/examples/cmd/stats-client/stats_client.go new file mode 100644 index 0000000..fc40b24 --- /dev/null +++ b/examples/cmd/stats-client/stats_client.go @@ -0,0 +1,132 @@ +// Copyright (c) 2017 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Binary stats-client is an example VPP management application that exercises the +// govpp API for interface counters together with asynchronous connection to VPP. +package main + +// Generates Go bindings for all VPP APIs located in the json directory. +//go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api + +import ( + "fmt" + "os" + "os/signal" + + "git.fd.io/govpp.git" + "git.fd.io/govpp.git/api" + "git.fd.io/govpp.git/api/ifcounters" + "git.fd.io/govpp.git/core" + "git.fd.io/govpp.git/core/bin_api/vpe" + "git.fd.io/govpp.git/examples/bin_api/interfaces" +) + +func main() { + fmt.Println("Starting stats VPP client...") + + // async connect to VPP + conn, statCh, err := govpp.AsyncConnect() + if err != nil { + fmt.Println("Error:", err) + os.Exit(1) + } + defer conn.Disconnect() + + // create an API channel that will be used in the examples + ch, err := conn.NewAPIChannel() + if err != nil { + fmt.Println("Error:", err) + os.Exit(1) + } + defer ch.Close() + + // create channel for Interrupt signal + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt) + + var subs *api.NotifSubscription + var notifChan chan api.Message + + // loop until Interrupt signal is received +loop: + for { + select { + + case connEvent := <-statCh: + // VPP connection state change + switch connEvent.State { + case core.Connected: + fmt.Println("VPP connected.") + if subs == nil { + subs, notifChan = subscribeNotification(ch) + } + requestStatistics(ch) + + case core.Disconnected: + fmt.Println("VPP disconnected.") + } + + case notifMsg := <-notifChan: + // counter notification received + processCounters(notifMsg.(*interfaces.VnetInterfaceCounters)) + + case <-sigChan: + // interrupt received + fmt.Println("Interrupt received, exiting.") + break loop + } + } + + ch.UnsubscribeNotification(subs) +} + +// subscribeNotification subscribes for interface counters notifications. +func subscribeNotification(ch *api.Channel) (*api.NotifSubscription, chan api.Message) { + + notifChan := make(chan api.Message, 100) + subs, _ := ch.SubscribeNotification(notifChan, interfaces.NewVnetInterfaceCounters) + + return subs, notifChan +} + +// requestStatistics requests interface counters notifications from VPP. +func requestStatistics(ch *api.Channel) { + ch.SendRequest(&vpe.WantStats{ + Pid: uint32(os.Getpid()), + EnableDisable: 1, + }).ReceiveReply(&vpe.WantStatsReply{}) +} + +// processCounters processes a counter message received from VPP. +func processCounters(msg *interfaces.VnetInterfaceCounters) { + fmt.Printf("%+v\n", msg) + + if msg.IsCombined == 0 { + // simple counter + counters, err := ifcounters.DecodeCounters(ifcounters.VnetInterfaceCounters(*msg)) + if err != nil { + fmt.Println("Error:", err) + } else { + fmt.Printf("%+v\n", counters) + } + } else { + // combined counter + counters, err := ifcounters.DecodeCombinedCounters(ifcounters.VnetInterfaceCounters(*msg)) + if err != nil { + fmt.Println("Error:", err) + } else { + fmt.Printf("%+v\n", counters) + } + } +} diff --git a/examples/example_client.go b/examples/example_client.go deleted file mode 100644 index f2e5804..0000000 --- a/examples/example_client.go +++ /dev/null @@ -1,278 +0,0 @@ -// Copyright (c) 2017 Cisco and/or its affiliates. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Example VPP management application that exercises the govpp API on real-world use-cases. -package main - -// Generates Go bindings for all VPP APIs located in the json directory. -//go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api - -import ( - "fmt" - "net" - "os" - "os/signal" - - "git.fd.io/govpp.git" - "git.fd.io/govpp.git/api" - "git.fd.io/govpp.git/api/ifcounters" - "git.fd.io/govpp.git/core/bin_api/vpe" - "git.fd.io/govpp.git/examples/bin_api/acl" - "git.fd.io/govpp.git/examples/bin_api/interfaces" - "git.fd.io/govpp.git/examples/bin_api/tap" -) - -func main() { - fmt.Println("Starting example VPP client...") - - // connect to VPP - conn, err := govpp.Connect() - if err != nil { - fmt.Println("Error:", err) - os.Exit(1) - } - defer conn.Disconnect() - - // create an API channel that will be used in the examples - ch, err := conn.NewAPIChannel() - if err != nil { - fmt.Println("Error:", err) - os.Exit(1) - } - defer ch.Close() - - // check whether the VPP supports our version of some messages - compatibilityCheck(ch) - - // individual examples - aclVersion(ch) - aclConfig(ch) - aclDump(ch) - - tapConnect(ch) - - interfaceDump(ch) - interfaceNotifications(ch) - - //interfaceCounters(ch) -} - -// compatibilityCheck shows how an management application can check whether generated API messages are -// compatible with the version of VPP which the library is connected to. -func compatibilityCheck(ch *api.Channel) { - err := ch.CheckMessageCompatibility( - &interfaces.SwInterfaceDump{}, - &interfaces.SwInterfaceDetails{}, - ) - if err != nil { - fmt.Println(err) - os.Exit(1) - } -} - -// aclVersion is the simplest API example - one empty request message and one reply message. -func aclVersion(ch *api.Channel) { - req := &acl.ACLPluginGetVersion{} - reply := &acl.ACLPluginGetVersionReply{} - - err := ch.SendRequest(req).ReceiveReply(reply) - - if err != nil { - fmt.Println("Error:", err) - } else { - fmt.Printf("%+v\n", reply) - } -} - -// aclConfig is another simple API example - in this case, the request contains structured data. -func aclConfig(ch *api.Channel) { - req := &acl.ACLAddReplace{ - ACLIndex: ^uint32(0), - Tag: []byte("access list 1"), - R: []acl.ACLRule{ - { - IsPermit: 1, - SrcIPAddr: net.ParseIP("10.0.0.0").To4(), - SrcIPPrefixLen: 8, - DstIPAddr: net.ParseIP("192.168.1.0").To4(), - DstIPPrefixLen: 24, - Proto: 6, - }, - { - IsPermit: 1, - SrcIPAddr: net.ParseIP("8.8.8.8").To4(), - SrcIPPrefixLen: 32, - DstIPAddr: net.ParseIP("172.16.0.0").To4(), - DstIPPrefixLen: 16, - Proto: 6, - }, - }, - } - reply := &acl.ACLAddReplaceReply{} - - err := ch.SendRequest(req).ReceiveReply(reply) - - if err != nil { - fmt.Println("Error:", err) - } else { - fmt.Printf("%+v\n", reply) - } -} - -// aclDump shows an example where SendRequest and ReceiveReply are not chained together. -func aclDump(ch *api.Channel) { - req := &acl.ACLDump{} - reply := &acl.ACLDetails{} - - reqCtx := ch.SendRequest(req) - err := reqCtx.ReceiveReply(reply) - - if err != nil { - fmt.Println("Error:", err) - } else { - fmt.Printf("%+v\n", reply) - } -} - -// tapConnect example shows how the Go channels in the API channel can be accessed directly instead -// of using SendRequest and ReceiveReply wrappers. -func tapConnect(ch *api.Channel) { - req := &tap.TapConnect{ - TapName: []byte("testtap"), - UseRandomMac: 1, - } - - // send the request to the request go channel - ch.ReqChan <- &api.VppRequest{Message: req} - - // receive a reply from the reply go channel - vppReply := <-ch.ReplyChan - if vppReply.Error != nil { - fmt.Println("Error:", vppReply.Error) - return - } - - // decode the message - reply := &tap.TapConnectReply{} - err := ch.MsgDecoder.DecodeMsg(vppReply.Data, reply) - - if err != nil { - fmt.Println("Error:", err) - } else { - fmt.Printf("%+v\n", reply) - } -} - -// interfaceDump shows an example of multipart request (multiple replies are expected). -func interfaceDump(ch *api.Channel) { - req := &interfaces.SwInterfaceDump{} - reqCtx := ch.SendMultiRequest(req) - - for { - msg := &interfaces.SwInterfaceDetails{} - stop, err := reqCtx.ReceiveReply(msg) - if stop { - break // break out of the loop - } - if err != nil { - fmt.Println("Error:", err) - } - fmt.Printf("%+v\n", msg) - } -} - -// interfaceNotifications shows the usage of notification API. Note that for notifications, -// you are supposed to create your own Go channel with your preferred buffer size. If the channel's -// buffer is full, the notifications will not be delivered into it. -func interfaceNotifications(ch *api.Channel) { - // subscribe for specific notification message - notifChan := make(chan api.Message, 100) - subs, _ := ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceSetFlags) - - // enable interface events in VPP - ch.SendRequest(&interfaces.WantInterfaceEvents{ - Pid: uint32(os.Getpid()), - EnableDisable: 1, - }).ReceiveReply(&interfaces.WantInterfaceEventsReply{}) - - // generate some events in VPP - ch.SendRequest(&interfaces.SwInterfaceSetFlags{ - SwIfIndex: 0, - AdminUpDown: 0, - }).ReceiveReply(&interfaces.SwInterfaceSetFlagsReply{}) - ch.SendRequest(&interfaces.SwInterfaceSetFlags{ - SwIfIndex: 0, - AdminUpDown: 1, - }).ReceiveReply(&interfaces.SwInterfaceSetFlagsReply{}) - - // receive one notification - notif := (<-notifChan).(*interfaces.SwInterfaceSetFlags) - fmt.Printf("%+v\n", notif) - - // unsubscribe from delivery of the notifications - ch.UnsubscribeNotification(subs) -} - -// interfaceCounters is an example of using notification API to periodically retrieve interface statistics. -// The ifcounters package contains the API that can be used to decode the strange VnetInterfaceCounters message. -func interfaceCounters(ch *api.Channel) { - // subscribe for interface counters notifications - notifChan := make(chan api.Message, 100) - subs, _ := ch.SubscribeNotification(notifChan, interfaces.NewVnetInterfaceCounters) - - // enable interface counters notifications from VPP - ch.SendRequest(&vpe.WantStats{ - Pid: uint32(os.Getpid()), - EnableDisable: 1, - }).ReceiveReply(&vpe.WantStatsReply{}) - - // create channel for Interrupt signal - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, os.Interrupt) - - // loop until Interrupt signal is received -loop: - for { - select { - case <-sigChan: - // interrupt received - break loop - case notifMsg := <-notifChan: - notif := notifMsg.(*interfaces.VnetInterfaceCounters) - // notification received - fmt.Printf("%+v\n", notif) - - if notif.IsCombined == 0 { - // simple counter - counters, err := ifcounters.DecodeCounters(ifcounters.VnetInterfaceCounters(*notif)) - if err != nil { - fmt.Println("Error:", err) - } else { - fmt.Printf("%+v\n", counters) - } - } else { - // combined counter - counters, err := ifcounters.DecodeCombinedCounters(ifcounters.VnetInterfaceCounters(*notif)) - if err != nil { - fmt.Println("Error:", err) - } else { - fmt.Printf("%+v\n", counters) - } - } - } - } - - // unsubscribe from delivery of the notifications - ch.UnsubscribeNotification(subs) -} diff --git a/govpp.go b/govpp.go index 5d3ed7f..6f0cc2e 100644 --- a/govpp.go +++ b/govpp.go @@ -24,6 +24,7 @@ var vppAdapter adapter.VppAdapter // VPP Adapter that will be used in the subseq // Connect connects the govpp core to VPP either using the default VPP Adapter, or using the adapter previously // set by SetAdapter (useful mostly just for unit/integration tests with mocked VPP adapter). +// This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed. func Connect() (*core.Connection, error) { if vppAdapter == nil { vppAdapter = vppapiclient.NewVppAdapter() @@ -31,6 +32,18 @@ func Connect() (*core.Connection, error) { return core.Connect(vppAdapter) } +// AsyncConnect asynchronously connects the govpp core to VPP either using the default VPP Adapter, +// or using the adapter previously set by SetAdapter. +// This call does not block until connection is established, it returns immediately. The caller is +// supposed to watch the returned ConnectionState channel for Connected/Disconnected events. +// In case of disconnect, the library will asynchronously try to reconnect. +func AsyncConnect() (*core.Connection, chan core.ConnectionEvent, error) { + if vppAdapter == nil { + vppAdapter = vppapiclient.NewVppAdapter() + } + return core.AsyncConnect(vppAdapter) +} + // SetAdapter sets the adapter that will be used for connections to VPP in the subsequent `Connect` calls. func SetAdapter(ad adapter.VppAdapter) { vppAdapter = ad -- cgit 1.2.3-korg