aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
authorOndrej Fabry <ofabry@cisco.com>2020-06-16 10:40:34 +0200
committerOndrej Fabry <ofabry@cisco.com>2020-06-16 10:40:34 +0200
commit280b1c6c83b676ef4e592f4ecf60cb5b54b6a753 (patch)
treebf9a35f020de061ba66a432411ee44866405fe76 /core
parentf049390060630c0085fe4ad683c83a4a14a47ffb (diff)
Optimize socketclient adapter and add various code improvements
This commit includes: Features - optimized [socketclient](adapter/socketclient) adapter and add method to set client name - added list of compatible messages to `CompatibilityError` Fixes - `MsgCodec` will recover panic occurring during a message decoding - calling `Unsubscibe` will close the notification channel Other - improved log messages to provide more relevant info Examples - added more code samples of working with unions in [union example](examples/union-example) - added profiling mode to [perf bench](examples/perf-bench) example - improved [simple client](examples/simple-client) example to work properly even with multiple runs Dependencies - updated `github.com/sirupsen/logrus` dep to `v1.6.0` - updated `github.com/lunixbochs/struc` dep to `v0.0.0-20200521075829-a4cb8d33dbbe` Change-Id: I136a3968ccf9e93760d7ee2b9902fc7e6390a09d Signed-off-by: Ondrej Fabry <ofabry@cisco.com>
Diffstat (limited to 'core')
-rw-r--r--core/channel.go14
-rw-r--r--core/channel_test.go2
-rw-r--r--core/connection.go18
-rw-r--r--core/log.go21
-rw-r--r--core/request_handler.go20
5 files changed, 52 insertions, 23 deletions
diff --git a/core/channel.go b/core/channel.go
index 363a267..8479d6a 100644
--- a/core/channel.go
+++ b/core/channel.go
@@ -37,6 +37,8 @@ type MessageCodec interface {
EncodeMsg(msg api.Message, msgID uint16) ([]byte, error)
// DecodeMsg decodes binary-encoded data of a message into provided Message structure.
DecodeMsg(data []byte, msg api.Message) error
+ // DecodeMsgContext decodes context from message data.
+ DecodeMsgContext(data []byte, msg api.Message) (context uint32, err error)
}
// MessageIdentifier provides identification of generated API messages.
@@ -84,7 +86,7 @@ type subscriptionCtx struct {
msgFactory func() api.Message // function that returns a new instance of the specific message that is expected as a notification
}
-// channel is the main communication interface with govpp core. It contains four Go channels, one for sending the requests
+// Channel is the main communication interface with govpp core. It contains four Go channels, one for sending the requests
// to VPP, one for receiving the replies from it and the same set for notifications. The user can access the Go channels
// via methods provided by Channel interface in this package. Do not use the same channel from multiple goroutines
// concurrently, otherwise the responses could mix! Use multiple channels instead.
@@ -150,13 +152,13 @@ func (ch *Channel) CheckCompatiblity(msgs ...api.Message) error {
_, err := ch.msgIdentifier.GetMessageID(msg)
if err != nil {
if uerr, ok := err.(*adapter.UnknownMsgError); ok {
- m := fmt.Sprintf("%s_%s", uerr.MsgName, uerr.MsgCrc)
- comperr.IncompatibleMessages = append(comperr.IncompatibleMessages, m)
+ comperr.IncompatibleMessages = append(comperr.IncompatibleMessages, getMsgID(uerr.MsgName, uerr.MsgCrc))
continue
}
// other errors return immediatelly
return err
}
+ comperr.CompatibleMessages = append(comperr.CompatibleMessages, getMsgNameWithCrc(msg))
}
if len(comperr.IncompatibleMessages) == 0 {
return nil
@@ -234,6 +236,8 @@ func (sub *subscriptionCtx) Unsubscribe() error {
for i, item := range sub.ch.conn.subscriptions[sub.msgID] {
if item == sub {
+ // close notification channel
+ close(sub.ch.conn.subscriptions[sub.msgID][i].notifChan)
// remove i-th item in the slice
sub.ch.conn.subscriptions[sub.msgID] = append(sub.ch.conn.subscriptions[sub.msgID][:i], sub.ch.conn.subscriptions[sub.msgID][i+1:]...)
return nil
@@ -328,9 +332,9 @@ func (ch *Channel) processReply(reply *vppReply, expSeqNum uint16, msg api.Messa
msgNameCrc = getMsgNameWithCrc(replyMsg)
}
- err = fmt.Errorf("received invalid message ID (seqNum=%d), expected %d (%s), but got %d (%s) "+
+ err = fmt.Errorf("received unexpected message (seqNum=%d), expected %s (ID %d), but got %s (ID %d) "+
"(check if multiple goroutines are not sharing single GoVPP channel)",
- reply.seqNum, expMsgID, msg.GetMessageName(), reply.msgID, msgNameCrc)
+ reply.seqNum, msg.GetMessageName(), expMsgID, msgNameCrc, reply.msgID)
return
}
diff --git a/core/channel_test.go b/core/channel_test.go
index b8d07b5..6775519 100644
--- a/core/channel_test.go
+++ b/core/channel_test.go
@@ -466,5 +466,5 @@ func TestInvalidMessageID(t *testing.T) {
// second should fail with error invalid message ID
err = ctx.ch.SendRequest(&ControlPing{}).ReceiveReply(&ControlPingReply{})
Expect(err).Should(HaveOccurred())
- Expect(err.Error()).To(ContainSubstring("invalid message ID"))
+ Expect(err.Error()).To(ContainSubstring("unexpected message"))
}
diff --git a/core/connection.go b/core/connection.go
index 264ec43..917f1cb 100644
--- a/core/connection.go
+++ b/core/connection.go
@@ -95,7 +95,7 @@ type Connection struct {
vppConnected uint32 // non-zero if the adapter is connected to VPP
- codec *codec.MsgCodec // message codec
+ codec MessageCodec // message codec
msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC
msgMap map[uint16]api.Message // map of messages indexed by message ID
@@ -374,7 +374,11 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
}
func getMsgNameWithCrc(x api.Message) string {
- return x.GetMessageName() + "_" + x.GetCrcString()
+ return getMsgID(x.GetMessageName(), x.GetCrcString())
+}
+
+func getMsgID(name, crc string) string {
+ return name + "_" + crc
}
func getMsgFactory(msg api.Message) func() api.Message {
@@ -425,9 +429,14 @@ func (c *Connection) retrieveMessageIDs() (err error) {
var n int
for name, msg := range msgs {
+ typ := reflect.TypeOf(msg).Elem()
+ path := fmt.Sprintf("%s.%s", typ.PkgPath(), typ.Name())
+
msgID, err := c.GetMessageID(msg)
if err != nil {
- log.Debugf("retrieving msgID for %s failed: %v", name, err)
+ if debugMsgIDs {
+ log.Debugf("retrieving message ID for %s failed: %v", path, err)
+ }
continue
}
n++
@@ -444,7 +453,8 @@ func (c *Connection) retrieveMessageIDs() (err error) {
log.Debugf("message %q (%s) has ID: %d", name, getMsgNameWithCrc(msg), msgID)
}
}
- log.Debugf("retrieved %d/%d msgIDs (took %s)", n, len(msgs), time.Since(t))
+ log.WithField("took", time.Since(t)).
+ Debugf("retrieved IDs for %d messages (registered %d)", n, len(msgs))
return nil
}
diff --git a/core/log.go b/core/log.go
index 5960d6b..dea6cbb 100644
--- a/core/log.go
+++ b/core/log.go
@@ -2,32 +2,35 @@ package core
import (
"os"
+ "strings"
- logger "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus"
)
var (
debug = os.Getenv("DEBUG_GOVPP") != ""
- debugMsgIDs = os.Getenv("DEBUG_GOVPP_MSGIDS") != ""
+ debugMsgIDs = strings.Contains(os.Getenv("DEBUG_GOVPP"), "msgid")
- log = logger.New() // global logger
+ log = logrus.New()
)
-// init initializes global logger, which logs debug level messages to stdout.
+// init initializes global logger
func init() {
- log.Out = os.Stdout
+ log.Formatter = &logrus.TextFormatter{
+ EnvironmentOverrideColors: true,
+ }
if debug {
- log.Level = logger.DebugLevel
- log.Debugf("govpp/core: debug mode enabled")
+ log.Level = logrus.DebugLevel
+ log.Debugf("govpp: debug level enabled")
}
}
// SetLogger sets global logger to l.
-func SetLogger(l *logger.Logger) {
+func SetLogger(l *logrus.Logger) {
log = l
}
// SetLogLevel sets global logger level to lvl.
-func SetLogLevel(lvl logger.Level) {
+func SetLogLevel(lvl logrus.Level) {
log.Level = lvl
}
diff --git a/core/request_handler.go b/core/request_handler.go
index ddd5307..e272c6f 100644
--- a/core/request_handler.go
+++ b/core/request_handler.go
@@ -17,10 +17,13 @@ package core
import (
"errors"
"fmt"
+ "reflect"
"sync/atomic"
"time"
logger "github.com/sirupsen/logrus"
+
+ "git.fd.io/govpp.git/api"
)
var ReplyChannelTimeout = time.Millisecond * 100
@@ -93,7 +96,7 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
"msg_size": len(data),
"seq_num": req.seqNum,
"msg_crc": req.msg.GetCrcString(),
- }).Debugf("==> govpp send: %s: %+v", req.msg.GetMessageName(), req.msg)
+ }).Debugf("--> govpp SEND: %s %+v", req.msg.GetMessageName(), req.msg)
}
// send the request to VPP
@@ -118,7 +121,7 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
"msg_id": c.pingReqID,
"msg_size": len(pingData),
"seq_num": req.seqNum,
- }).Debug("--> sending control ping")
+ }).Debug(" -> sending control ping")
if err := c.vppClient.SendMsg(context, pingData); err != nil {
log.WithFields(logger.Fields{
@@ -156,7 +159,16 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) {
}
chanID, isMulti, seqNum := unpackRequestContext(context)
+
if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
+ msg = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
+
+ // decode the message
+ if err = c.codec.DecodeMsg(data, msg); err != nil {
+ err = fmt.Errorf("decoding message failed: %w", err)
+ return
+ }
+
log.WithFields(logger.Fields{
"context": context,
"msg_id": msgID,
@@ -165,7 +177,7 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) {
"is_multi": isMulti,
"seq_num": seqNum,
"msg_crc": msg.GetCrcString(),
- }).Debugf("<== govpp recv: %s", msg.GetMessageName())
+ }).Debugf("<-- govpp RECEIVE: %s %+v", msg.GetMessageName(), msg)
}
if context == 0 || c.isNotificationMessage(msgID) {
@@ -210,7 +222,7 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) {
func sendReply(ch *Channel, reply *vppReply) {
select {
case ch.replyChan <- reply:
- // reply sent successfully
+ // reply sent successfully
case <-time.After(ReplyChannelTimeout):
// receiver still not ready
log.WithFields(logger.Fields{