aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorOndrej Fabry <ofabry@cisco.com>2018-08-23 22:51:56 +0200
committerOndrej Fabry <ofabry@cisco.com>2018-08-24 12:43:05 +0200
commit6b350c65fe0ec845cecf58bfb41ffc63dc9c04f7 (patch)
tree6255495854f43ec2f2d11f88990369aadb48db3f
parent892683bef86cacc2ccda2b4df2b079171bd92164 (diff)
Simplify subscribing to events and fix events
- there is no need for sending subscription requests through channels, since all the messages are registered and no communication with VPP is needed Change-Id: Ibc29957be02a32e26309f66c369a071559b822a9 Signed-off-by: Ondrej Fabry <ofabry@cisco.com>
-rw-r--r--Makefile1
-rw-r--r--adapter/adapter.go2
-rw-r--r--adapter/mock/mock_adapter.go11
-rw-r--r--adapter/vppapiclient/vppapiclient_adapter.go9
-rw-r--r--api/api.go37
-rw-r--r--cmd/binapi-generator/generate.go3
-rw-r--r--codec/msg_codec.go16
-rw-r--r--core/channel.go218
-rw-r--r--core/channel_test.go173
-rw-r--r--core/connection.go235
-rw-r--r--core/connection_test.go212
-rw-r--r--core/notification_handler.go170
-rw-r--r--core/request_handler.go81
-rw-r--r--examples/bin_api/acl/acl.ba.go1
-rw-r--r--examples/bin_api/af_packet/af_packet.ba.go1
-rw-r--r--examples/bin_api/interfaces/interfaces.ba.go1
-rw-r--r--examples/bin_api/ip/ip.ba.go1
-rw-r--r--examples/bin_api/memif/memif.ba.go1
-rw-r--r--examples/bin_api/stats/stats.ba.go1
-rw-r--r--examples/bin_api/tap/tap.ba.go1
-rw-r--r--examples/bin_api/vpe/vpe.ba.go1
-rw-r--r--examples/cmd/perf-bench/perf-bench.go16
-rw-r--r--examples/cmd/simple-client/simple_client.go70
-rw-r--r--examples/cmd/stats-client/stats_client.go31
24 files changed, 528 insertions, 765 deletions
diff --git a/Makefile b/Makefile
index ee06818..ea1daf6 100644
--- a/Makefile
+++ b/Makefile
@@ -26,7 +26,6 @@ clean:
@rm -f extras/libmemif/examples/icmp-responder/icmp-responder
generate:
- @cd core && go generate ./...
@cd examples && go generate ./...
lint:
diff --git a/adapter/adapter.go b/adapter/adapter.go
index 7d3d1e4..aa34329 100644
--- a/adapter/adapter.go
+++ b/adapter/adapter.go
@@ -22,7 +22,7 @@ import (
var ErrNotImplemented = errors.New("not implemented for this OS")
// MsgCallback defines func signature for message callback.
-type MsgCallback func(msgID uint16, context uint32, data []byte)
+type MsgCallback func(msgID uint16, data []byte)
// VppAdapter provides connection to VPP. It is responsible for sending and receiving of binary-encoded messages to/from VPP.
type VppAdapter interface {
diff --git a/adapter/mock/mock_adapter.go b/adapter/mock/mock_adapter.go
index 5ca190f..cdf2081 100644
--- a/adapter/mock/mock_adapter.go
+++ b/adapter/mock/mock_adapter.go
@@ -92,9 +92,9 @@ const (
// NewVppAdapter returns a new mock adapter.
func NewVppAdapter() *VppAdapter {
a := &VppAdapter{
+ msgIDSeq: 1000,
msgIDsToName: make(map[uint16]string),
msgNameToIds: make(map[string]uint16),
- msgIDSeq: 1000,
binAPITypes: make(map[string]reflect.Type),
}
a.registerBinAPITypes()
@@ -186,8 +186,7 @@ func (a *VppAdapter) ReplyBytes(request MessageDTO, reply api.Message) ([]byte,
if err != nil {
return nil, err
}
- err = struc.Pack(buf, reply)
- if err != nil {
+ if err = struc.Pack(buf, reply); err != nil {
return nil, err
}
@@ -245,7 +244,7 @@ func (a *VppAdapter) SendMsg(clientID uint32, data []byte) error {
Data: data,
})
if finished {
- a.callback(msgID, clientID, reply)
+ a.callback(msgID, reply)
return nil
}
}
@@ -276,7 +275,7 @@ func (a *VppAdapter) SendMsg(clientID uint32, data []byte) error {
struc.Pack(buf, &codec.VppOtherHeader{VlMsgID: msgID})
}
struc.Pack(buf, msg.Msg)
- a.callback(msgID, context, buf.Bytes())
+ a.callback(msgID, buf.Bytes())
}
a.replies = a.replies[1:]
@@ -295,7 +294,7 @@ func (a *VppAdapter) SendMsg(clientID uint32, data []byte) error {
msgID := uint16(defaultReplyMsgID)
struc.Pack(buf, &codec.VppReplyHeader{VlMsgID: msgID, Context: clientID})
struc.Pack(buf, &defaultReply{})
- a.callback(msgID, clientID, buf.Bytes())
+ a.callback(msgID, buf.Bytes())
}
return nil
}
diff --git a/adapter/vppapiclient/vppapiclient_adapter.go b/adapter/vppapiclient/vppapiclient_adapter.go
index 7aafa55..e62bccd 100644
--- a/adapter/vppapiclient/vppapiclient_adapter.go
+++ b/adapter/vppapiclient/vppapiclient_adapter.go
@@ -28,7 +28,7 @@ package vppapiclient
#include <arpa/inet.h>
#include <vpp-api/client/vppapiclient.h>
-extern void go_msg_callback(uint16_t msg_id, uint32_t context, void* data, size_t size);
+extern void go_msg_callback(uint16_t msg_id, void* data, size_t size);
typedef struct __attribute__((__packed__)) _req_header {
uint16_t msg_id;
@@ -38,14 +38,13 @@ typedef struct __attribute__((__packed__)) _req_header {
typedef struct __attribute__((__packed__)) _reply_header {
uint16_t msg_id;
- uint32_t context; // currently not all reply messages contain context field
} reply_header_t;
static void
govpp_msg_callback (unsigned char *data, int size)
{
reply_header_t *header = ((reply_header_t *)data);
- go_msg_callback(ntohs(header->msg_id), ntohl(header->context), data, size);
+ go_msg_callback(ntohs(header->msg_id), data, size);
}
static int
@@ -204,10 +203,10 @@ func fileExists(name string) bool {
}
//export go_msg_callback
-func go_msg_callback(msgID C.uint16_t, context C.uint32_t, data unsafe.Pointer, size C.size_t) {
+func go_msg_callback(msgID C.uint16_t, data unsafe.Pointer, size C.size_t) {
// convert unsafe.Pointer to byte slice
slice := &reflect.SliceHeader{Data: uintptr(data), Len: int(size), Cap: int(size)}
byteArr := *(*[]byte)(unsafe.Pointer(slice))
- vppClient.callback(uint16(msgID), uint32(context), byteArr)
+ vppClient.callback(uint16(msgID), byteArr)
}
diff --git a/api/api.go b/api/api.go
index 39fe60f..9b7f0ff 100644
--- a/api/api.go
+++ b/api/api.go
@@ -58,21 +58,6 @@ type DataType interface {
GetCrcString() string
}
-// MessageDecoder provides functionality for decoding binary data to generated API messages.
-type MessageDecoder interface {
- // DecodeMsg decodes binary-encoded data of a message into provided Message structure.
- DecodeMsg(data []byte, msg Message) error
-}
-
-// MessageIdentifier provides identification of generated API messages.
-type MessageIdentifier interface {
- // GetMessageID returns message identifier of given API message.
- GetMessageID(msg Message) (uint16, error)
-
- // LookupByID looks up message name and crc by ID
- LookupByID(msgID uint16) (Message, error)
-}
-
// ChannelProvider provides the communication channel with govpp core.
type ChannelProvider interface {
// NewAPIChannel returns a new channel for communication with VPP via govpp core.
@@ -86,9 +71,6 @@ type ChannelProvider interface {
// Channel provides methods for direct communication with VPP channel.
type Channel interface {
- // GetID returns channel's ID
- GetID() uint16
-
// SendRequest asynchronously sends a request to VPP. Returns a request context, that can be used to call ReceiveReply.
// In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply).
SendRequest(msg Message) RequestCtx
@@ -101,10 +83,7 @@ type Channel interface {
// SubscribeNotification subscribes for receiving of the specified notification messages via provided Go channel.
// Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's
// buffer is full, the notifications will not be delivered into it.
- SubscribeNotification(notifChan chan Message, msgFactory func() Message) (*NotifSubscription, error)
-
- // UnsubscribeNotification unsubscribes from receiving the notifications tied to the provided notification subscription.
- UnsubscribeNotification(subscription *NotifSubscription) error
+ SubscribeNotification(notifChan chan Message, event Message) (SubscriptionCtx, error)
// SetReplyTimeout sets the timeout for replies from VPP. It represents the maximum time the API waits for a reply
// from VPP before returning an error.
@@ -114,14 +93,14 @@ type Channel interface {
Close()
}
-// RequestCtx is helper interface which allows to receive reply on request context data
+// RequestCtx is helper interface which allows to receive reply on request.
type RequestCtx interface {
// ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs).
// The reply will be decoded into the msg argument. Error will be returned if the response cannot be received or decoded.
ReceiveReply(msg Message) error
}
-// MultiRequestCtx is helper interface which allows to receive reply on multi-request context data
+// MultiRequestCtx is helper interface which allows to receive reply on multi-request.
type MultiRequestCtx interface {
// ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs).
// The reply will be decoded into the msg argument. If the last reply has been already consumed, lastReplyReceived is
@@ -130,13 +109,13 @@ type MultiRequestCtx interface {
ReceiveReply(msg Message) (lastReplyReceived bool, err error)
}
-// NotifSubscription represents a subscription for delivery of specific notification messages.
-type NotifSubscription struct {
- NotifChan chan Message // channel where notification messages will be delivered to
- MsgFactory func() Message // function that returns a new instance of the specific message that is expected as a notification
- // TODO: use Message directly here, not a factory, eliminating need to allocation
+// SubscriptionCtx is helper interface which allows to control subscription for notification events.
+type SubscriptionCtx interface {
+ // Unsubscribe unsubscribes from receiving the notifications tied to the subscription context.
+ Unsubscribe() error
}
+// map of registered messages
var registeredMessages = make(map[string]Message)
// RegisterMessage is called from generated code to register message.
diff --git a/cmd/binapi-generator/generate.go b/cmd/binapi-generator/generate.go
index 33ab614..73bcd2a 100644
--- a/cmd/binapi-generator/generate.go
+++ b/cmd/binapi-generator/generate.go
@@ -140,9 +140,9 @@ func generatePackage(ctx *context, w *bufio.Writer) error {
if len(ctx.packageData.Services) > 0 {
fmt.Fprintf(w, "/* Services */\n\n")
- fmt.Fprintf(w, "type %s interface {\n", "Services")
ctx.inputBuff = bytes.NewBuffer(ctx.inputData)
ctx.inputLine = 0
+ fmt.Fprintf(w, "type %s interface {\n", "Services")
for _, svc := range ctx.packageData.Services {
generateService(ctx, w, &svc)
}
@@ -209,6 +209,7 @@ func generateImports(ctx *context, w io.Writer) {
fmt.Fprintln(w)
fmt.Fprintf(w, "// Reference imports to suppress errors if they are not otherwise used.\n")
+ fmt.Fprintf(w, "var _ = api.RegisterMessage\n")
fmt.Fprintf(w, "var _ = struc.Pack\n")
fmt.Fprintf(w, "var _ = bytes.NewBuffer\n")
fmt.Fprintln(w)
diff --git a/codec/msg_codec.go b/codec/msg_codec.go
index 572e672..9d3f614 100644
--- a/codec/msg_codec.go
+++ b/codec/msg_codec.go
@@ -58,15 +58,19 @@ func (*MsgCodec) EncodeMsg(msg api.Message, msgID uint16) ([]byte, error) {
return nil, errors.New("nil message passed in")
}
- // encode message header
var header interface{}
+
+ // encode message header
switch msg.GetMessageType() {
case api.RequestMessage:
header = &VppRequestHeader{VlMsgID: msgID}
+
case api.ReplyMessage:
header = &VppReplyHeader{VlMsgID: msgID}
+
case api.EventMessage:
header = &VppEventHeader{VlMsgID: msgID}
+
default:
header = &VppOtherHeader{VlMsgID: msgID}
}
@@ -94,15 +98,19 @@ func (*MsgCodec) DecodeMsg(data []byte, msg api.Message) error {
return errors.New("nil message passed in")
}
- // check which header is expected
var header interface{}
+
+ // check which header is expected
switch msg.GetMessageType() {
case api.RequestMessage:
header = new(VppRequestHeader)
+
case api.ReplyMessage:
header = new(VppReplyHeader)
+
case api.EventMessage:
header = new(VppEventHeader)
+
default:
header = new(VppOtherHeader)
}
@@ -127,17 +135,19 @@ func (*MsgCodec) DecodeMsgContext(data []byte, msg api.Message) (uint32, error)
return 0, errors.New("nil message passed in")
}
+ var header interface{}
var getContext func() uint32
// check which header is expected
- var header interface{}
switch msg.GetMessageType() {
case api.RequestMessage:
header = new(VppRequestHeader)
getContext = func() uint32 { return header.(*VppRequestHeader).Context }
+
case api.ReplyMessage:
header = new(VppReplyHeader)
getContext = func() uint32 { return header.(*VppReplyHeader).Context }
+
default:
return 0, nil
}
diff --git a/core/channel.go b/core/channel.go
index 718f89c..a7d95fe 100644
--- a/core/channel.go
+++ b/core/channel.go
@@ -29,40 +29,20 @@ var (
ErrInvalidRequestCtx = errors.New("invalid request context")
)
-// requestCtx is a context for request with single reply
-type requestCtx struct {
- ch *channel
- seqNum uint16
-}
-
-// multiRequestCtx is a context for request with multiple responses
-type multiRequestCtx struct {
- ch *channel
- seqNum uint16
-}
-
-func (req *requestCtx) ReceiveReply(msg api.Message) error {
- if req == nil || req.ch == nil {
- return ErrInvalidRequestCtx
- }
-
- lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum)
- if err != nil {
- return err
- }
- if lastReplyReceived {
- return errors.New("multipart reply recieved while a single reply expected")
- }
-
- return nil
+// MessageCodec provides functionality for decoding binary data to generated API messages.
+type MessageCodec interface {
+ //EncodeMsg encodes message into binary data.
+ EncodeMsg(msg api.Message, msgID uint16) ([]byte, error)
+ // DecodeMsg decodes binary-encoded data of a message into provided Message structure.
+ DecodeMsg(data []byte, msg api.Message) error
}
-func (req *multiRequestCtx) ReceiveReply(msg api.Message) (lastReplyReceived bool, err error) {
- if req == nil || req.ch == nil {
- return false, ErrInvalidRequestCtx
- }
-
- return req.ch.receiveReplyInternal(msg, req.seqNum)
+// MessageIdentifier provides identification of generated API messages.
+type MessageIdentifier interface {
+ // GetMessageID returns message identifier of given API message.
+ GetMessageID(msg api.Message) (uint16, error)
+ // LookupByID looks up message name and crc by ID
+ LookupByID(msgID uint16) (api.Message, error)
}
// vppRequest is a request that will be sent to VPP.
@@ -81,27 +61,40 @@ type vppReply struct {
err error // in case of error, data is nil and this member contains error
}
-// NotifSubscribeRequest is a request to subscribe for delivery of specific notification messages.
-type subscriptionRequest struct {
- sub *api.NotifSubscription // subscription details
- subscribe bool // true if this is a request to subscribe
+// requestCtx is a context for request with single reply
+type requestCtx struct {
+ ch *Channel
+ seqNum uint16
+}
+
+// multiRequestCtx is a context for request with multiple responses
+type multiRequestCtx struct {
+ ch *Channel
+ seqNum uint16
+}
+
+// subscriptionCtx is a context of subscription for delivery of specific notification messages.
+type subscriptionCtx struct {
+ ch *Channel
+ notifChan chan api.Message // channel where notification messages will be delivered to
+ msgID uint16 // message ID for the subscribed event message
+ event api.Message // event message that this subscription is for
+ msgFactory func() api.Message // function that returns a new instance of the specific message that is expected as a notification
}
// channel is the main communication interface with govpp core. It contains four Go channels, one for sending the requests
// to VPP, one for receiving the replies from it and the same set for notifications. The user can access the Go channels
// via methods provided by Channel interface in this package. Do not use the same channel from multiple goroutines
// concurrently, otherwise the responses could mix! Use multiple channels instead.
-type channel struct {
- id uint16 // channel ID
+type Channel struct {
+ id uint16
+ conn *Connection
reqChan chan *vppRequest // channel for sending the requests to VPP
replyChan chan *vppReply // channel where VPP replies are delivered to
- notifSubsChan chan *subscriptionRequest // channel for sending notification subscribe requests
- notifSubsReplyChan chan error // channel where replies to notification subscribe requests are delivered to
-
- msgDecoder api.MessageDecoder // used to decode binary data to generated API messages
- msgIdentifier api.MessageIdentifier // used to retrieve message ID of a message
+ msgCodec MessageCodec // used to decode binary data to generated API messages
+ msgIdentifier MessageIdentifier // used to retrieve message ID of a message
lastSeqNum uint16 // sequence number of the last sent request
@@ -109,73 +102,142 @@ type channel struct {
replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout
}
-func (ch *channel) GetID() uint16 {
+func newChannel(id uint16, conn *Connection, codec MessageCodec, identifier MessageIdentifier, reqSize, replySize int) *Channel {
+ return &Channel{
+ id: id,
+ conn: conn,
+ msgCodec: codec,
+ msgIdentifier: identifier,
+ reqChan: make(chan *vppRequest, reqSize),
+ replyChan: make(chan *vppReply, replySize),
+ replyTimeout: DefaultReplyTimeout,
+ }
+}
+
+func (ch *Channel) GetID() uint16 {
return ch.id
}
-func (ch *channel) nextSeqNum() uint16 {
+func (ch *Channel) nextSeqNum() uint16 {
ch.lastSeqNum++
return ch.lastSeqNum
}
-func (ch *channel) SendRequest(msg api.Message) api.RequestCtx {
- req := &vppRequest{
+func (ch *Channel) SendRequest(msg api.Message) api.RequestCtx {
+ seqNum := ch.nextSeqNum()
+ ch.reqChan <- &vppRequest{
msg: msg,
- seqNum: ch.nextSeqNum(),
+ seqNum: seqNum,
}
- ch.reqChan <- req
- return &requestCtx{ch: ch, seqNum: req.seqNum}
+ return &requestCtx{ch: ch, seqNum: seqNum}
}
-func (ch *channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx {
- req := &vppRequest{
+func (ch *Channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx {
+ seqNum := ch.nextSeqNum()
+ ch.reqChan <- &vppRequest{
msg: msg,
- seqNum: ch.nextSeqNum(),
+ seqNum: seqNum,
multi: true,
}
- ch.reqChan <- req
- return &multiRequestCtx{ch: ch, seqNum: req.seqNum}
+ return &multiRequestCtx{ch: ch, seqNum: seqNum}
}
-func (ch *channel) SubscribeNotification(notifChan chan api.Message, msgFactory func() api.Message) (*api.NotifSubscription, error) {
- sub := &api.NotifSubscription{
- NotifChan: notifChan,
- MsgFactory: msgFactory,
+func getMsgFactory(msg api.Message) func() api.Message {
+ return func() api.Message {
+ return reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
}
- // TODO: get rid of notifSubsChan and notfSubsReplyChan,
- // it's no longer need because we know all message IDs and can store subscription right here
- ch.notifSubsChan <- &subscriptionRequest{
- sub: sub,
- subscribe: true,
- }
- return sub, <-ch.notifSubsReplyChan
}
-func (ch *channel) UnsubscribeNotification(subscription *api.NotifSubscription) error {
- ch.notifSubsChan <- &subscriptionRequest{
- sub: subscription,
- subscribe: false,
+func (ch *Channel) SubscribeNotification(notifChan chan api.Message, event api.Message) (api.SubscriptionCtx, error) {
+ msgID, err := ch.msgIdentifier.GetMessageID(event)
+ if err != nil {
+ log.WithFields(logrus.Fields{
+ "msg_name": event.GetMessageName(),
+ "msg_crc": event.GetCrcString(),
+ }).Errorf("unable to retrieve message ID: %v", err)
+ return nil, fmt.Errorf("unable to retrieve event message ID: %v", err)
+ }
+
+ sub := &subscriptionCtx{
+ ch: ch,
+ notifChan: notifChan,
+ msgID: msgID,
+ event: event,
+ msgFactory: getMsgFactory(event),
}
- return <-ch.notifSubsReplyChan
+
+ // add the subscription into map
+ ch.conn.subscriptionsLock.Lock()
+ defer ch.conn.subscriptionsLock.Unlock()
+
+ ch.conn.subscriptions[msgID] = append(ch.conn.subscriptions[msgID], sub)
+
+ return sub, nil
}
-func (ch *channel) SetReplyTimeout(timeout time.Duration) {
+func (ch *Channel) SetReplyTimeout(timeout time.Duration) {
ch.replyTimeout = timeout
}
-func (ch *channel) Close() {
+func (ch *Channel) Close() {
if ch.reqChan != nil {
close(ch.reqChan)
+ ch.reqChan = nil
+ }
+}
+
+func (req *requestCtx) ReceiveReply(msg api.Message) error {
+ if req == nil || req.ch == nil {
+ return ErrInvalidRequestCtx
+ }
+
+ lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum)
+ if err != nil {
+ return err
+ } else if lastReplyReceived {
+ return errors.New("multipart reply recieved while a single reply expected")
+ }
+
+ return nil
+}
+
+func (req *multiRequestCtx) ReceiveReply(msg api.Message) (lastReplyReceived bool, err error) {
+ if req == nil || req.ch == nil {
+ return false, ErrInvalidRequestCtx
}
+
+ return req.ch.receiveReplyInternal(msg, req.seqNum)
+}
+
+func (sub *subscriptionCtx) Unsubscribe() error {
+ log.WithFields(logrus.Fields{
+ "msg_name": sub.event.GetMessageName(),
+ "msg_id": sub.msgID,
+ }).Debug("Removing notification subscription.")
+
+ // remove the subscription from the map
+ sub.ch.conn.subscriptionsLock.Lock()
+ defer sub.ch.conn.subscriptionsLock.Unlock()
+
+ for i, item := range sub.ch.conn.subscriptions[sub.msgID] {
+ if item == sub {
+ // remove i-th item in the slice
+ sub.ch.conn.subscriptions[sub.msgID] = append(sub.ch.conn.subscriptions[sub.msgID][:i], sub.ch.conn.subscriptions[sub.msgID][i+1:]...)
+ return nil
+ }
+ }
+
+ return fmt.Errorf("subscription for %q not found", sub.event.GetMessageName())
}
// receiveReplyInternal receives a reply from the reply channel into the provided msg structure.
-func (ch *channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (lastReplyReceived bool, err error) {
- var ignore bool
+func (ch *Channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (lastReplyReceived bool, err error) {
if msg == nil {
return false, errors.New("nil message passed in")
}
+ var ignore bool
+
if vppReply := ch.delayedReply; vppReply != nil {
// try the delayed reply
ch.delayedReply = nil
@@ -204,12 +266,12 @@ func (ch *channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (last
return
}
-func (ch *channel) processReply(reply *vppReply, expSeqNum uint16, msg api.Message) (ignore bool, lastReplyReceived bool, err error) {
+func (ch *Channel) processReply(reply *vppReply, expSeqNum uint16, msg api.Message) (ignore bool, lastReplyReceived bool, err error) {
// check the sequence number
cmpSeqNums := compareSeqNumbers(reply.seqNum, expSeqNum)
if cmpSeqNums == -1 {
// reply received too late, ignore the message
- logrus.WithField("sequence-number", reply.seqNum).Warn(
+ logrus.WithField("seqNum", reply.seqNum).Warn(
"Received reply to an already closed binary API request")
ignore = true
return
@@ -253,7 +315,7 @@ func (ch *channel) processReply(reply *vppReply, expSeqNum uint16, msg api.Messa
}
// decode the message
- if err = ch.msgDecoder.DecodeMsg(reply.data, msg); err != nil {
+ if err = ch.msgCodec.DecodeMsg(reply.data, msg); err != nil {
return
}
diff --git a/core/channel_test.go b/core/channel_test.go
index 4a9ab2b..0eafa32 100644
--- a/core/channel_test.go
+++ b/core/channel_test.go
@@ -22,6 +22,7 @@ import (
"git.fd.io/govpp.git/examples/bin_api/interfaces"
"git.fd.io/govpp.git/examples/bin_api/memif"
"git.fd.io/govpp.git/examples/bin_api/tap"
+ "git.fd.io/govpp.git/examples/binapi/vpe"
"git.fd.io/govpp.git/api"
. "github.com/onsi/gomega"
@@ -59,10 +60,11 @@ func TestRequestReplyTapConnect(t *testing.T) {
ctx := setupTest(t)
defer ctx.teardownTest()
+ // mock reply
ctx.mockVpp.MockReply(&tap.TapConnectReply{
- Retval: 0,
SwIfIndex: 1,
})
+
request := &tap.TapConnect{
TapName: []byte("test-tap-name"),
UseRandomMac: 1,
@@ -71,17 +73,21 @@ func TestRequestReplyTapConnect(t *testing.T) {
err := ctx.ch.SendRequest(request).ReceiveReply(reply)
Expect(err).ShouldNot(HaveOccurred())
- Expect(reply.Retval).To(BeEquivalentTo(0), "Incorrect retval value for TapConnectReply")
- Expect(reply.SwIfIndex).To(BeEquivalentTo(1), "Incorrect SwIfIndex value for TapConnectReply")
+ Expect(reply.Retval).To(BeEquivalentTo(0),
+ "Incorrect Retval value for TapConnectReply")
+ Expect(reply.SwIfIndex).To(BeEquivalentTo(1),
+ "Incorrect SwIfIndex value for TapConnectReply")
}
func TestRequestReplyTapModify(t *testing.T) {
ctx := setupTest(t)
defer ctx.teardownTest()
+ // mock reply
ctx.mockVpp.MockReply(&tap.TapModifyReply{
SwIfIndex: 2,
})
+
request := &tap.TapModify{
TapName: []byte("test-tap-modify"),
UseRandomMac: 1,
@@ -91,15 +97,19 @@ func TestRequestReplyTapModify(t *testing.T) {
err := ctx.ch.SendRequest(request).ReceiveReply(reply)
Expect(err).ShouldNot(HaveOccurred())
- Expect(reply.Retval).To(BeEquivalentTo(0), "Incorrect retval value for TapModifyReply")
- Expect(reply.SwIfIndex).To(BeEquivalentTo(2), "Incorrect SwIfIndex value for TapModifyReply")
+ Expect(reply.Retval).To(BeEquivalentTo(0),
+ "Incorrect Retval value for TapModifyReply")
+ Expect(reply.SwIfIndex).To(BeEquivalentTo(2),
+ "Incorrect SwIfIndex value for TapModifyReply")
}
func TestRequestReplyTapDelete(t *testing.T) {
ctx := setupTest(t)
defer ctx.teardownTest()
+ // mock reply
ctx.mockVpp.MockReply(&tap.TapDeleteReply{})
+
request := &tap.TapDelete{
SwIfIndex: 3,
}
@@ -107,34 +117,41 @@ func TestRequestReplyTapDelete(t *testing.T) {
err := ctx.ch.SendRequest(request).ReceiveReply(reply)
Expect(err).ShouldNot(HaveOccurred())
- Expect(reply.Retval).To(BeEquivalentTo(0), "Incorrect retval value for TapDeleteReply")
+ Expect(reply.Retval).To(BeEquivalentTo(0),
+ "Incorrect Retval value for TapDeleteReply")
}
func TestRequestReplySwInterfaceTapDump(t *testing.T) {
ctx := setupTest(t)
defer ctx.teardownTest()
+ // mock reply
byteName := []byte("dev-name-test")
ctx.mockVpp.MockReply(&tap.SwInterfaceTapDetails{
SwIfIndex: 25,
DevName: byteName,
})
+
request := &tap.SwInterfaceTapDump{}
reply := &tap.SwInterfaceTapDetails{}
err := ctx.ch.SendRequest(request).ReceiveReply(reply)
Expect(err).ShouldNot(HaveOccurred())
- Expect(reply.SwIfIndex).To(BeEquivalentTo(25), "Incorrect SwIfIndex value for SwInterfaceTapDetails")
- Expect(reply.DevName).ToNot(BeNil(), "Incorrect DevName value for SwInterfaceTapDetails")
+ Expect(reply.SwIfIndex).To(BeEquivalentTo(25),
+ "Incorrect SwIfIndex value for SwInterfaceTapDetails")
+ Expect(reply.DevName).ToNot(BeEquivalentTo(byteName),
+ "Incorrect DevName value for SwInterfaceTapDetails")
}
func TestRequestReplyMemifCreate(t *testing.T) {
ctx := setupTest(t)
defer ctx.teardownTest()
+ // mock reply
ctx.mockVpp.MockReply(&memif.MemifCreateReply{
SwIfIndex: 4,
})
+
request := &memif.MemifCreate{
Role: 10,
ID: 12,
@@ -145,15 +162,19 @@ func TestRequestReplyMemifCreate(t *testing.T) {
err := ctx.ch.SendRequest(request).ReceiveReply(reply)
Expect(err).ShouldNot(HaveOccurred())
- Expect(reply.Retval).To(BeEquivalentTo(0), "Incorrect Retval value for MemifCreate")
- Expect(reply.SwIfIndex).To(BeEquivalentTo(4), "Incorrect SwIfIndex value for MemifCreate")
+ Expect(reply.Retval).To(BeEquivalentTo(0),
+ "Incorrect Retval value for MemifCreate")
+ Expect(reply.SwIfIndex).To(BeEquivalentTo(4),
+ "Incorrect SwIfIndex value for MemifCreate")
}
func TestRequestReplyMemifDelete(t *testing.T) {
ctx := setupTest(t)
defer ctx.teardownTest()
+ // mock reply
ctx.mockVpp.MockReply(&memif.MemifDeleteReply{})
+
request := &memif.MemifDelete{
SwIfIndex: 15,
}
@@ -161,26 +182,30 @@ func TestRequestReplyMemifDelete(t *testing.T) {
err := ctx.ch.SendRequest(request).ReceiveReply(reply)
Expect(err).ShouldNot(HaveOccurred())
- Expect(reply.Retval).To(BeEquivalentTo(0), "Incorrect Retval value for MemifDelete")
}
func TestRequestReplyMemifDetails(t *testing.T) {
ctx := setupTest(t)
defer ctx.teardownTest()
+ // mock reply
ctx.mockVpp.MockReply(&memif.MemifDetails{
SwIfIndex: 25,
IfName: []byte("memif-name"),
Role: 0,
})
+
request := &memif.MemifDump{}
reply := &memif.MemifDetails{}
err := ctx.ch.SendRequest(request).ReceiveReply(reply)
Expect(err).ShouldNot(HaveOccurred())
- Expect(reply.SwIfIndex).To(BeEquivalentTo(25), "Incorrect SwIfIndex value for MemifDetails")
- Expect(reply.IfName).ToNot(BeEmpty(), "MemifDetails IfName is empty byte array")
- Expect(reply.Role).To(BeEquivalentTo(0), "Incorrect Role value for MemifDetails")
+ Expect(reply.SwIfIndex).To(BeEquivalentTo(25),
+ "Incorrect SwIfIndex value for MemifDetails")
+ Expect(reply.IfName).ToNot(BeEmpty(),
+ "MemifDetails IfName is empty byte array")
+ Expect(reply.Role).To(BeEquivalentTo(0),
+ "Incorrect Role value for MemifDetails")
}
func TestMultiRequestReplySwInterfaceTapDump(t *testing.T) {
@@ -204,7 +229,7 @@ func TestMultiRequestReplySwInterfaceTapDump(t *testing.T) {
msg := &tap.SwInterfaceTapDetails{}
stop, err := reqCtx.ReceiveReply(msg)
if stop {
- break // break out of the loop
+ break
}
Expect(err).ShouldNot(HaveOccurred())
cnt++
@@ -232,7 +257,7 @@ func TestMultiRequestReplySwInterfaceMemifDump(t *testing.T) {
msg := &memif.MemifDetails{}
stop, err := reqCtx.ReceiveReply(msg)
if stop {
- break // break out of the loop
+ break
}
Expect(err).ShouldNot(HaveOccurred())
cnt++
@@ -240,51 +265,16 @@ func TestMultiRequestReplySwInterfaceMemifDump(t *testing.T) {
Expect(cnt).To(BeEquivalentTo(10))
}
-func TestNotifications(t *testing.T) {
- ctx := setupTest(t)
- defer ctx.teardownTest()
-
- // subscribe for notification
- notifChan := make(chan api.Message, 1)
- subs, err := ctx.ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceSetFlags)
- Expect(err).ShouldNot(HaveOccurred())
-
- // mock the notification and force its delivery
- ctx.mockVpp.MockReply(&interfaces.SwInterfaceSetFlags{
- SwIfIndex: 3,
- AdminUpDown: 1,
- })
- ctx.mockVpp.SendMsg(0, []byte(""))
-
- // receive the notification
- var notif *interfaces.SwInterfaceSetFlags
- Eventually(func() *interfaces.SwInterfaceSetFlags {
- select {
- case n := <-notifChan:
- notif = n.(*interfaces.SwInterfaceSetFlags)
- return notif
- default:
- return nil
- }
- }).ShouldNot(BeNil())
-
- // verify the received notifications
- Expect(notif.SwIfIndex).To(BeEquivalentTo(3), "Incorrect SwIfIndex value for SwInterfaceSetFlags")
- Expect(notif.AdminUpDown).To(BeEquivalentTo(1), "Incorrect AdminUpDown value for SwInterfaceSetFlags")
-
- ctx.ch.UnsubscribeNotification(subs)
-}
-
func TestNotificationEvent(t *testing.T) {
ctx := setupTest(t)
defer ctx.teardownTest()
// subscribe for notification
notifChan := make(chan api.Message, 1)
- subs, err := ctx.ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceEvent)
+ sub, err := ctx.ch.SubscribeNotification(notifChan, &interfaces.SwInterfaceEvent{})
Expect(err).ShouldNot(HaveOccurred())
- // mock the notification and force its delivery
+ // mock event and force its delivery
ctx.mockVpp.MockReply(&interfaces.SwInterfaceEvent{
SwIfIndex: 2,
LinkUpDown: 1,
@@ -307,16 +297,9 @@ func TestNotificationEvent(t *testing.T) {
Expect(notif.SwIfIndex).To(BeEquivalentTo(2), "Incorrect SwIfIndex value for SwInterfaceSetFlags")
Expect(notif.LinkUpDown).To(BeEquivalentTo(1), "Incorrect LinkUpDown value for SwInterfaceSetFlags")
- ctx.ch.UnsubscribeNotification(subs)
-}
-
-/*func TestCheckMessageCompatibility(t *testing.T) {
- ctx := setupTest(t)
- defer ctx.teardownTest()
-
- err := ctx.ch.CheckMessageCompatibility(&interfaces.SwInterfaceSetFlags{})
+ err = sub.Unsubscribe()
Expect(err).ShouldNot(HaveOccurred())
-}*/
+}
func TestSetReplyTimeout(t *testing.T) {
ctx := setupTest(t)
@@ -324,8 +307,10 @@ func TestSetReplyTimeout(t *testing.T) {
ctx.ch.SetReplyTimeout(time.Millisecond)
- // first one request should work
+ // mock reply
ctx.mockVpp.MockReply(&ControlPingReply{})
+
+ // first one request should work
err := ctx.ch.SendRequest(&ControlPing{}).ReceiveReply(&ControlPingReply{})
Expect(err).ShouldNot(HaveOccurred())
@@ -339,16 +324,23 @@ func TestSetReplyTimeoutMultiRequest(t *testing.T) {
ctx := setupTest(t)
defer ctx.teardownTest()
- ctx.ch.SetReplyTimeout(time.Millisecond)
+ ctx.ch.SetReplyTimeout(time.Millisecond * 100)
- var msgs []api.Message
- for i := 1; i <= 3; i++ {
- msgs = append(msgs, &interfaces.SwInterfaceDetails{
- SwIfIndex: uint32(i),
+ // mock reply
+ ctx.mockVpp.MockReply(
+ &interfaces.SwInterfaceDetails{
+ SwIfIndex: 1,
InterfaceName: []byte("if-name-test"),
- })
- }
- ctx.mockVpp.MockReply(msgs...)
+ },
+ &interfaces.SwInterfaceDetails{
+ SwIfIndex: 2,
+ InterfaceName: []byte("if-name-test"),
+ },
+ &interfaces.SwInterfaceDetails{
+ SwIfIndex: 3,
+ InterfaceName: []byte("if-name-test"),
+ },
+ )
ctx.mockVpp.MockReply(&ControlPingReply{})
cnt := 0
@@ -357,12 +349,12 @@ func TestSetReplyTimeoutMultiRequest(t *testing.T) {
for {
msg := &interfaces.SwInterfaceDetails{}
stop, err := reqCtx.ReceiveReply(msg)
- if stop {
- break // break out of the loop
- }
if err != nil {
return err
}
+ if stop {
+ break
+ }
cnt++
}
return nil
@@ -443,7 +435,7 @@ func TestMultiRequestDouble(t *testing.T) {
msg := &interfaces.SwInterfaceDetails{}
stop, err := reqCtx.ReceiveReply(msg)
if stop {
- break // break out of the loop
+ break
}
if err != nil {
return err
@@ -468,8 +460,10 @@ func TestReceiveReplyAfterTimeout(t *testing.T) {
ctx.ch.SetReplyTimeout(time.Millisecond)
- // first one request should work
+ // mock reply
ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{Msg: &ControlPingReply{}, SeqNum: 1})
+ // first one request should work
+
err := ctx.ch.SendRequest(&ControlPing{}).ReceiveReply(&ControlPingReply{})
Expect(err).ShouldNot(HaveOccurred())
@@ -479,9 +473,16 @@ func TestReceiveReplyAfterTimeout(t *testing.T) {
ctx.mockVpp.MockReplyWithContext(
// simulating late reply
- mock.MsgWithContext{Msg: &ControlPingReply{}, SeqNum: 2},
+ mock.MsgWithContext{
+ Msg: &ControlPingReply{},
+ SeqNum: 2,
+ },
// normal reply for next request
- mock.MsgWithContext{Msg: &tap.TapConnectReply{}, SeqNum: 3})
+ mock.MsgWithContext{
+ Msg: &tap.TapConnectReply{},
+ SeqNum: 3,
+ },
+ )
req := &tap.TapConnect{
TapName: []byte("test-tap-name"),
@@ -508,8 +509,10 @@ func TestReceiveReplyAfterTimeoutMultiRequest(t *testing.T) {
ctx.ch.SetReplyTimeout(time.Millisecond * 100)
- // first one request should work
+ // mock reply
ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{Msg: &ControlPingReply{}, SeqNum: 1})
+
+ // first one request should work
err := ctx.ch.SendRequest(&ControlPing{}).ReceiveReply(&ControlPingReply{})
Expect(err).ShouldNot(HaveOccurred())
@@ -520,7 +523,7 @@ func TestReceiveReplyAfterTimeoutMultiRequest(t *testing.T) {
msg := &interfaces.SwInterfaceDetails{}
stop, err := reqCtx.ReceiveReply(msg)
if stop {
- break // break out of the loop
+ break
}
if err != nil {
return err
@@ -564,18 +567,20 @@ func TestReceiveReplyAfterTimeoutMultiRequest(t *testing.T) {
Expect(err).ShouldNot(HaveOccurred())
}
-/*func TestInvalidMessageID(t *testing.T) {
+func TestInvalidMessageID(t *testing.T) {
ctx := setupTest(t)
defer ctx.teardownTest()
- // first one request should work
+ // mock reply
+ ctx.mockVpp.MockReply(&vpe.ShowVersionReply{})
ctx.mockVpp.MockReply(&vpe.ShowVersionReply{})
+
+ // first one request should work
err := ctx.ch.SendRequest(&vpe.ShowVersion{}).ReceiveReply(&vpe.ShowVersionReply{})
Expect(err).ShouldNot(HaveOccurred())
// second should fail with error invalid message ID
- ctx.mockVpp.MockReply(&vpe.ShowVersionReply{})
err = ctx.ch.SendRequest(&ControlPing{}).ReceiveReply(&ControlPingReply{})
Expect(err).Should(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("invalid message ID"))
-}*/
+}
diff --git a/core/connection.go b/core/connection.go
index c77358f..7d014ce 100644
--- a/core/connection.go
+++ b/core/connection.go
@@ -29,42 +29,19 @@ import (
"git.fd.io/govpp.git/codec"
)
-const (
- requestChannelBufSize = 100 // default size of the request channel buffer
- replyChannelBufSize = 100 // default size of the reply channel buffer
- notificationChannelBufSize = 100 // default size of the notification channel buffer
-
- defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout
+var (
+ RequestChanBufSize = 100 // default size of the request channel buffer
+ ReplyChanBufSize = 100 // default size of the reply channel buffer
+ NotificationChanBufSize = 100 // default size of the notification channel buffer
)
var (
- healthCheckInterval = time.Second * 1 // default health check interval
- healthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check
- healthCheckThreshold = 1 // number of failed health checks until the error is reported
+ HealthCheckProbeInterval = time.Second * 1 // default health check probe interval
+ HealthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe
+ HealthCheckThreshold = 1 // number of failed health checks until the error is reported
+ DefaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP
)
-// SetHealthCheckProbeInterval sets health check probe interval.
-// Beware: Function is not thread-safe. It is recommended to setup this parameter
-// before connecting to vpp.
-func SetHealthCheckProbeInterval(interval time.Duration) {
- healthCheckInterval = interval
-}
-
-// SetHealthCheckReplyTimeout sets timeout for reply to a health check probe.
-// If reply arrives after the timeout, check is considered as failed.
-// Beware: Function is not thread-safe. It is recommended to setup this parameter
-// before connecting to vpp.
-func SetHealthCheckReplyTimeout(timeout time.Duration) {
- healthCheckReplyTimeout = timeout
-}
-
-// SetHealthCheckThreshold sets the number of failed healthProbe checks until the error is reported.
-// Beware: Function is not thread-safe. It is recommended to setup this parameter
-// before connecting to vpp.
-func SetHealthCheckThreshold(threshold int) {
- healthCheckThreshold = threshold
-}
-
// ConnectionState represents the current state of the connection to VPP.
type ConnectionState int
@@ -104,10 +81,10 @@ type Connection struct {
maxChannelID uint32 // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations)
channelsLock sync.RWMutex // lock for the channels map
- channels map[uint16]*channel // map of all API channels indexed by the channel ID
+ channels map[uint16]*Channel // map of all API channels indexed by the channel ID
- notifSubscriptionsLock sync.RWMutex // lock for the subscriptions map
- notifSubscriptions map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
+ subscriptionsLock sync.RWMutex // lock for the subscriptions map
+ subscriptions map[uint16][]*subscriptionCtx // map od all notification subscriptions indexed by message ID
pingReqID uint16 // ID if the ControlPing message
pingReplyID uint16 // ID of the ControlPingReply message
@@ -116,18 +93,30 @@ type Connection struct {
lastReply time.Time // time of the last received reply from VPP
}
+func newConnection(vpp adapter.VppAdapter) *Connection {
+ c := &Connection{
+ vpp: vpp,
+ codec: &codec.MsgCodec{},
+ msgIDs: make(map[string]uint16),
+ msgMap: make(map[uint16]api.Message),
+ channels: make(map[uint16]*Channel),
+ subscriptions: make(map[uint16][]*subscriptionCtx),
+ }
+ vpp.SetMsgCallback(c.msgCallback)
+ return c
+}
+
// Connect connects to VPP using specified VPP adapter and returns the connection handle.
// This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
// create new connection handle
- c, err := newConnection(vppAdapter)
+ c, err := createConnection(vppAdapter)
if err != nil {
return nil, err
}
// blocking attempt to connect to VPP
- err = c.connectVPP()
- if err != nil {
+ if err := c.connectVPP(); err != nil {
return nil, err
}
@@ -140,13 +129,13 @@ func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
// Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) {
// create new connection handle
- c, err := newConnection(vppAdapter)
+ c, err := createConnection(vppAdapter)
if err != nil {
return nil, nil, err
}
// asynchronously attempt to connect to VPP
- connChan := make(chan ConnectionEvent, notificationChannelBufSize)
+ connChan := make(chan ConnectionEvent, NotificationChanBufSize)
go c.connectLoop(connChan)
return c, connChan, nil
@@ -168,7 +157,7 @@ func (c *Connection) Disconnect() {
}
// newConnection returns new connection handle.
-func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
+func createConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
connLock.Lock()
defer connLock.Unlock()
@@ -176,15 +165,7 @@ func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
return nil, errors.New("only one connection per process is supported")
}
- conn = &Connection{
- vpp: vppAdapter,
- codec: &codec.MsgCodec{},
- channels: make(map[uint16]*channel),
- msgIDs: make(map[string]uint16),
- msgMap: make(map[uint16]api.Message),
- notifSubscriptions: make(map[uint16][]*api.NotifSubscription),
- }
- conn.vpp.SetMsgCallback(conn.msgCallback)
+ conn = newConnection(vppAdapter)
return conn, nil
}
@@ -211,8 +192,72 @@ func (c *Connection) connectVPP() error {
return nil
}
-func getMsgNameWithCrc(x api.Message) string {
- return x.GetMessageName() + "_" + x.GetCrcString()
+func (c *Connection) NewAPIChannel() (api.Channel, error) {
+ return c.newAPIChannel(RequestChanBufSize, ReplyChanBufSize)
+}
+
+func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
+ return c.newAPIChannel(reqChanBufSize, replyChanBufSize)
+}
+
+// NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
+// It allows to specify custom buffer sizes for the request and reply Go channels.
+func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*Channel, error) {
+ if c == nil {
+ return nil, errors.New("nil connection passed in")
+ }
+
+ // create new channel
+ chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
+ channel := newChannel(chID, c, c.codec, c, reqChanBufSize, replyChanBufSize)
+
+ // store API channel within the client
+ c.channelsLock.Lock()
+ c.channels[chID] = channel
+ c.channelsLock.Unlock()
+
+ // start watching on the request channel
+ go c.watchRequests(channel)
+
+ return channel, nil
+}
+
+// releaseAPIChannel releases API channel that needs to be closed.
+func (c *Connection) releaseAPIChannel(ch *Channel) {
+ log.WithFields(logger.Fields{
+ "channel": ch.id,
+ }).Debug("API channel released")
+
+ // delete the channel from channels map
+ c.channelsLock.Lock()
+ delete(c.channels, ch.id)
+ c.channelsLock.Unlock()
+}
+
+// GetMessageID returns message identifier of given API message.
+func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
+ if c == nil {
+ return 0, errors.New("nil connection passed in")
+ }
+
+ if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
+ return msgID, nil
+ }
+
+ return 0, fmt.Errorf("unknown message: %s (%s)", msg.GetMessageName(), msg.GetCrcString())
+}
+
+// LookupByID looks up message name and crc by ID.
+func (c *Connection) LookupByID(msgID uint16) (api.Message, error) {
+ if c == nil {
+ return nil, errors.New("nil connection passed in")
+ }
+
+ if msg, ok := c.msgMap[msgID]; ok {
+ return msg, nil
+ }
+
+ return nil, fmt.Errorf("unknown message ID: %d", msgID)
}
// retrieveMessageIDs retrieves IDs for all registered messages and stores them in map
@@ -268,32 +313,6 @@ func (c *Connection) retrieveMessageIDs() (err error) {
return nil
}
-// GetMessageID returns message identifier of given API message.
-func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
- if c == nil {
- return 0, errors.New("nil connection passed in")
- }
-
- if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
- return msgID, nil
- }
-
- return 0, fmt.Errorf("unknown message: %s (%s)", msg.GetMessageName(), msg.GetCrcString())
-}
-
-// LookupByID looks up message name and crc by ID.
-func (c *Connection) LookupByID(msgID uint16) (api.Message, error) {
- if c == nil {
- return nil, errors.New("nil connection passed in")
- }
-
- if msg, ok := c.msgMap[msgID]; ok {
- return msg, nil
- }
-
- return nil, fmt.Errorf("unknown message ID: %d", msgID)
-}
-
// disconnectVPP disconnects from VPP in case it is connected.
func (c *Connection) disconnectVPP() {
if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
@@ -341,7 +360,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
// send health check probes until an error or timeout occurs
for {
// sleep until next health check probe period
- time.Sleep(healthCheckInterval)
+ time.Sleep(HealthCheckProbeInterval)
if atomic.LoadUint32(&c.connected) == 0 {
// Disconnect has been called in the meantime, return the healthcheck - reconnect loop
@@ -365,7 +384,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
case vppReply := <-ch.replyChan:
err = vppReply.err
- case <-time.After(healthCheckReplyTimeout):
+ case <-time.After(HealthCheckReplyTimeout):
err = ErrProbeTimeout
// check if time since last reply from any other
@@ -374,7 +393,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
sinceLastReply = time.Since(c.lastReply)
c.lastReplyLock.Unlock()
- if sinceLastReply < healthCheckReplyTimeout {
+ if sinceLastReply < HealthCheckReplyTimeout {
log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
continue
}
@@ -384,10 +403,10 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
if err == ErrProbeTimeout {
failedChecks++
- log.Warnf("VPP health check probe timed out after %v (%d. timeout)", healthCheckReplyTimeout, failedChecks)
- if failedChecks > healthCheckThreshold {
+ log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks)
+ if failedChecks > HealthCheckThreshold {
// in case of exceeded failed check treshold, assume VPP disconnected
- log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", healthCheckThreshold)
+ log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", HealthCheckThreshold)
connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
break
}
@@ -411,52 +430,6 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
c.connectLoop(connChan)
}
-func (c *Connection) NewAPIChannel() (api.Channel, error) {
- return c.newAPIChannel(requestChannelBufSize, replyChannelBufSize)
-}
-
-func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
- return c.newAPIChannel(reqChanBufSize, replyChanBufSize)
-}
-
-// NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
-// It allows to specify custom buffer sizes for the request and reply Go channels.
-func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*channel, error) {
- if c == nil {
- return nil, errors.New("nil connection passed in")
- }
-
- chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
- ch := &channel{
- id: chID,
- replyTimeout: defaultReplyTimeout,
- msgDecoder: c.codec,
- msgIdentifier: c,
- reqChan: make(chan *vppRequest, reqChanBufSize),
- replyChan: make(chan *vppReply, replyChanBufSize),
- notifSubsChan: make(chan *subscriptionRequest, reqChanBufSize),
- notifSubsReplyChan: make(chan error, replyChanBufSize),
- }
-
- // store API channel within the client
- c.channelsLock.Lock()
- c.channels[chID] = ch
- c.channelsLock.Unlock()
-
- // start watching on the request channel
- go c.watchRequests(ch)
-
- return ch, nil
-}
-
-// releaseAPIChannel releases API channel that needs to be closed.
-func (c *Connection) releaseAPIChannel(ch *channel) {
- log.WithFields(logger.Fields{
- "channel": ch.id,
- }).Debug("API channel released")
-
- // delete the channel from channels map
- c.channelsLock.Lock()
- delete(c.channels, ch.id)
- c.channelsLock.Unlock()
+func getMsgNameWithCrc(x api.Message) string {
+ return x.GetMessageName() + "_" + x.GetCrcString()
}
diff --git a/core/connection_test.go b/core/connection_test.go
index 5c8c309..cea7d2d 100644
--- a/core/connection_test.go
+++ b/core/connection_test.go
@@ -14,18 +14,16 @@
package core_test
-/*
import (
"testing"
"git.fd.io/govpp.git/adapter/mock"
"git.fd.io/govpp.git/api"
+ "git.fd.io/govpp.git/codec"
"git.fd.io/govpp.git/core"
- "git.fd.io/govpp.git/core/bin_api/vpe"
"git.fd.io/govpp.git/examples/bin_api/interfaces"
- "git.fd.io/govpp.git/examples/bin_api/stats"
-
- "git.fd.io/govpp.git/codec"
+ "git.fd.io/govpp.git/examples/binapi/stats"
+ "git.fd.io/govpp.git/examples/binapi/vpe"
. "github.com/onsi/gomega"
)
@@ -38,8 +36,9 @@ type testCtx struct {
func setupTest(t *testing.T, bufferedChan bool) *testCtx {
RegisterTestingT(t)
- ctx := &testCtx{}
- ctx.mockVpp = &mock.VppAdapter{}
+ ctx := &testCtx{
+ mockVpp: mock.NewVppAdapter(),
+ }
var err error
ctx.conn, err = core.Connect(ctx.mockVpp)
@@ -60,100 +59,6 @@ func (ctx *testCtx) teardownTest() {
ctx.conn.Disconnect()
}
-func TestSimpleRequest(t *testing.T) {
- ctx := setupTest(t, false)
- defer ctx.teardownTest()
-
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: -5})
-
- req := &vpe.ControlPing{}
- reply := &vpe.ControlPingReply{}
-
- // send the request and receive a reply
- ctx.ch.GetRequestChannel() <- &api.VppRequest{Message: req}
- vppReply := <-ctx.ch.GetReplyChannel()
-
- Expect(vppReply).ShouldNot(BeNil())
- Expect(vppReply.Error).ShouldNot(HaveOccurred())
-
- // decode the message
- err := ctx.ch.GetMessageDecoder().DecodeMsg(vppReply.Data, reply)
- Expect(err).ShouldNot(HaveOccurred())
-
- Expect(reply.Retval).To(BeEquivalentTo(-5))
-}
-
-func TestMultiRequest(t *testing.T) {
- ctx := setupTest(t, false)
- defer ctx.teardownTest()
-
- msgs := []api.Message{}
- for m := 0; m < 10; m++ {
- msgs = append(msgs, &interfaces.SwInterfaceDetails{})
- }
- ctx.mockVpp.MockReply(msgs...)
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
-
- // send multipart request
- ctx.ch.GetRequestChannel() <- &api.VppRequest{Message: &interfaces.SwInterfaceDump{}, Multipart: true}
-
- cnt := 0
- for {
- // receive a reply
- vppReply := <-ctx.ch.GetReplyChannel()
- if vppReply.LastReplyReceived {
- break // break out of the loop
- }
- Expect(vppReply.Error).ShouldNot(HaveOccurred())
-
- // decode the message
- reply := &interfaces.SwInterfaceDetails{}
- err := ctx.ch.GetMessageDecoder().DecodeMsg(vppReply.Data, reply)
- Expect(err).ShouldNot(HaveOccurred())
- cnt++
- }
-
- Expect(cnt).To(BeEquivalentTo(10))
-}
-
-func TestNotifications(t *testing.T) {
- ctx := setupTest(t, false)
- defer ctx.teardownTest()
-
- // subscribe for notification
- notifChan := make(chan api.Message, 1)
- subscription := &api.NotifSubscription{
- NotifChan: notifChan,
- MsgFactory: interfaces.NewSwInterfaceSetFlags,
- }
- ctx.ch.GetNotificationChannel() <- &api.NotifSubscribeRequest{
- Subscription: subscription,
- Subscribe: true,
- }
- err := <-ctx.ch.GetNotificationReplyChannel()
- Expect(err).ShouldNot(HaveOccurred())
-
- // mock the notification and force its delivery
- ctx.mockVpp.MockReply(&interfaces.SwInterfaceSetFlags{
- SwIfIndex: 3,
- AdminUpDown: 1,
- })
- ctx.mockVpp.SendMsg(0, []byte{0})
-
- // receive the notification
- notif := (<-notifChan).(*interfaces.SwInterfaceSetFlags)
-
- Expect(notif.SwIfIndex).To(BeEquivalentTo(3))
-
- // unsubscribe notification
- ctx.ch.GetNotificationChannel() <- &api.NotifSubscribeRequest{
- Subscription: subscription,
- Subscribe: false,
- }
- err = <-ctx.ch.GetNotificationReplyChannel()
- Expect(err).ShouldNot(HaveOccurred())
-}
-
func TestNilConnection(t *testing.T) {
RegisterTestingT(t)
var conn *core.Connection
@@ -184,47 +89,16 @@ func TestAsyncConnection(t *testing.T) {
defer ctx.teardownTest()
ctx.conn.Disconnect()
- conn, ch, err := core.AsyncConnect(ctx.mockVpp)
+ conn, statusChan, err := core.AsyncConnect(ctx.mockVpp)
ctx.conn = conn
Expect(err).ShouldNot(HaveOccurred())
Expect(conn).ShouldNot(BeNil())
- ev := <-ch
+ ev := <-statusChan
Expect(ev.State).Should(BeEquivalentTo(core.Connected))
}
-func TestFullBuffer(t *testing.T) {
- ctx := setupTest(t, false)
- defer ctx.teardownTest()
-
- // close the default API channel
- ctx.ch.Close()
-
- // create a new channel with limited buffer sizes
- var err error
- ctx.ch, err = ctx.conn.NewAPIChannelBuffered(10, 1)
- Expect(err).ShouldNot(HaveOccurred())
-
- // send multiple requests, only one reply should be read
- for i := 0; i < 20; i++ {
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
- ctx.ch.GetRequestChannel() <- &api.VppRequest{Message: &vpe.ControlPing{}}
- }
-
- vppReply := <-ctx.ch.GetReplyChannel()
- Expect(vppReply).ShouldNot(BeNil())
-
- var received bool
- select {
- case <-ctx.ch.GetReplyChannel():
- received = true // this should not happen
- default:
- received = false // no reply to be received
- }
- Expect(received).Should(BeFalse(), "A reply has been recieved, should had been ignored.")
-}
-
func TestCodec(t *testing.T) {
RegisterTestingT(t)
@@ -289,7 +163,7 @@ func TestSimpleRequestsWithSequenceNumbers(t *testing.T) {
var reqCtx []api.RequestCtx
for i := 0; i < 10; i++ {
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: int32(i)})
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
req := &vpe.ControlPing{}
reqCtx = append(reqCtx, ctx.ch.SendRequest(req))
}
@@ -298,7 +172,6 @@ func TestSimpleRequestsWithSequenceNumbers(t *testing.T) {
reply := &vpe.ControlPingReply{}
err := reqCtx[i].ReceiveReply(reply)
Expect(err).ShouldNot(HaveOccurred())
- Expect(reply.Retval).To(BeEquivalentTo(i))
}
}
@@ -306,7 +179,7 @@ func TestMultiRequestsWithSequenceNumbers(t *testing.T) {
ctx := setupTest(t, false)
defer ctx.teardownTest()
- msgs := []api.Message{}
+ var msgs []api.Message
for i := 0; i < 10; i++ {
msgs = append(msgs, &interfaces.SwInterfaceDetails{SwIfIndex: uint32(i)})
}
@@ -325,7 +198,7 @@ func TestMultiRequestsWithSequenceNumbers(t *testing.T) {
lastReplyReceived, err := reqCtx.ReceiveReply(reply)
if lastReplyReceived {
- break // break out of the loop
+ break
}
Expect(err).ShouldNot(HaveOccurred())
@@ -343,7 +216,7 @@ func TestSimpleRequestWithTimeout(t *testing.T) {
// reply for a previous timeouted requests to be ignored
ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{
- Msg: &vpe.ControlPingReply{Retval: 1},
+ Msg: &vpe.ControlPingReply{},
SeqNum: 0,
})
@@ -359,12 +232,12 @@ func TestSimpleRequestWithTimeout(t *testing.T) {
ctx.mockVpp.MockReplyWithContext(
// reply for the previous request
mock.MsgWithContext{
- Msg: &vpe.ControlPingReply{Retval: 1},
+ Msg: &vpe.ControlPingReply{},
SeqNum: 1,
},
// reply for the next request
mock.MsgWithContext{
- Msg: &vpe.ControlPingReply{Retval: 2},
+ Msg: &vpe.ControlPingReply{},
SeqNum: 2,
})
@@ -376,7 +249,6 @@ func TestSimpleRequestWithTimeout(t *testing.T) {
reply = &vpe.ControlPingReply{}
err = reqCtx2.ReceiveReply(reply)
Expect(err).To(BeNil())
- Expect(reply.Retval).To(BeEquivalentTo(2))
}
func TestSimpleRequestsWithMissingReply(t *testing.T) {
@@ -393,7 +265,7 @@ func TestSimpleRequestsWithMissingReply(t *testing.T) {
// third request with reply
ctx.mockVpp.MockReplyWithContext(mock.MsgWithContext{
- Msg: &vpe.ControlPingReply{Retval: 3},
+ Msg: &vpe.ControlPingReply{},
SeqNum: 3,
})
req3 := &vpe.ControlPing{}
@@ -414,7 +286,6 @@ func TestSimpleRequestsWithMissingReply(t *testing.T) {
reply = &vpe.ControlPingReply{}
err = reqCtx3.ReceiveReply(reply)
Expect(err).To(BeNil())
- Expect(reply.Retval).To(BeEquivalentTo(3))
}
func TestMultiRequestsWithErrors(t *testing.T) {
@@ -422,38 +293,25 @@ func TestMultiRequestsWithErrors(t *testing.T) {
defer ctx.teardownTest()
// replies for a previous timeouted requests to be ignored
- msgs := []mock.MsgWithContext{}
- msgs = append(msgs,
- mock.MsgWithContext{
- Msg: &vpe.ControlPingReply{Retval: 1},
- SeqNum: 0xffff - 1,
- },
- mock.MsgWithContext{
- Msg: &vpe.ControlPingReply{Retval: 1},
- SeqNum: 0xffff,
- },
- mock.MsgWithContext{
- Msg: &vpe.ControlPingReply{Retval: 1},
- SeqNum: 0,
- })
-
+ msgs := []mock.MsgWithContext{
+ {Msg: &vpe.ControlPingReply{}, SeqNum: 0xffff - 1},
+ {Msg: &vpe.ControlPingReply{}, SeqNum: 0xffff},
+ {Msg: &vpe.ControlPingReply{}, SeqNum: 0},
+ }
for i := 0; i < 10; i++ {
- msgs = append(msgs,
- mock.MsgWithContext{
- Msg: &interfaces.SwInterfaceDetails{SwIfIndex: uint32(i)},
- SeqNum: 1,
- Multipart: true,
- })
+ msgs = append(msgs, mock.MsgWithContext{
+ Msg: &interfaces.SwInterfaceDetails{SwIfIndex: uint32(i)},
+ SeqNum: 1,
+ Multipart: true,
+ })
}
// missing finalizing control ping
// reply for a next request
- msgs = append(msgs,
- mock.MsgWithContext{
- Msg: &vpe.ControlPingReply{Retval: 2},
- SeqNum: 2,
- Multipart: false,
- })
+ msgs = append(msgs, mock.MsgWithContext{
+ Msg: &vpe.ControlPingReply{},
+ SeqNum: 2,
+ })
// queue replies
ctx.mockVpp.MockReplyWithContext(msgs...)
@@ -487,7 +345,6 @@ func TestMultiRequestsWithErrors(t *testing.T) {
reply2 := &vpe.ControlPingReply{}
err = reqCtx2.ReceiveReply(reply2)
Expect(err).To(BeNil())
- Expect(reply2.Retval).To(BeEquivalentTo(2))
}
func TestRequestsOrdering(t *testing.T) {
@@ -498,12 +355,12 @@ func TestRequestsOrdering(t *testing.T) {
// some replies will get thrown away
// first request
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: 1})
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
req1 := &vpe.ControlPing{}
reqCtx1 := ctx.ch.SendRequest(req1)
// second request
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: 2})
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
req2 := &vpe.ControlPing{}
reqCtx2 := ctx.ch.SendRequest(req2)
@@ -512,7 +369,6 @@ func TestRequestsOrdering(t *testing.T) {
reply2 := &vpe.ControlPingReply{}
err := reqCtx2.ReceiveReply(reply2)
Expect(err).To(BeNil())
- Expect(reply2.Retval).To(BeEquivalentTo(2))
// first request has already been considered closed
reply1 := &vpe.ControlPingReply{}
@@ -522,7 +378,7 @@ func TestRequestsOrdering(t *testing.T) {
}
func TestCycleOverSetOfSequenceNumbers(t *testing.T) {
- ctx := setupTest(t, true)
+ ctx := setupTest(t, false)
defer ctx.teardownTest()
numIters := 0xffff + 100
@@ -530,7 +386,7 @@ func TestCycleOverSetOfSequenceNumbers(t *testing.T) {
for i := 0; i < numIters+30; i++ {
if i < numIters {
- ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: int32(i)})
+ ctx.mockVpp.MockReply(&vpe.ControlPingReply{})
req := &vpe.ControlPing{}
reqCtx[i] = ctx.ch.SendRequest(req)
}
@@ -538,8 +394,6 @@ func TestCycleOverSetOfSequenceNumbers(t *testing.T) {
reply := &vpe.ControlPingReply{}
err := reqCtx[i-30].ReceiveReply(reply)
Expect(err).ShouldNot(HaveOccurred())
- Expect(reply.Retval).To(BeEquivalentTo(i - 30))
}
}
}
-*/
diff --git a/core/notification_handler.go b/core/notification_handler.go
deleted file mode 100644
index 7b889e3..0000000
--- a/core/notification_handler.go
+++ /dev/null
@@ -1,170 +0,0 @@
-// Copyright (c) 2017 Cisco and/or its affiliates.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at:
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package core
-
-import (
- "fmt"
-
- "git.fd.io/govpp.git/api"
- logger "github.com/sirupsen/logrus"
-)
-
-// processSubscriptionRequest processes a notification subscribe request.
-func (c *Connection) processSubscriptionRequest(ch *channel, req *subscriptionRequest) error {
- var err error
-
- // subscribe / unsubscribe
- if req.subscribe {
- err = c.addNotifSubscription(req.sub)
- } else {
- err = c.removeNotifSubscription(req.sub)
- }
-
- // send the reply into the go channel
- select {
- case ch.notifSubsReplyChan <- err:
- // reply sent successfully
- default:
- // unable to write into the channel without blocking
- log.WithFields(logger.Fields{
- "channel": ch.id,
- }).Warn("Unable to deliver the subscribe reply, reciever end not ready.")
- }
-
- return err
-}
-
-// addNotifSubscription adds the notification subscription into the subscriptions map of the connection.
-func (c *Connection) addNotifSubscription(subs *api.NotifSubscription) error {
- // get message ID of the notification message
- msgID, msgName, err := c.getSubscriptionMessageID(subs)
- if err != nil {
- return err
- }
-
- log.WithFields(logger.Fields{
- "msg_name": msgName,
- "msg_id": msgID,
- }).Debug("Adding new notification subscription.")
-
- // add the subscription into map
- c.notifSubscriptionsLock.Lock()
- defer c.notifSubscriptionsLock.Unlock()
-
- c.notifSubscriptions[msgID] = append(c.notifSubscriptions[msgID], subs)
-
- return nil
-}
-
-// removeNotifSubscription removes the notification subscription from the subscriptions map of the connection.
-func (c *Connection) removeNotifSubscription(subs *api.NotifSubscription) error {
- // get message ID of the notification message
- msgID, msgName, err := c.getSubscriptionMessageID(subs)
- if err != nil {
- return err
- }
-
- log.WithFields(logger.Fields{
- "msg_name": msgName,
- "msg_id": msgID,
- }).Debug("Removing notification subscription.")
-
- // remove the subscription from the map
- c.notifSubscriptionsLock.Lock()
- defer c.notifSubscriptionsLock.Unlock()
-
- for i, item := range c.notifSubscriptions[msgID] {
- if item == subs {
- // remove i-th item in the slice
- c.notifSubscriptions[msgID] = append(c.notifSubscriptions[msgID][:i], c.notifSubscriptions[msgID][i+1:]...)
- break
- }
- }
-
- return nil
-}
-
-// isNotificationMessage returns true if someone has subscribed to provided message ID.
-func (c *Connection) isNotificationMessage(msgID uint16) bool {
- c.notifSubscriptionsLock.RLock()
- defer c.notifSubscriptionsLock.RUnlock()
-
- _, exists := c.notifSubscriptions[msgID]
- return exists
-}
-
-// sendNotifications send a notification message to all subscribers subscribed for that message.
-func (c *Connection) sendNotifications(msgID uint16, data []byte) {
- c.notifSubscriptionsLock.RLock()
- defer c.notifSubscriptionsLock.RUnlock()
-
- matched := false
-
- // send to notification to each subscriber
- for _, subs := range c.notifSubscriptions[msgID] {
- msg := subs.MsgFactory()
- log.WithFields(logger.Fields{
- "msg_name": msg.GetMessageName(),
- "msg_id": msgID,
- "msg_size": len(data),
- }).Debug("Sending a notification to the subscription channel.")
-
- if err := c.codec.DecodeMsg(data, msg); err != nil {
- log.WithFields(logger.Fields{
- "msg_name": msg.GetMessageName(),
- "msg_id": msgID,
- "msg_size": len(data),
- }).Errorf("Unable to decode the notification message: %v", err)
- continue
- }
-
- // send the message into the go channel of the subscription
- select {
- case subs.NotifChan <- msg:
- // message sent successfully
- default:
- // unable to write into the channel without blocking
- log.WithFields(logger.Fields{
- "msg_name": msg.GetMessageName(),
- "msg_id": msgID,
- "msg_size": len(data),
- }).Warn("Unable to deliver the notification, reciever end not ready.")
- }
-
- matched = true
- }
-
- if !matched {
- log.WithFields(logger.Fields{
- "msg_id": msgID,
- "msg_size": len(data),
- }).Info("No subscription found for the notification message.")
- }
-}
-
-// getSubscriptionMessageID returns ID of the message the subscription is tied to.
-func (c *Connection) getSubscriptionMessageID(subs *api.NotifSubscription) (uint16, string, error) {
- msg := subs.MsgFactory()
- msgID, err := c.GetMessageID(msg)
- if err != nil {
- log.WithFields(logger.Fields{
- "msg_name": msg.GetMessageName(),
- "msg_crc": msg.GetCrcString(),
- }).Errorf("unable to retrieve message ID: %v", err)
- return 0, "", fmt.Errorf("unable to retrieve message ID: %v", err)
- }
-
- return msgID, msg.GetMessageName(), nil
-}
diff --git a/core/request_handler.go b/core/request_handler.go
index fd6d100..14c095d 100644
--- a/core/request_handler.go
+++ b/core/request_handler.go
@@ -29,7 +29,7 @@ var (
)
// watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
-func (c *Connection) watchRequests(ch *channel) {
+func (c *Connection) watchRequests(ch *Channel) {
for {
select {
case req, ok := <-ch.reqChan:
@@ -40,16 +40,12 @@ func (c *Connection) watchRequests(ch *channel) {
return
}
c.processRequest(ch, req)
-
- case req := <-ch.notifSubsChan:
- // new request on the notification subscribe channel
- c.processSubscriptionRequest(ch, req)
}
}
}
// processRequest processes a single request received on the request channel.
-func (c *Connection) processRequest(ch *channel, req *vppRequest) error {
+func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
// check whether we are connected to VPP
if atomic.LoadUint32(&c.connected) == 0 {
err := ErrNotConnected
@@ -137,7 +133,7 @@ func (c *Connection) processRequest(ch *channel, req *vppRequest) error {
}
// msgCallback is called whenever any binary API message comes from VPP.
-func (c *Connection) msgCallback(msgID uint16, context uint32, data []byte) {
+func (c *Connection) msgCallback(msgID uint16, data []byte) {
connLock.RLock()
defer connLock.RUnlock()
@@ -157,13 +153,8 @@ func (c *Connection) msgCallback(msgID uint16, context uint32, data []byte) {
// - replies that don't have context as first field (comes as zero)
// - events that don't have context at all (comes as non zero)
//
- msgContext, err := c.codec.DecodeMsgContext(data, msg)
- if err == nil {
- if context != msgContext {
- log.Debugf("different context was decoded from message (%d -> %d)", context, msgContext)
- context = msgContext
- }
- } else {
+ context, err := c.codec.DecodeMsgContext(data, msg)
+ if err != nil {
log.Errorf("decoding context failed: %v", err)
}
@@ -218,7 +209,7 @@ func (c *Connection) msgCallback(msgID uint16, context uint32, data []byte) {
// sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
// it logs the error and do not send the message.
-func sendReply(ch *channel, reply *vppReply) {
+func sendReply(ch *Channel, reply *vppReply) {
select {
case ch.replyChan <- reply:
// reply sent successfully
@@ -232,10 +223,68 @@ func sendReply(ch *channel, reply *vppReply) {
}
}
-func sendReplyError(ch *channel, req *vppRequest, err error) {
+func sendReplyError(ch *Channel, req *vppRequest, err error) {
sendReply(ch, &vppReply{seqNum: req.seqNum, err: err})
}
+// isNotificationMessage returns true if someone has subscribed to provided message ID.
+func (c *Connection) isNotificationMessage(msgID uint16) bool {
+ c.subscriptionsLock.RLock()
+ defer c.subscriptionsLock.RUnlock()
+
+ _, exists := c.subscriptions[msgID]
+ return exists
+}
+
+// sendNotifications send a notification message to all subscribers subscribed for that message.
+func (c *Connection) sendNotifications(msgID uint16, data []byte) {
+ c.subscriptionsLock.RLock()
+ defer c.subscriptionsLock.RUnlock()
+
+ matched := false
+
+ // send to notification to each subscriber
+ for _, sub := range c.subscriptions[msgID] {
+ log.WithFields(logger.Fields{
+ "msg_name": sub.event.GetMessageName(),
+ "msg_id": msgID,
+ "msg_size": len(data),
+ }).Debug("Sending a notification to the subscription channel.")
+
+ event := sub.msgFactory()
+ if err := c.codec.DecodeMsg(data, event); err != nil {
+ log.WithFields(logger.Fields{
+ "msg_name": sub.event.GetMessageName(),
+ "msg_id": msgID,
+ "msg_size": len(data),
+ }).Errorf("Unable to decode the notification message: %v", err)
+ continue
+ }
+
+ // send the message into the go channel of the subscription
+ select {
+ case sub.notifChan <- event:
+ // message sent successfully
+ default:
+ // unable to write into the channel without blocking
+ log.WithFields(logger.Fields{
+ "msg_name": sub.event.GetMessageName(),
+ "msg_id": msgID,
+ "msg_size": len(data),
+ }).Warn("Unable to deliver the notification, reciever end not ready.")
+ }
+
+ matched = true
+ }
+
+ if !matched {
+ log.WithFields(logger.Fields{
+ "msg_id": msgID,
+ "msg_size": len(data),
+ }).Info("No subscription found for the notification message.")
+ }
+}
+
// +------------------+-------------------+-----------------------+
// | 15b = channel ID | 1b = is multipart | 16b = sequence number |
// +------------------+-------------------+-----------------------+
diff --git a/examples/bin_api/acl/acl.ba.go b/examples/bin_api/acl/acl.ba.go
index ff80173..0dfd335 100644
--- a/examples/bin_api/acl/acl.ba.go
+++ b/examples/bin_api/acl/acl.ba.go
@@ -19,6 +19,7 @@ import "github.com/lunixbochs/struc"
import "bytes"
// Reference imports to suppress errors if they are not otherwise used.
+var _ = api.RegisterMessage
var _ = struc.Pack
var _ = bytes.NewBuffer
diff --git a/examples/bin_api/af_packet/af_packet.ba.go b/examples/bin_api/af_packet/af_packet.ba.go
index a6bdc93..144bf74 100644
--- a/examples/bin_api/af_packet/af_packet.ba.go
+++ b/examples/bin_api/af_packet/af_packet.ba.go
@@ -18,6 +18,7 @@ import "github.com/lunixbochs/struc"
import "bytes"
// Reference imports to suppress errors if they are not otherwise used.
+var _ = api.RegisterMessage
var _ = struc.Pack
var _ = bytes.NewBuffer
diff --git a/examples/bin_api/interfaces/interfaces.ba.go b/examples/bin_api/interfaces/interfaces.ba.go
index 5ef58ed..38bbb6b 100644
--- a/examples/bin_api/interfaces/interfaces.ba.go
+++ b/examples/bin_api/interfaces/interfaces.ba.go
@@ -19,6 +19,7 @@ import "github.com/lunixbochs/struc"
import "bytes"
// Reference imports to suppress errors if they are not otherwise used.
+var _ = api.RegisterMessage
var _ = struc.Pack
var _ = bytes.NewBuffer
diff --git a/examples/bin_api/ip/ip.ba.go b/examples/bin_api/ip/ip.ba.go
index c980b6a..9dc3d56 100644
--- a/examples/bin_api/ip/ip.ba.go
+++ b/examples/bin_api/ip/ip.ba.go
@@ -21,6 +21,7 @@ import "github.com/lunixbochs/struc"
import "bytes"
// Reference imports to suppress errors if they are not otherwise used.
+var _ = api.RegisterMessage
var _ = struc.Pack
var _ = bytes.NewBuffer
diff --git a/examples/bin_api/memif/memif.ba.go b/examples/bin_api/memif/memif.ba.go
index 3650355..ceb615f 100644
--- a/examples/bin_api/memif/memif.ba.go
+++ b/examples/bin_api/memif/memif.ba.go
@@ -18,6 +18,7 @@ import "github.com/lunixbochs/struc"
import "bytes"
// Reference imports to suppress errors if they are not otherwise used.
+var _ = api.RegisterMessage
var _ = struc.Pack
var _ = bytes.NewBuffer
diff --git a/examples/bin_api/stats/stats.ba.go b/examples/bin_api/stats/stats.ba.go
index eb2dd8f..b66a077 100644
--- a/examples/bin_api/stats/stats.ba.go
+++ b/examples/bin_api/stats/stats.ba.go
@@ -19,6 +19,7 @@ import "github.com/lunixbochs/struc"
import "bytes"
// Reference imports to suppress errors if they are not otherwise used.
+var _ = api.RegisterMessage
var _ = struc.Pack
var _ = bytes.NewBuffer
diff --git a/examples/bin_api/tap/tap.ba.go b/examples/bin_api/tap/tap.ba.go
index 36f5549..d2878ea 100644
--- a/examples/bin_api/tap/tap.ba.go
+++ b/examples/bin_api/tap/tap.ba.go
@@ -18,6 +18,7 @@ import "github.com/lunixbochs/struc"
import "bytes"
// Reference imports to suppress errors if they are not otherwise used.
+var _ = api.RegisterMessage
var _ = struc.Pack
var _ = bytes.NewBuffer
diff --git a/examples/bin_api/vpe/vpe.ba.go b/examples/bin_api/vpe/vpe.ba.go
index f91a164..50476c3 100644
--- a/examples/bin_api/vpe/vpe.ba.go
+++ b/examples/bin_api/vpe/vpe.ba.go
@@ -18,6 +18,7 @@ import "github.com/lunixbochs/struc"
import "bytes"
// Reference imports to suppress errors if they are not otherwise used.
+var _ = api.RegisterMessage
var _ = struc.Pack
var _ = bytes.NewBuffer
diff --git a/examples/cmd/perf-bench/perf-bench.go b/examples/cmd/perf-bench/perf-bench.go
index 5b4b17d..664f046 100644
--- a/examples/cmd/perf-bench/perf-bench.go
+++ b/examples/cmd/perf-bench/perf-bench.go
@@ -20,7 +20,6 @@ import (
"flag"
"fmt"
"log"
- "os"
"time"
"github.com/pkg/profile"
@@ -65,16 +64,14 @@ func main() {
// connect to VPP
conn, err := govpp.Connect("")
if err != nil {
- log.Println("Error:", err)
- os.Exit(1)
+ log.Fatalln("Error:", err)
}
defer conn.Disconnect()
// create an API channel
ch, err := conn.NewAPIChannelBuffered(cnt, cnt)
if err != nil {
- log.Println("Error:", err)
- os.Exit(1)
+ log.Fatalln("Error:", err)
}
defer ch.Close()
@@ -101,10 +98,8 @@ func syncTest(ch api.Channel, cnt int) {
req := &vpe.ControlPing{}
reply := &vpe.ControlPingReply{}
- err := ch.SendRequest(req).ReceiveReply(reply)
- if err != nil {
- log.Println("Error in reply:", err)
- os.Exit(1)
+ if err := ch.SendRequest(req).ReceiveReply(reply); err != nil {
+ log.Fatalln("Error in reply:", err)
}
}
}
@@ -125,8 +120,7 @@ func asyncTest(ch api.Channel, cnt int) {
for ctx := range ctxChan {
reply := &vpe.ControlPingReply{}
if err := ctx.ReceiveReply(reply); err != nil {
- log.Println("Error in reply:", err)
- os.Exit(1)
+ log.Fatalln("Error in reply:", err)
}
}
}
diff --git a/examples/cmd/simple-client/simple_client.go b/examples/cmd/simple-client/simple_client.go
index b9e8052..08d4da6 100644
--- a/examples/cmd/simple-client/simple_client.go
+++ b/examples/cmd/simple-client/simple_client.go
@@ -18,6 +18,7 @@ package main
import (
"fmt"
+ "log"
"net"
"os"
"strings"
@@ -35,16 +36,14 @@ func main() {
// connect to VPP
conn, err := govpp.Connect("")
if err != nil {
- fmt.Println("Error:", err)
- os.Exit(1)
+ log.Fatalln("ERROR:", err)
}
defer conn.Disconnect()
// create an API channel that will be used in the examples
ch, err := conn.NewAPIChannel()
if err != nil {
- fmt.Println("Error:", err)
- os.Exit(1)
+ log.Fatalln("ERROR:", err)
}
defer ch.Close()
@@ -64,20 +63,22 @@ func main() {
// aclVersion is the simplest API example - one empty request message and one reply message.
func aclVersion(ch api.Channel) {
+ fmt.Println("ACL getting version")
+
req := &acl.ACLPluginGetVersion{}
reply := &acl.ACLPluginGetVersionReply{}
- err := ch.SendRequest(req).ReceiveReply(reply)
-
- if err != nil {
- fmt.Println("Error:", err)
+ if err := ch.SendRequest(req).ReceiveReply(reply); err != nil {
+ fmt.Println("ERROR:", err)
} else {
- fmt.Printf("%+v\n", reply)
+ fmt.Printf("ACL version reply: %+v\n", reply)
}
}
// aclConfig is another simple API example - in this case, the request contains structured data.
func aclConfig(ch api.Channel) {
+ fmt.Println("ACL adding replace")
+
req := &acl.ACLAddReplace{
ACLIndex: ^uint32(0),
Tag: []byte("access list 1"),
@@ -102,10 +103,8 @@ func aclConfig(ch api.Channel) {
}
reply := &acl.ACLAddReplaceReply{}
- err := ch.SendRequest(req).ReceiveReply(reply)
-
- if err != nil {
- fmt.Println("Error:", err)
+ if err := ch.SendRequest(req).ReceiveReply(reply); err != nil {
+ fmt.Println("ERROR:", err)
return
}
if reply.Retval != 0 {
@@ -113,19 +112,23 @@ func aclConfig(ch api.Channel) {
return
}
- fmt.Printf("%+v\n", reply)
+ fmt.Printf("ACL add replace reply: %+v\n", reply)
}
// aclDump shows an example where SendRequest and ReceiveReply are not chained together.
func aclDump(ch api.Channel) {
+ fmt.Println("Dumping ACL")
+
req := &acl.ACLDump{}
reply := &acl.ACLDetails{}
- if err := ch.SendRequest(req).ReceiveReply(reply); err != nil {
- fmt.Println("Error:", err)
+ reqCtx := ch.SendRequest(req)
+
+ if err := reqCtx.ReceiveReply(reply); err != nil {
+ fmt.Println("ERROR:", err)
} else {
- fmt.Printf("%+v\n", reply)
+ fmt.Printf("ACL details: %+v\n", reply)
}
}
@@ -133,14 +136,13 @@ func aclDump(ch api.Channel) {
func interfaceDump(ch api.Channel) {
fmt.Println("Dumping interfaces")
- req := &interfaces.SwInterfaceDump{}
- reqCtx := ch.SendMultiRequest(req)
+ reqCtx := ch.SendMultiRequest(&interfaces.SwInterfaceDump{})
for {
msg := &interfaces.SwInterfaceDetails{}
stop, err := reqCtx.ReceiveReply(msg)
if stop {
- break // break out of the loop
+ break
}
if err != nil {
fmt.Println("ERROR:", err)
@@ -148,7 +150,7 @@ func interfaceDump(ch api.Channel) {
ifaceName := strings.TrimFunc(string(msg.InterfaceName), func(r rune) bool {
return r == 0x00
})
- fmt.Printf("Interface: %q %+v\n", ifaceName, msg)
+ fmt.Printf("Interface %q: %+v\n", ifaceName, msg)
}
}
@@ -164,12 +166,12 @@ func ipAddressDump(ch api.Channel) {
msg := &ip.IPAddressDetails{}
stop, err := reqCtx.ReceiveReply(msg)
if stop {
- break // break out of the loop
+ break
}
if err != nil {
fmt.Println("ERROR:", err)
}
- fmt.Printf("ip address: %d %+v\n", msg.SwIfIndex, msg)
+ fmt.Printf("ip address details: %d %+v\n", msg.SwIfIndex, msg)
}
}
@@ -183,7 +185,7 @@ func setIpUnnumbered(ch api.Channel) {
reply := &interfaces.SwInterfaceSetUnnumberedReply{}
if err := ch.SendRequest(req).ReceiveReply(reply); err != nil {
- fmt.Println("Error:", err)
+ fmt.Println("ERROR:", err)
} else {
fmt.Printf("%+v\n", reply)
}
@@ -192,21 +194,20 @@ func setIpUnnumbered(ch api.Channel) {
func ipUnnumberedDump(ch api.Channel) {
fmt.Println("Dumping IP unnumbered")
- req := &ip.IPUnnumberedDump{
+ reqCtx := ch.SendMultiRequest(&ip.IPUnnumberedDump{
SwIfIndex: ^uint32(0),
- }
- reqCtx := ch.SendMultiRequest(req)
+ })
for {
msg := &ip.IPUnnumberedDetails{}
stop, err := reqCtx.ReceiveReply(msg)
if stop {
- break // break out of the loop
+ break
}
if err != nil {
fmt.Println("ERROR:", err)
}
- fmt.Printf("ip unnumbered: %+v\n", msg)
+ fmt.Printf("IP unnumbered details: %+v\n", msg)
}
}
@@ -214,9 +215,12 @@ func ipUnnumberedDump(ch api.Channel) {
// you are supposed to create your own Go channel with your preferred buffer size. If the channel's
// buffer is full, the notifications will not be delivered into it.
func interfaceNotifications(ch api.Channel) {
- // subscribe for specific notification message
+ fmt.Println("Subscribing to notificaiton events")
+
notifChan := make(chan api.Message, 100)
- subs, err := ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceEvent)
+
+ // subscribe for specific notification message
+ sub, err := ch.SubscribeNotification(notifChan, &interfaces.SwInterfaceEvent{})
if err != nil {
panic(err)
}
@@ -248,7 +252,7 @@ func interfaceNotifications(ch api.Channel) {
// receive one notification
notif := (<-notifChan).(*interfaces.SwInterfaceEvent)
- fmt.Printf("NOTIF: %+v\n", notif)
+ fmt.Printf("incoming event: %+v\n", notif)
// disable interface events in VPP
err = ch.SendRequest(&interfaces.WantInterfaceEvents{
@@ -260,7 +264,7 @@ func interfaceNotifications(ch api.Channel) {
}
// unsubscribe from delivery of the notifications
- err = ch.UnsubscribeNotification(subs)
+ err = sub.Unsubscribe()
if err != nil {
panic(err)
}
diff --git a/examples/cmd/stats-client/stats_client.go b/examples/cmd/stats-client/stats_client.go
index 4ea4659..f61f975 100644
--- a/examples/cmd/stats-client/stats_client.go
+++ b/examples/cmd/stats-client/stats_client.go
@@ -18,6 +18,7 @@ package main
import (
"fmt"
+ "log"
"os"
"os/signal"
@@ -28,45 +29,41 @@ import (
)
func main() {
- fmt.Println("Starting stats VPP client...")
+ fmt.Println("Starting stats VPP client..")
// async connect to VPP
conn, statCh, err := govpp.AsyncConnect("")
if err != nil {
- fmt.Println("Error:", err)
- os.Exit(1)
+ log.Fatalln("Error:", err)
}
defer conn.Disconnect()
// create an API channel that will be used in the examples
ch, err := conn.NewAPIChannel()
if err != nil {
- fmt.Println("Error:", err)
- os.Exit(1)
+ log.Fatalln("Error:", err)
}
- defer fmt.Println("calling close")
defer ch.Close()
// create channel for Interrupt signal
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt)
- var simpleCountersSubs *api.NotifSubscription
- var combinedCountersSubs *api.NotifSubscription
var notifChan chan api.Message
+ var simpleSub api.SubscriptionCtx
+ var combinedSub api.SubscriptionCtx
// loop until Interrupt signal is received
loop:
for {
select {
-
case connEvent := <-statCh:
// VPP connection state change
switch connEvent.State {
case core.Connected:
fmt.Println("VPP connected.")
- if simpleCountersSubs == nil {
- simpleCountersSubs, combinedCountersSubs, notifChan = subscribeNotifications(ch)
+ if notifChan == nil {
+ simpleSub, combinedSub, notifChan = subscribeNotifications(ch)
}
requestStatistics(ch)
@@ -93,24 +90,24 @@ loop:
}
}
- ch.UnsubscribeNotification(simpleCountersSubs)
- ch.UnsubscribeNotification(combinedCountersSubs)
+ simpleSub.Unsubscribe()
+ combinedSub.Unsubscribe()
}
// subscribeNotifications subscribes for interface counters notifications.
-func subscribeNotifications(ch api.Channel) (*api.NotifSubscription, *api.NotifSubscription, chan api.Message) {
+func subscribeNotifications(ch api.Channel) (api.SubscriptionCtx, api.SubscriptionCtx, chan api.Message) {
notifChan := make(chan api.Message, 100)
- simpleCountersSubs, err := ch.SubscribeNotification(notifChan, stats.NewVnetInterfaceSimpleCounters)
+ simpleSub, err := ch.SubscribeNotification(notifChan, &stats.VnetInterfaceSimpleCounters{})
if err != nil {
panic(err)
}
- combinedCountersSubs, err := ch.SubscribeNotification(notifChan, stats.NewVnetInterfaceCombinedCounters)
+ combinedSub, err := ch.SubscribeNotification(notifChan, &stats.VnetInterfaceCombinedCounters{})
if err != nil {
panic(err)
}
- return simpleCountersSubs, combinedCountersSubs, notifChan
+ return simpleSub, combinedSub, notifChan
}
// requestStatistics requests interface counters notifications from VPP.