From 280b1c6c83b676ef4e592f4ecf60cb5b54b6a753 Mon Sep 17 00:00:00 2001 From: Ondrej Fabry Date: Tue, 16 Jun 2020 10:40:34 +0200 Subject: Optimize socketclient adapter and add various code improvements This commit includes: Features - optimized [socketclient](adapter/socketclient) adapter and add method to set client name - added list of compatible messages to `CompatibilityError` Fixes - `MsgCodec` will recover panic occurring during a message decoding - calling `Unsubscibe` will close the notification channel Other - improved log messages to provide more relevant info Examples - added more code samples of working with unions in [union example](examples/union-example) - added profiling mode to [perf bench](examples/perf-bench) example - improved [simple client](examples/simple-client) example to work properly even with multiple runs Dependencies - updated `github.com/sirupsen/logrus` dep to `v1.6.0` - updated `github.com/lunixbochs/struc` dep to `v0.0.0-20200521075829-a4cb8d33dbbe` Change-Id: I136a3968ccf9e93760d7ee2b9902fc7e6390a09d Signed-off-by: Ondrej Fabry --- core/channel.go | 14 +++++++++----- core/channel_test.go | 2 +- core/connection.go | 18 ++++++++++++++---- core/log.go | 21 ++++++++++++--------- core/request_handler.go | 20 ++++++++++++++++---- 5 files changed, 52 insertions(+), 23 deletions(-) (limited to 'core') diff --git a/core/channel.go b/core/channel.go index 363a267..8479d6a 100644 --- a/core/channel.go +++ b/core/channel.go @@ -37,6 +37,8 @@ type MessageCodec interface { EncodeMsg(msg api.Message, msgID uint16) ([]byte, error) // DecodeMsg decodes binary-encoded data of a message into provided Message structure. DecodeMsg(data []byte, msg api.Message) error + // DecodeMsgContext decodes context from message data. + DecodeMsgContext(data []byte, msg api.Message) (context uint32, err error) } // MessageIdentifier provides identification of generated API messages. @@ -84,7 +86,7 @@ type subscriptionCtx struct { msgFactory func() api.Message // function that returns a new instance of the specific message that is expected as a notification } -// channel is the main communication interface with govpp core. It contains four Go channels, one for sending the requests +// Channel is the main communication interface with govpp core. It contains four Go channels, one for sending the requests // to VPP, one for receiving the replies from it and the same set for notifications. The user can access the Go channels // via methods provided by Channel interface in this package. Do not use the same channel from multiple goroutines // concurrently, otherwise the responses could mix! Use multiple channels instead. @@ -150,13 +152,13 @@ func (ch *Channel) CheckCompatiblity(msgs ...api.Message) error { _, err := ch.msgIdentifier.GetMessageID(msg) if err != nil { if uerr, ok := err.(*adapter.UnknownMsgError); ok { - m := fmt.Sprintf("%s_%s", uerr.MsgName, uerr.MsgCrc) - comperr.IncompatibleMessages = append(comperr.IncompatibleMessages, m) + comperr.IncompatibleMessages = append(comperr.IncompatibleMessages, getMsgID(uerr.MsgName, uerr.MsgCrc)) continue } // other errors return immediatelly return err } + comperr.CompatibleMessages = append(comperr.CompatibleMessages, getMsgNameWithCrc(msg)) } if len(comperr.IncompatibleMessages) == 0 { return nil @@ -234,6 +236,8 @@ func (sub *subscriptionCtx) Unsubscribe() error { for i, item := range sub.ch.conn.subscriptions[sub.msgID] { if item == sub { + // close notification channel + close(sub.ch.conn.subscriptions[sub.msgID][i].notifChan) // remove i-th item in the slice sub.ch.conn.subscriptions[sub.msgID] = append(sub.ch.conn.subscriptions[sub.msgID][:i], sub.ch.conn.subscriptions[sub.msgID][i+1:]...) return nil @@ -328,9 +332,9 @@ func (ch *Channel) processReply(reply *vppReply, expSeqNum uint16, msg api.Messa msgNameCrc = getMsgNameWithCrc(replyMsg) } - err = fmt.Errorf("received invalid message ID (seqNum=%d), expected %d (%s), but got %d (%s) "+ + err = fmt.Errorf("received unexpected message (seqNum=%d), expected %s (ID %d), but got %s (ID %d) "+ "(check if multiple goroutines are not sharing single GoVPP channel)", - reply.seqNum, expMsgID, msg.GetMessageName(), reply.msgID, msgNameCrc) + reply.seqNum, msg.GetMessageName(), expMsgID, msgNameCrc, reply.msgID) return } diff --git a/core/channel_test.go b/core/channel_test.go index b8d07b5..6775519 100644 --- a/core/channel_test.go +++ b/core/channel_test.go @@ -466,5 +466,5 @@ func TestInvalidMessageID(t *testing.T) { // second should fail with error invalid message ID err = ctx.ch.SendRequest(&ControlPing{}).ReceiveReply(&ControlPingReply{}) Expect(err).Should(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("invalid message ID")) + Expect(err.Error()).To(ContainSubstring("unexpected message")) } diff --git a/core/connection.go b/core/connection.go index 264ec43..917f1cb 100644 --- a/core/connection.go +++ b/core/connection.go @@ -95,7 +95,7 @@ type Connection struct { vppConnected uint32 // non-zero if the adapter is connected to VPP - codec *codec.MsgCodec // message codec + codec MessageCodec // message codec msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC msgMap map[uint16]api.Message // map of messages indexed by message ID @@ -374,7 +374,11 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { } func getMsgNameWithCrc(x api.Message) string { - return x.GetMessageName() + "_" + x.GetCrcString() + return getMsgID(x.GetMessageName(), x.GetCrcString()) +} + +func getMsgID(name, crc string) string { + return name + "_" + crc } func getMsgFactory(msg api.Message) func() api.Message { @@ -425,9 +429,14 @@ func (c *Connection) retrieveMessageIDs() (err error) { var n int for name, msg := range msgs { + typ := reflect.TypeOf(msg).Elem() + path := fmt.Sprintf("%s.%s", typ.PkgPath(), typ.Name()) + msgID, err := c.GetMessageID(msg) if err != nil { - log.Debugf("retrieving msgID for %s failed: %v", name, err) + if debugMsgIDs { + log.Debugf("retrieving message ID for %s failed: %v", path, err) + } continue } n++ @@ -444,7 +453,8 @@ func (c *Connection) retrieveMessageIDs() (err error) { log.Debugf("message %q (%s) has ID: %d", name, getMsgNameWithCrc(msg), msgID) } } - log.Debugf("retrieved %d/%d msgIDs (took %s)", n, len(msgs), time.Since(t)) + log.WithField("took", time.Since(t)). + Debugf("retrieved IDs for %d messages (registered %d)", n, len(msgs)) return nil } diff --git a/core/log.go b/core/log.go index 5960d6b..dea6cbb 100644 --- a/core/log.go +++ b/core/log.go @@ -2,32 +2,35 @@ package core import ( "os" + "strings" - logger "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" ) var ( debug = os.Getenv("DEBUG_GOVPP") != "" - debugMsgIDs = os.Getenv("DEBUG_GOVPP_MSGIDS") != "" + debugMsgIDs = strings.Contains(os.Getenv("DEBUG_GOVPP"), "msgid") - log = logger.New() // global logger + log = logrus.New() ) -// init initializes global logger, which logs debug level messages to stdout. +// init initializes global logger func init() { - log.Out = os.Stdout + log.Formatter = &logrus.TextFormatter{ + EnvironmentOverrideColors: true, + } if debug { - log.Level = logger.DebugLevel - log.Debugf("govpp/core: debug mode enabled") + log.Level = logrus.DebugLevel + log.Debugf("govpp: debug level enabled") } } // SetLogger sets global logger to l. -func SetLogger(l *logger.Logger) { +func SetLogger(l *logrus.Logger) { log = l } // SetLogLevel sets global logger level to lvl. -func SetLogLevel(lvl logger.Level) { +func SetLogLevel(lvl logrus.Level) { log.Level = lvl } diff --git a/core/request_handler.go b/core/request_handler.go index ddd5307..e272c6f 100644 --- a/core/request_handler.go +++ b/core/request_handler.go @@ -17,10 +17,13 @@ package core import ( "errors" "fmt" + "reflect" "sync/atomic" "time" logger "github.com/sirupsen/logrus" + + "git.fd.io/govpp.git/api" ) var ReplyChannelTimeout = time.Millisecond * 100 @@ -93,7 +96,7 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error { "msg_size": len(data), "seq_num": req.seqNum, "msg_crc": req.msg.GetCrcString(), - }).Debugf("==> govpp send: %s: %+v", req.msg.GetMessageName(), req.msg) + }).Debugf("--> govpp SEND: %s %+v", req.msg.GetMessageName(), req.msg) } // send the request to VPP @@ -118,7 +121,7 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error { "msg_id": c.pingReqID, "msg_size": len(pingData), "seq_num": req.seqNum, - }).Debug("--> sending control ping") + }).Debug(" -> sending control ping") if err := c.vppClient.SendMsg(context, pingData); err != nil { log.WithFields(logger.Fields{ @@ -156,7 +159,16 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) { } chanID, isMulti, seqNum := unpackRequestContext(context) + if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled + msg = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) + + // decode the message + if err = c.codec.DecodeMsg(data, msg); err != nil { + err = fmt.Errorf("decoding message failed: %w", err) + return + } + log.WithFields(logger.Fields{ "context": context, "msg_id": msgID, @@ -165,7 +177,7 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) { "is_multi": isMulti, "seq_num": seqNum, "msg_crc": msg.GetCrcString(), - }).Debugf("<== govpp recv: %s", msg.GetMessageName()) + }).Debugf("<-- govpp RECEIVE: %s %+v", msg.GetMessageName(), msg) } if context == 0 || c.isNotificationMessage(msgID) { @@ -210,7 +222,7 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) { func sendReply(ch *Channel, reply *vppReply) { select { case ch.replyChan <- reply: - // reply sent successfully + // reply sent successfully case <-time.After(ReplyChannelTimeout): // receiver still not ready log.WithFields(logger.Fields{ -- cgit 1.2.3-korg