summaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
authorOndrej Fabry <ofabry@cisco.com>2019-02-08 08:38:56 +0100
committerOndrej Fabry <ofabry@cisco.com>2019-02-08 10:04:56 +0100
commitdf05a70f90a1486a86a4156b1b0d68c94f2098b4 (patch)
tree45577756efc2ec766430ed17daa9ddcf9c5709b2 /core
parentfa21c9d726ebb807895a8571af9a16dab5cd8d6e (diff)
Add support for using multiple generated versions
- added CheckCompatibility for checking if given messages are compatible - generating Messages global for easier usage of compatibility check - added ReconnectInterval and MaxReconnectAttempts for reconnecting - added Failed state that is sent after exceeding max reconnect attempts Change-Id: I1062ba453f22657c1a2a31aa64cb103ef1223b0f Signed-off-by: Ondrej Fabry <ofabry@cisco.com>
Diffstat (limited to 'core')
-rw-r--r--core/channel.go10
-rw-r--r--core/connection.go166
-rw-r--r--core/control_ping.go15
-rw-r--r--core/request_handler.go2
4 files changed, 109 insertions, 84 deletions
diff --git a/core/channel.go b/core/channel.go
index 5b69eab..bf27b73 100644
--- a/core/channel.go
+++ b/core/channel.go
@@ -142,10 +142,14 @@ func (ch *Channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx {
return &multiRequestCtx{ch: ch, seqNum: seqNum}
}
-func getMsgFactory(msg api.Message) func() api.Message {
- return func() api.Message {
- return reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
+func (ch *Channel) CheckCompatiblity(msgs ...api.Message) error {
+ for _, msg := range msgs {
+ _, err := ch.msgIdentifier.GetMessageID(msg)
+ if err != nil {
+ return err
+ }
}
+ return nil
}
func (ch *Channel) SubscribeNotification(notifChan chan api.Message, event api.Message) (api.SubscriptionCtx, error) {
diff --git a/core/connection.go b/core/connection.go
index 08f08f5..14b0af4 100644
--- a/core/connection.go
+++ b/core/connection.go
@@ -40,6 +40,8 @@ var (
HealthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe
HealthCheckThreshold = 1 // number of failed health checks until the error is reported
DefaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP
+ ReconnectInterval = time.Second * 1 // default interval for reconnect attempts
+ MaxReconnectAttempts = 10 // maximum number of reconnect attempts
)
// ConnectionState represents the current state of the connection to VPP.
@@ -51,6 +53,9 @@ const (
// Disconnected represents state in which the connection has been dropped.
Disconnected
+
+ // Failed represents state in which the reconnecting failed after exceeding maximum number of attempts.
+ Failed
)
// ConnectionEvent is a notification about change in the VPP connection state.
@@ -213,88 +218,11 @@ func (c *Connection) releaseAPIChannel(ch *Channel) {
c.channelsLock.Unlock()
}
-// GetMessageID returns message identifier of given API message.
-func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
- if c == nil {
- return 0, errors.New("nil connection passed in")
- }
-
- if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
- return msgID, nil
- }
-
- return 0, fmt.Errorf("unknown message: %s (%s)", msg.GetMessageName(), msg.GetCrcString())
-}
-
-// LookupByID looks up message name and crc by ID.
-func (c *Connection) LookupByID(msgID uint16) (api.Message, error) {
- if c == nil {
- return nil, errors.New("nil connection passed in")
- }
-
- if msg, ok := c.msgMap[msgID]; ok {
- return msg, nil
- }
-
- return nil, fmt.Errorf("unknown message ID: %d", msgID)
-}
-
-// retrieveMessageIDs retrieves IDs for all registered messages and stores them in map
-func (c *Connection) retrieveMessageIDs() (err error) {
- t := time.Now()
-
- var addMsg = func(msgID uint16, msg api.Message) {
- c.msgIDs[getMsgNameWithCrc(msg)] = msgID
- c.msgMap[msgID] = msg
- }
-
- msgs := api.GetRegisteredMessages()
-
- for name, msg := range msgs {
- msgID, err := c.vppClient.GetMsgID(msg.GetMessageName(), msg.GetCrcString())
- if err != nil {
- return err
- }
-
- addMsg(msgID, msg)
-
- if msg.GetMessageName() == msgControlPing.GetMessageName() {
- c.pingReqID = msgID
- msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
- } else if msg.GetMessageName() == msgControlPingReply.GetMessageName() {
- c.pingReplyID = msgID
- msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
- }
-
- if debugMsgIDs {
- log.Debugf("message %q (%s) has ID: %d", name, getMsgNameWithCrc(msg), msgID)
- }
- }
-
- log.Debugf("retrieving %d message IDs took %s", len(msgs), time.Since(t))
-
- // fallback for control ping when vpe package is not imported
- if c.pingReqID == 0 {
- c.pingReqID, err = c.vppClient.GetMsgID(msgControlPing.GetMessageName(), msgControlPing.GetCrcString())
- if err != nil {
- return err
- }
- addMsg(c.pingReqID, msgControlPing)
- }
- if c.pingReplyID == 0 {
- c.pingReplyID, err = c.vppClient.GetMsgID(msgControlPingReply.GetMessageName(), msgControlPingReply.GetCrcString())
- if err != nil {
- return err
- }
- addMsg(c.pingReplyID, msgControlPingReply)
- }
-
- return nil
-}
-
// connectLoop attempts to connect to VPP until it succeeds.
// Then it continues with healthCheckLoop.
func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
+ reconnectAttempts := 0
+
// loop until connected
for {
if err := c.vppClient.WaitReady(); err != nil {
@@ -304,9 +232,13 @@ func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
// signal connected event
connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
break
+ } else if reconnectAttempts < MaxReconnectAttempts {
+ reconnectAttempts++
+ log.Errorf("connecting failed (attempt %d/%d): %v", reconnectAttempts, MaxReconnectAttempts, err)
+ time.Sleep(ReconnectInterval)
} else {
- log.Errorf("connecting to VPP failed: %v", err)
- time.Sleep(time.Second)
+ connChan <- ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err}
+ return
}
}
@@ -405,3 +337,75 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
func getMsgNameWithCrc(x api.Message) string {
return x.GetMessageName() + "_" + x.GetCrcString()
}
+
+func getMsgFactory(msg api.Message) func() api.Message {
+ return func() api.Message {
+ return reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
+ }
+}
+
+// GetMessageID returns message identifier of given API message.
+func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
+ if c == nil {
+ return 0, errors.New("nil connection passed in")
+ }
+
+ if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
+ return msgID, nil
+ }
+
+ msgID, err := c.vppClient.GetMsgID(msg.GetMessageName(), msg.GetCrcString())
+ if err != nil {
+ return 0, err
+ }
+
+ c.msgIDs[getMsgNameWithCrc(msg)] = msgID
+ c.msgMap[msgID] = msg
+
+ return msgID, nil
+}
+
+// LookupByID looks up message name and crc by ID.
+func (c *Connection) LookupByID(msgID uint16) (api.Message, error) {
+ if c == nil {
+ return nil, errors.New("nil connection passed in")
+ }
+
+ if msg, ok := c.msgMap[msgID]; ok {
+ return msg, nil
+ }
+
+ return nil, fmt.Errorf("unknown message ID: %d", msgID)
+}
+
+// retrieveMessageIDs retrieves IDs for all registered messages and stores them in map
+func (c *Connection) retrieveMessageIDs() (err error) {
+ t := time.Now()
+
+ msgs := api.GetRegisteredMessages()
+
+ var n int
+ for name, msg := range msgs {
+ msgID, err := c.GetMessageID(msg)
+ if err != nil {
+ log.Debugf("retrieving msgID for %s failed: %v", name, err)
+ continue
+ }
+ n++
+
+ if c.pingReqID == 0 && msg.GetMessageName() == msgControlPing.GetMessageName() {
+ c.pingReqID = msgID
+ msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
+ } else if c.pingReplyID == 0 && msg.GetMessageName() == msgControlPingReply.GetMessageName() {
+ c.pingReplyID = msgID
+ msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
+ }
+
+ if debugMsgIDs {
+ log.Debugf("message %q (%s) has ID: %d", name, getMsgNameWithCrc(msg), msgID)
+ }
+ }
+ log.Debugf("retrieved %d/%d msgIDs (took %s)", n, len(msgs), time.Since(t))
+
+ return nil
+}
diff --git a/core/control_ping.go b/core/control_ping.go
index 904068a..cd447b7 100644
--- a/core/control_ping.go
+++ b/core/control_ping.go
@@ -7,6 +7,16 @@ var (
msgControlPingReply api.Message = new(ControlPingReply)
)
+// SetControlPing sets the control ping message used by core.
+func SetControlPing(m api.Message) {
+ msgControlPing = m
+}
+
+// SetControlPingReply sets the control ping reply message used by core.
+func SetControlPingReply(m api.Message) {
+ msgControlPingReply = m
+}
+
type ControlPing struct{}
func (*ControlPing) GetMessageName() string {
@@ -34,3 +44,8 @@ func (*ControlPingReply) GetCrcString() string {
func (*ControlPingReply) GetMessageType() api.MessageType {
return api.ReplyMessage
}
+
+func init() {
+ api.RegisterMessage((*ControlPing)(nil), "ControlPing")
+ api.RegisterMessage((*ControlPingReply)(nil), "ControlPingReply")
+}
diff --git a/core/request_handler.go b/core/request_handler.go
index 4004d15..dc90747 100644
--- a/core/request_handler.go
+++ b/core/request_handler.go
@@ -91,6 +91,7 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
"msg_name": req.msg.GetMessageName(),
"msg_size": len(data),
"seq_num": req.seqNum,
+ "msg_crc": req.msg.GetCrcString(),
}).Debug(" -> Sending a message to VPP.")
}
@@ -163,6 +164,7 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) {
"channel": chanID,
"is_multi": isMulti,
"seq_num": seqNum,
+ "msg_crc": msg.GetCrcString(),
}).Debug(" <- Received a message from VPP.")
}