summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--core/connection.go44
1 files changed, 30 insertions, 14 deletions
diff --git a/core/connection.go b/core/connection.go
index cfa94ee..53a9acf 100644
--- a/core/connection.go
+++ b/core/connection.go
@@ -54,7 +54,11 @@ const (
// Connected represents state in which the connection has been successfully established.
Connected ConnectionState = iota
- // Disconnected represents state in which the connection has been dropped.
+ // NotResponding represents a state where the VPP socket accepts messages but replies are received with delay,
+ // or not at all. GoVPP treats this state internally the same as disconnected.
+ NotResponding
+
+ // Disconnected represents state in which the VPP socket is closed and the connection is considered dropped.
Disconnected
// Failed represents state in which the reconnecting failed after exceeding maximum number of attempts.
@@ -65,6 +69,8 @@ func (s ConnectionState) String() string {
switch s {
case Connected:
return "Connected"
+ case NotResponding:
+ return "NotResponding"
case Disconnected:
return "Disconnected"
case Failed:
@@ -95,6 +101,8 @@ type Connection struct {
vppConnected uint32 // non-zero if the adapter is connected to VPP
+ connChan chan ConnectionEvent // connection status events are sent to this channel
+
codec MessageCodec // message codec
msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC
msgMap map[uint16]api.Message // map of messages indexed by message ID
@@ -128,6 +136,7 @@ func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration)
vppClient: binapi,
maxAttempts: attempts,
recInterval: interval,
+ connChan: make(chan ConnectionEvent, NotificationChanBufSize),
codec: codec.DefaultCodec,
msgIDs: make(map[string]uint16),
msgMap: make(map[uint16]api.Message),
@@ -164,10 +173,9 @@ func AsyncConnect(binapi adapter.VppAPI, attempts int, interval time.Duration) (
c := newConnection(binapi, attempts, interval)
// asynchronously attempt to connect to VPP
- connChan := make(chan ConnectionEvent, NotificationChanBufSize)
- go c.connectLoop(connChan)
+ go c.connectLoop()
- return c, connChan, nil
+ return c, c.connChan, nil
}
// connectVPP performs blocking attempt to connect to VPP.
@@ -259,7 +267,7 @@ func (c *Connection) releaseAPIChannel(ch *Channel) {
// connectLoop attempts to connect to VPP until it succeeds.
// Then it continues with healthCheckLoop.
-func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
+func (c *Connection) connectLoop() {
var reconnectAttempts int
// loop until connected
@@ -269,25 +277,25 @@ func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
}
if err := c.connectVPP(); err == nil {
// signal connected event
- connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
+ c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Connected})
break
} else if reconnectAttempts < c.maxAttempts {
reconnectAttempts++
log.Warnf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
time.Sleep(c.recInterval)
} else {
- connChan <- ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err}
+ c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err})
return
}
}
// we are now connected, continue with health check loop
- c.healthCheckLoop(connChan)
+ c.healthCheckLoop()
}
// healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
// it continues with connectLoop and tries to reconnect.
-func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
+func (c *Connection) healthCheckLoop() {
// create a separate API channel for health check probes
ch, err := c.newAPIChannel(1, 1)
if err != nil {
@@ -348,15 +356,15 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
failedChecks++
log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks)
if failedChecks > HealthCheckThreshold {
- // in case of exceeded failed check treshold, assume VPP disconnected
- log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", HealthCheckThreshold)
- connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
+ // in case of exceeded failed check threshold, assume VPP unresponsive
+ log.Errorf("VPP does not responding, the health check exceeded threshold for timeouts (>%d)", HealthCheckThreshold)
+ c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: NotResponding})
break
}
} else if err != nil {
// in case of error, assume VPP disconnected
log.Errorf("VPP health check probe failed: %v", err)
- connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err}
+ c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err})
break
} else if failedChecks > 0 {
// in case of success after failed checks, clear failed check counter
@@ -370,7 +378,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
c.disconnectVPP()
// we are now disconnected, start connect loop
- c.connectLoop(connChan)
+ c.connectLoop()
}
func getMsgNameWithCrc(x api.Message) string {
@@ -458,3 +466,11 @@ func (c *Connection) retrieveMessageIDs() (err error) {
return nil
}
+
+func (c *Connection) sendConnEvent(event ConnectionEvent) {
+ select {
+ case c.connChan <- event:
+ default:
+ log.Warn("Connection state channel is full, discarding value.")
+ }
+}