From df05a70f90a1486a86a4156b1b0d68c94f2098b4 Mon Sep 17 00:00:00 2001 From: Ondrej Fabry Date: Fri, 8 Feb 2019 08:38:56 +0100 Subject: Add support for using multiple generated versions - added CheckCompatibility for checking if given messages are compatible - generating Messages global for easier usage of compatibility check - added ReconnectInterval and MaxReconnectAttempts for reconnecting - added Failed state that is sent after exceeding max reconnect attempts Change-Id: I1062ba453f22657c1a2a31aa64cb103ef1223b0f Signed-off-by: Ondrej Fabry --- core/channel.go | 10 ++- core/connection.go | 166 +++++++++++++++++++++++++----------------------- core/control_ping.go | 15 +++++ core/request_handler.go | 2 + 4 files changed, 109 insertions(+), 84 deletions(-) (limited to 'core') diff --git a/core/channel.go b/core/channel.go index 5b69eab..bf27b73 100644 --- a/core/channel.go +++ b/core/channel.go @@ -142,10 +142,14 @@ func (ch *Channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx { return &multiRequestCtx{ch: ch, seqNum: seqNum} } -func getMsgFactory(msg api.Message) func() api.Message { - return func() api.Message { - return reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) +func (ch *Channel) CheckCompatiblity(msgs ...api.Message) error { + for _, msg := range msgs { + _, err := ch.msgIdentifier.GetMessageID(msg) + if err != nil { + return err + } } + return nil } func (ch *Channel) SubscribeNotification(notifChan chan api.Message, event api.Message) (api.SubscriptionCtx, error) { diff --git a/core/connection.go b/core/connection.go index 08f08f5..14b0af4 100644 --- a/core/connection.go +++ b/core/connection.go @@ -40,6 +40,8 @@ var ( HealthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe HealthCheckThreshold = 1 // number of failed health checks until the error is reported DefaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP + ReconnectInterval = time.Second * 1 // default interval for reconnect attempts + MaxReconnectAttempts = 10 // maximum number of reconnect attempts ) // ConnectionState represents the current state of the connection to VPP. @@ -51,6 +53,9 @@ const ( // Disconnected represents state in which the connection has been dropped. Disconnected + + // Failed represents state in which the reconnecting failed after exceeding maximum number of attempts. + Failed ) // ConnectionEvent is a notification about change in the VPP connection state. @@ -213,88 +218,11 @@ func (c *Connection) releaseAPIChannel(ch *Channel) { c.channelsLock.Unlock() } -// GetMessageID returns message identifier of given API message. -func (c *Connection) GetMessageID(msg api.Message) (uint16, error) { - if c == nil { - return 0, errors.New("nil connection passed in") - } - - if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok { - return msgID, nil - } - - return 0, fmt.Errorf("unknown message: %s (%s)", msg.GetMessageName(), msg.GetCrcString()) -} - -// LookupByID looks up message name and crc by ID. -func (c *Connection) LookupByID(msgID uint16) (api.Message, error) { - if c == nil { - return nil, errors.New("nil connection passed in") - } - - if msg, ok := c.msgMap[msgID]; ok { - return msg, nil - } - - return nil, fmt.Errorf("unknown message ID: %d", msgID) -} - -// retrieveMessageIDs retrieves IDs for all registered messages and stores them in map -func (c *Connection) retrieveMessageIDs() (err error) { - t := time.Now() - - var addMsg = func(msgID uint16, msg api.Message) { - c.msgIDs[getMsgNameWithCrc(msg)] = msgID - c.msgMap[msgID] = msg - } - - msgs := api.GetRegisteredMessages() - - for name, msg := range msgs { - msgID, err := c.vppClient.GetMsgID(msg.GetMessageName(), msg.GetCrcString()) - if err != nil { - return err - } - - addMsg(msgID, msg) - - if msg.GetMessageName() == msgControlPing.GetMessageName() { - c.pingReqID = msgID - msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) - } else if msg.GetMessageName() == msgControlPingReply.GetMessageName() { - c.pingReplyID = msgID - msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) - } - - if debugMsgIDs { - log.Debugf("message %q (%s) has ID: %d", name, getMsgNameWithCrc(msg), msgID) - } - } - - log.Debugf("retrieving %d message IDs took %s", len(msgs), time.Since(t)) - - // fallback for control ping when vpe package is not imported - if c.pingReqID == 0 { - c.pingReqID, err = c.vppClient.GetMsgID(msgControlPing.GetMessageName(), msgControlPing.GetCrcString()) - if err != nil { - return err - } - addMsg(c.pingReqID, msgControlPing) - } - if c.pingReplyID == 0 { - c.pingReplyID, err = c.vppClient.GetMsgID(msgControlPingReply.GetMessageName(), msgControlPingReply.GetCrcString()) - if err != nil { - return err - } - addMsg(c.pingReplyID, msgControlPingReply) - } - - return nil -} - // connectLoop attempts to connect to VPP until it succeeds. // Then it continues with healthCheckLoop. func (c *Connection) connectLoop(connChan chan ConnectionEvent) { + reconnectAttempts := 0 + // loop until connected for { if err := c.vppClient.WaitReady(); err != nil { @@ -304,9 +232,13 @@ func (c *Connection) connectLoop(connChan chan ConnectionEvent) { // signal connected event connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected} break + } else if reconnectAttempts < MaxReconnectAttempts { + reconnectAttempts++ + log.Errorf("connecting failed (attempt %d/%d): %v", reconnectAttempts, MaxReconnectAttempts, err) + time.Sleep(ReconnectInterval) } else { - log.Errorf("connecting to VPP failed: %v", err) - time.Sleep(time.Second) + connChan <- ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err} + return } } @@ -405,3 +337,75 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { func getMsgNameWithCrc(x api.Message) string { return x.GetMessageName() + "_" + x.GetCrcString() } + +func getMsgFactory(msg api.Message) func() api.Message { + return func() api.Message { + return reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) + } +} + +// GetMessageID returns message identifier of given API message. +func (c *Connection) GetMessageID(msg api.Message) (uint16, error) { + if c == nil { + return 0, errors.New("nil connection passed in") + } + + if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok { + return msgID, nil + } + + msgID, err := c.vppClient.GetMsgID(msg.GetMessageName(), msg.GetCrcString()) + if err != nil { + return 0, err + } + + c.msgIDs[getMsgNameWithCrc(msg)] = msgID + c.msgMap[msgID] = msg + + return msgID, nil +} + +// LookupByID looks up message name and crc by ID. +func (c *Connection) LookupByID(msgID uint16) (api.Message, error) { + if c == nil { + return nil, errors.New("nil connection passed in") + } + + if msg, ok := c.msgMap[msgID]; ok { + return msg, nil + } + + return nil, fmt.Errorf("unknown message ID: %d", msgID) +} + +// retrieveMessageIDs retrieves IDs for all registered messages and stores them in map +func (c *Connection) retrieveMessageIDs() (err error) { + t := time.Now() + + msgs := api.GetRegisteredMessages() + + var n int + for name, msg := range msgs { + msgID, err := c.GetMessageID(msg) + if err != nil { + log.Debugf("retrieving msgID for %s failed: %v", name, err) + continue + } + n++ + + if c.pingReqID == 0 && msg.GetMessageName() == msgControlPing.GetMessageName() { + c.pingReqID = msgID + msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) + } else if c.pingReplyID == 0 && msg.GetMessageName() == msgControlPingReply.GetMessageName() { + c.pingReplyID = msgID + msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) + } + + if debugMsgIDs { + log.Debugf("message %q (%s) has ID: %d", name, getMsgNameWithCrc(msg), msgID) + } + } + log.Debugf("retrieved %d/%d msgIDs (took %s)", n, len(msgs), time.Since(t)) + + return nil +} diff --git a/core/control_ping.go b/core/control_ping.go index 904068a..cd447b7 100644 --- a/core/control_ping.go +++ b/core/control_ping.go @@ -7,6 +7,16 @@ var ( msgControlPingReply api.Message = new(ControlPingReply) ) +// SetControlPing sets the control ping message used by core. +func SetControlPing(m api.Message) { + msgControlPing = m +} + +// SetControlPingReply sets the control ping reply message used by core. +func SetControlPingReply(m api.Message) { + msgControlPingReply = m +} + type ControlPing struct{} func (*ControlPing) GetMessageName() string { @@ -34,3 +44,8 @@ func (*ControlPingReply) GetCrcString() string { func (*ControlPingReply) GetMessageType() api.MessageType { return api.ReplyMessage } + +func init() { + api.RegisterMessage((*ControlPing)(nil), "ControlPing") + api.RegisterMessage((*ControlPingReply)(nil), "ControlPingReply") +} diff --git a/core/request_handler.go b/core/request_handler.go index 4004d15..dc90747 100644 --- a/core/request_handler.go +++ b/core/request_handler.go @@ -91,6 +91,7 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error { "msg_name": req.msg.GetMessageName(), "msg_size": len(data), "seq_num": req.seqNum, + "msg_crc": req.msg.GetCrcString(), }).Debug(" -> Sending a message to VPP.") } @@ -163,6 +164,7 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) { "channel": chanID, "is_multi": isMulti, "seq_num": seqNum, + "msg_crc": msg.GetCrcString(), }).Debug(" <- Received a message from VPP.") } -- cgit 1.2.3-korg