diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/connection.go | 27 | ||||
-rw-r--r-- | core/request_handler.go | 79 | ||||
-rw-r--r-- | core/trace.go | 70 | ||||
-rw-r--r-- | core/trace_test.go | 265 |
4 files changed, 386 insertions, 55 deletions
diff --git a/core/connection.go b/core/connection.go index f3ff964..ee5a06b 100644 --- a/core/connection.go +++ b/core/connection.go @@ -123,6 +123,8 @@ type Connection struct { msgControlPing api.Message msgControlPingReply api.Message + + apiTrace *trace // API tracer (disabled by default) } func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) *Connection { @@ -145,6 +147,10 @@ func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) subscriptions: make(map[uint16][]*subscriptionCtx), msgControlPing: msgControlPing, msgControlPingReply: msgControlPingReply, + apiTrace: &trace{ + list: make([]*api.Record, 0), + mux: &sync.Mutex{}, + }, } binapi.SetMsgCallback(c.msgCallback) return c @@ -480,3 +486,24 @@ func (c *Connection) sendConnEvent(event ConnectionEvent) { log.Warn("Connection state channel is full, discarding value.") } } + +// Trace gives access to the API trace interface +func (c *Connection) Trace() api.Trace { + return c.apiTrace +} + +// trace records api message +func (c *Connection) trace(msg api.Message, chId uint16, t time.Time, isReceived bool) { + if atomic.LoadInt32(&c.apiTrace.isEnabled) == 0 { + return + } + entry := &api.Record{ + Message: msg, + Timestamp: t, + IsReceived: isReceived, + ChannelID: chId, + } + c.apiTrace.mux.Lock() + c.apiTrace.list = append(c.apiTrace.list, entry) + c.apiTrace.mux.Unlock() +} diff --git a/core/request_handler.go b/core/request_handler.go index 95bd924..bf014de 100644 --- a/core/request_handler.go +++ b/core/request_handler.go @@ -17,6 +17,7 @@ package core import ( "errors" "fmt" + "reflect" "sync/atomic" "time" @@ -54,51 +55,6 @@ func (c *Connection) watchRequests(ch *Channel) { } // processRequest processes a single request received on the request channel. -func (c *Connection) sendMessage(context uint32, msg api.Message) error { - // check whether we are connected to VPP - if atomic.LoadUint32(&c.vppConnected) == 0 { - return ErrNotConnected - } - - /*log := log.WithFields(logger.Fields{ - "context": context, - "msg_name": msg.GetMessageName(), - "msg_crc": msg.GetCrcString(), - })*/ - - // retrieve message ID - msgID, err := c.GetMessageID(msg) - if err != nil { - //log.WithError(err).Debugf("unable to retrieve message ID: %#v", msg) - return err - } - - //log = log.WithField("msg_id", msgID) - - // encode the message - data, err := c.codec.EncodeMsg(msg, msgID) - if err != nil { - log.WithError(err).Debugf("unable to encode message: %#v", msg) - return err - } - - //log = log.WithField("msg_length", len(data)) - - if log.Level >= logger.DebugLevel { - log.Debugf("--> SEND: MSG %T %+v", msg, msg) - } - - // send message to VPP - err = c.vppClient.SendMsg(context, data) - if err != nil { - log.WithError(err).Debugf("unable to send message: %#v", msg) - return err - } - - return nil -} - -// processRequest processes a single request received on the request channel. func (c *Connection) processRequest(ch *Channel, req *vppRequest) error { // check whether we are connected to VPP if atomic.LoadUint32(&c.vppConnected) == 0 { @@ -156,6 +112,7 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error { } // send the request to VPP + t := time.Now() err = c.vppClient.SendMsg(context, data) if err != nil { log.WithFields(logger.Fields{ @@ -171,6 +128,7 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error { }).Warnf("Unable to send message") return err } + c.trace(req.msg, ch.id, t, false) if req.multi { // send a control ping to determine end of the multipart response @@ -188,6 +146,7 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error { }).Debugf(" -> SEND MSG: %T", c.msgControlPing) } + t = time.Now() if err := c.vppClient.SendMsg(context, pingData); err != nil { log.WithFields(logger.Fields{ "context": context, @@ -195,6 +154,7 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error { "error": err, }).Warnf("unable to send control ping") } + c.trace(c.msgControlPing, ch.id, t, false) } return nil @@ -209,7 +169,7 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) { return } - msgType, name, crc, err := c.getMessageDataByID(msgID) + msg, err := c.getMessageByID(msgID) if err != nil { log.Warnln(err) return @@ -220,7 +180,7 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) { // - replies that don't have context as first field (comes as zero) // - events that don't have context at all (comes as non zero) // - context, err := c.codec.DecodeMsgContext(data, msgType) + context, err := c.codec.DecodeMsgContext(data, msg.GetMessageType()) if err != nil { log.WithField("msg_id", msgID).Warnf("Unable to decode message context: %v", err) return @@ -228,6 +188,14 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) { chanID, isMulti, seqNum := unpackRequestContext(context) + // decode and trace the message + msg = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) + if err = c.codec.DecodeMsg(data, msg); err != nil { + log.WithField("msg", msg).Warnf("Unable to decode message: %v", err) + return + } + c.trace(msg, chanID, time.Now(), true) + if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled log.WithFields(logger.Fields{ "context": context, @@ -236,8 +204,8 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) { "channel": chanID, "is_multi": isMulti, "seq_num": seqNum, - "msg_crc": crc, - }).Debugf("<-- govpp RECEIVE: %s", name) + "msg_crc": msg.GetCrcString(), + }).Debugf("<-- govpp RECEIVE: %s", msg.GetMessageName()) } if context == 0 || c.isNotificationMessage(msgID) { @@ -411,12 +379,13 @@ func compareSeqNumbers(seqNum1, seqNum2 uint16) int { return 1 } -// Returns message data based on the message ID not depending on the message path -func (c *Connection) getMessageDataByID(msgID uint16) (typ api.MessageType, name, crc string, err error) { - for _, msgs := range c.msgMapByPath { - if msg, ok := msgs[msgID]; ok { - return msg.GetMessageType(), msg.GetMessageName(), msg.GetCrcString(), nil +// Returns message based on the message ID not depending on message path +func (c *Connection) getMessageByID(msgID uint16) (msg api.Message, err error) { + var ok bool + for _, messages := range c.msgMapByPath { + if msg, ok = messages[msgID]; ok { + return msg, nil } } - return typ, name, crc, fmt.Errorf("unknown message received, ID: %d", msgID) + return nil, fmt.Errorf("unknown message received, ID: %d", msgID) } diff --git a/core/trace.go b/core/trace.go new file mode 100644 index 0000000..ea9a57b --- /dev/null +++ b/core/trace.go @@ -0,0 +1,70 @@ +// Copyright (c) 2021 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + "git.fd.io/govpp.git/api" + "sort" + "sync" + "sync/atomic" +) + +// trace is the API tracer object synchronizing and keeping recoded messages. +type trace struct { + list []*api.Record + mux *sync.Mutex + + isEnabled int32 +} + +func (c *trace) Enable(enable bool) { + if enable && atomic.CompareAndSwapInt32(&c.isEnabled, 0, 1) { + log.Debugf("API trace enabled") + } else if atomic.CompareAndSwapInt32(&c.isEnabled, 1, 0) { + log.Debugf("API trace disabled") + } +} + +func (c *trace) GetRecords() (list []*api.Record) { + c.mux.Lock() + for _, entry := range c.list { + list = append(list, entry) + } + c.mux.Unlock() + sort.Slice(list, func(i, j int) bool { + return list[i].Timestamp.Before(list[j].Timestamp) + }) + return list +} + +func (c *trace) GetRecordsForChannel(chId uint16) (list []*api.Record) { + c.mux.Lock() + for _, entry := range c.list { + if entry.ChannelID == chId { + list = append(list, entry) + } + } + c.mux.Unlock() + sort.Slice(list, func(i, j int) bool { + return list[i].Timestamp.Before(list[j].Timestamp) + }) + return list +} + +func (c *trace) Clear() { + c.mux.Lock() + c.list = make([]*api.Record, 0) + c.mux.Unlock() +} diff --git a/core/trace_test.go b/core/trace_test.go new file mode 100644 index 0000000..1a29e7a --- /dev/null +++ b/core/trace_test.go @@ -0,0 +1,265 @@ +package core_test + +import ( + "git.fd.io/govpp.git/api" + interfaces "git.fd.io/govpp.git/binapi/interface" + "git.fd.io/govpp.git/binapi/ip" + "git.fd.io/govpp.git/binapi/l2" + "git.fd.io/govpp.git/binapi/memif" + "git.fd.io/govpp.git/binapi/vpe" + "git.fd.io/govpp.git/core" + . "github.com/onsi/gomega" + "strings" + "testing" +) + +func TestTraceEnabled(t *testing.T) { + ctx := setupTest(t, false) + defer ctx.teardownTest() + + Expect(ctx.conn.Trace()).ToNot(BeNil()) + ctx.conn.Trace().Enable(true) + + request := []api.Message{ + &interfaces.CreateLoopback{}, + &memif.MemifCreate{}, + &l2.BridgeDomainAddDel{}, + &ip.IPTableAddDel{}, + } + reply := []api.Message{ + &interfaces.CreateLoopbackReply{}, + &memif.MemifCreateReply{}, + &l2.BridgeDomainAddDelReply{}, + &ip.IPTableAddDelReply{}, + } + + for i := 0; i < len(request); i++ { + ctx.mockVpp.MockReply(reply[i]) + err := ctx.ch.SendRequest(request[i]).ReceiveReply(reply[i]) + Expect(err).To(BeNil()) + } + + traced := ctx.conn.Trace().GetRecords() + Expect(traced).ToNot(BeNil()) + Expect(traced).To(HaveLen(8)) + for i, entry := range traced { + Expect(entry.Timestamp).ToNot(BeNil()) + Expect(entry.Message.GetMessageName()).ToNot(Equal("")) + if strings.HasSuffix(entry.Message.GetMessageName(), "_reply") || + strings.HasSuffix(entry.Message.GetMessageName(), "_details") { + Expect(entry.IsReceived).To(BeTrue()) + } else { + Expect(entry.IsReceived).To(BeFalse()) + } + if i%2 == 0 { + Expect(request[i/2].GetMessageName()).To(Equal(entry.Message.GetMessageName())) + } else { + Expect(reply[i/2].GetMessageName()).To(Equal(entry.Message.GetMessageName())) + } + } +} + +func TestMultiRequestTraceEnabled(t *testing.T) { + ctx := setupTest(t, false) + defer ctx.teardownTest() + + ctx.conn.Trace().Enable(true) + + request := []api.Message{ + &interfaces.SwInterfaceDump{}, + } + reply := []api.Message{ + &interfaces.SwInterfaceDetails{ + SwIfIndex: 1, + }, + &interfaces.SwInterfaceDetails{ + SwIfIndex: 2, + }, + &interfaces.SwInterfaceDetails{ + SwIfIndex: 3, + }, + &vpe.ControlPingReply{}, + } + + ctx.mockVpp.MockReply(reply...) + multiCtx := ctx.ch.SendMultiRequest(request[0]) + + i := 0 + for { + last, err := multiCtx.ReceiveReply(reply[i]) + Expect(err).ToNot(HaveOccurred()) + if last { + break + } + i++ + } + + traced := ctx.conn.Trace().GetRecords() + Expect(traced).ToNot(BeNil()) + Expect(traced).To(HaveLen(6)) + for i, entry := range traced { + Expect(entry.Timestamp).ToNot(BeNil()) + Expect(entry.Message.GetMessageName()).ToNot(Equal("")) + if strings.HasSuffix(entry.Message.GetMessageName(), "_reply") || + strings.HasSuffix(entry.Message.GetMessageName(), "_details") { + Expect(entry.IsReceived).To(BeTrue()) + } else { + Expect(entry.IsReceived).To(BeFalse()) + } + if i == 0 { + Expect(request[0].GetMessageName()).To(Equal(entry.Message.GetMessageName())) + } else if i == len(traced)-1 { + msg := vpe.ControlPing{} + Expect(msg.GetMessageName()).To(Equal(entry.Message.GetMessageName())) + } else { + Expect(reply[i-1].GetMessageName()).To(Equal(entry.Message.GetMessageName())) + } + } +} + +func TestTraceDisabled(t *testing.T) { + ctx := setupTest(t, false) + defer ctx.teardownTest() + + ctx.conn.Trace().Enable(false) + + request := []api.Message{ + &interfaces.CreateLoopback{}, + &memif.MemifCreate{}, + &l2.BridgeDomainAddDel{}, + &ip.IPTableAddDel{}, + } + reply := []api.Message{ + &interfaces.CreateLoopbackReply{}, + &memif.MemifCreateReply{}, + &l2.BridgeDomainAddDelReply{}, + &ip.IPTableAddDelReply{}, + } + + for i := 0; i < len(request); i++ { + ctx.mockVpp.MockReply(reply[i]) + err := ctx.ch.SendRequest(request[i]).ReceiveReply(reply[i]) + Expect(err).To(BeNil()) + } + + traced := ctx.conn.Trace().GetRecords() + Expect(traced).To(BeNil()) +} + +func TestTracePerChannel(t *testing.T) { + ctx := setupTest(t, false) + defer ctx.teardownTest() + + ctx.conn.Trace().Enable(true) + + ch1 := ctx.ch + ch2, err := ctx.conn.NewAPIChannel() + Expect(err).ToNot(HaveOccurred()) + + requestCh1 := []api.Message{ + &interfaces.CreateLoopback{}, + &memif.MemifCreate{}, + &l2.BridgeDomainAddDel{}, + } + replyCh1 := []api.Message{ + &interfaces.CreateLoopbackReply{}, + &memif.MemifCreateReply{}, + &l2.BridgeDomainAddDelReply{}, + } + requestCh2 := []api.Message{ + &ip.IPTableAddDel{}, + } + replyCh2 := []api.Message{ + &ip.IPTableAddDelReply{}, + } + + for i := 0; i < len(requestCh1); i++ { + ctx.mockVpp.MockReply(replyCh1[i]) + err := ch1.SendRequest(requestCh1[i]).ReceiveReply(replyCh1[i]) + Expect(err).To(BeNil()) + } + for i := 0; i < len(requestCh2); i++ { + ctx.mockVpp.MockReply(replyCh2[i]) + err := ch2.SendRequest(requestCh2[i]).ReceiveReply(replyCh2[i]) + Expect(err).To(BeNil()) + } + + trace := ctx.conn.Trace().GetRecords() + Expect(trace).ToNot(BeNil()) + Expect(trace).To(HaveLen(8)) + + // per channel + channel1, ok := ch1.(*core.Channel) + Expect(ok).To(BeTrue()) + channel2, ok := ch2.(*core.Channel) + Expect(ok).To(BeTrue()) + + tracedCh1 := ctx.conn.Trace().GetRecordsForChannel(channel1.GetID()) + Expect(tracedCh1).ToNot(BeNil()) + Expect(tracedCh1).To(HaveLen(6)) + for i, entry := range tracedCh1 { + Expect(entry.Timestamp).ToNot(BeNil()) + Expect(entry.Message.GetMessageName()).ToNot(Equal("")) + if strings.HasSuffix(entry.Message.GetMessageName(), "_reply") || + strings.HasSuffix(entry.Message.GetMessageName(), "_details") { + Expect(entry.IsReceived).To(BeTrue()) + } else { + Expect(entry.IsReceived).To(BeFalse()) + } + if i%2 == 0 { + Expect(requestCh1[i/2].GetMessageName()).To(Equal(entry.Message.GetMessageName())) + } else { + Expect(replyCh1[i/2].GetMessageName()).To(Equal(entry.Message.GetMessageName())) + } + } + + tracedCh2 := ctx.conn.Trace().GetRecordsForChannel(channel2.GetID()) + Expect(tracedCh2).ToNot(BeNil()) + Expect(tracedCh2).To(HaveLen(2)) + for i, entry := range tracedCh2 { + Expect(entry.Timestamp).ToNot(BeNil()) + Expect(entry.Message.GetMessageName()).ToNot(Equal("")) + if strings.HasSuffix(entry.Message.GetMessageName(), "_reply") || + strings.HasSuffix(entry.Message.GetMessageName(), "_details") { + Expect(entry.IsReceived).To(BeTrue()) + } else { + Expect(entry.IsReceived).To(BeFalse()) + } + if i%2 == 0 { + Expect(requestCh2[i/2].GetMessageName()).To(Equal(entry.Message.GetMessageName())) + } else { + Expect(replyCh2[i/2].GetMessageName()).To(Equal(entry.Message.GetMessageName())) + } + } +} + +func TestTraceClear(t *testing.T) { + ctx := setupTest(t, false) + defer ctx.teardownTest() + + ctx.conn.Trace().Enable(true) + + request := []api.Message{ + &interfaces.CreateLoopback{}, + &memif.MemifCreate{}, + } + reply := []api.Message{ + &interfaces.CreateLoopbackReply{}, + &memif.MemifCreateReply{}, + } + + for i := 0; i < len(request); i++ { + ctx.mockVpp.MockReply(reply[i]) + err := ctx.ch.SendRequest(request[i]).ReceiveReply(reply[i]) + Expect(err).To(BeNil()) + } + + traced := ctx.conn.Trace().GetRecords() + Expect(traced).ToNot(BeNil()) + Expect(traced).To(HaveLen(4)) + + ctx.conn.Trace().Clear() + traced = ctx.conn.Trace().GetRecords() + Expect(traced).To(BeNil()) + Expect(traced).To(BeEmpty()) +} |