about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author Ondrej Fabry <ofabry@cisco.com> 2018-05-31 16:06:41 +0200
committer Ondrej Fabry <ofabry@cisco.com> 2018-05-31 16:06:41 +0200
commit 061e24913927c85cac21c79b206f409592a0f6a9 (patch)
tree 2f02b0fb4aa02795081d96b5b8482793796a99f0
parent 3da2b73ec5ce77abf796ebeae6dfc149c8d3c698 (diff)
Improve handling of probes on timeouts
Change-Id: If94059586d4be739d6c8ae7843cfaf3bc90a5323
Signed-off-by: Ondrej Fabry <ofabry@cisco.com>
-rw-r--r-- core/core.go 79
-rw-r--r-- core/request_handler.go 22
2 files changed, 74 insertions, 27 deletions
diff --git a/core/core.go b/core/core.go
index 6714f6f..4782ba1 100644
--- a/core/core.go
+++ b/core/core.go
@@ -55,7 +55,7 @@ const (
Connected ConnectionState = iota
// Disconnected connection state means that the connection to VPP has been lost.
- Disconnected = iota
+ Disconnected
)
// ConnectionEvent is a notification about change in the VPP connection state.
@@ -85,6 +85,9 @@ type Connection struct {
maxChannelID uint32 // maximum used client ID
pingReqID uint16 // ID if the ControlPing message
pingReplyID uint16 // ID of the ControlPingReply message
+
+ lastReplyLock sync.Mutex // lock for the last reply
+ lastReply time.Time // time of the last received reply from VPP
}
// channelMetadata contains core-local metadata of an API channel.
@@ -271,17 +274,19 @@ func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
// it continues with connectLoop and tries to reconnect.
func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
// create a separate API channel for health check probes
- ch, err := conn.NewAPIChannel()
+ ch, err := conn.NewAPIChannelBuffered(1, 1)
if err != nil {
- log.Error("Error by creating health check API channel, health check will be disabled:", err)
+ log.Error("Failed to create health check API channel, health check will be disabled:", err)
return
}
- failedChecks := 0
- // send health check probes until an error occurs
+ var sinceLastReply time.Duration
+ var failedChecks int
+
+ // send health check probes until an error or timeout occurs
for {
- // wait for healthCheckProbeInterval
- <-time.After(healthCheckProbeInterval)
+ // sleep until next health check probe period
+ time.Sleep(healthCheckProbeInterval)
if atomic.LoadUint32(&c.connected) == 0 {
// Disconnect has been called in the meantime, return the healthcheck - reconnect loop
@@ -289,30 +294,56 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
return
}
- // send the control ping
- ch.ReqChan <- &api.VppRequest{Message: msgControlPing}
-
- // expect response within timeout period
+ // try draining probe replies from previous request before sending next one
select {
- case vppReply := <-ch.ReplyChan:
- err = vppReply.Error
- case <-time.After(healthCheckReplyTimeout):
- err = errors.New("probe reply not received within the timeout period")
+ case <-ch.ReplyChan:
+ log.Debug("drained old probe reply from reply channel")
+ default:
}
- if err != nil {
- failedChecks++
- log.Warnf("VPP health check failed (%d. time): %v", failedChecks, err)
- } else {
- failedChecks = 0
+ // send the control ping request
+ ch.ReqChan <- &api.VppRequest{Message: msgControlPing}
+
+ for {
+ // expect response within timeout period
+ select {
+ case vppReply := <-ch.ReplyChan:
+ err = vppReply.Error
+
+ case <-time.After(healthCheckReplyTimeout):
+ err = ErrProbeTimeout
+
+ // check if time since last reply from any other
+ // channel is less than health check reply timeout
+ conn.lastReplyLock.Lock()
+ sinceLastReply = time.Since(c.lastReply)
+ conn.lastReplyLock.Unlock()
+
+ if sinceLastReply < healthCheckReplyTimeout {
+ log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
+ continue
+ }
+ }
+ break
}
- if failedChecks > healthCheckThreshold {
- // in case of error, break & disconnect
- log.Errorf("Number of VPP health check fails exceeded treshold (%d)", healthCheckThreshold, err)
- // signal disconnected event via channel
+ if err == ErrProbeTimeout {
+ failedChecks++
+ log.Warnf("VPP health check probe timed out after %v (%d. timeout)", healthCheckReplyTimeout, failedChecks)
+ if failedChecks > healthCheckThreshold {
+ // in case of exceeded treshold disconnect
+ log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", healthCheckThreshold)
+ connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
+ break
+ }
+ } else if err != nil {
+ // in case of error disconnect
+ log.Errorf("VPP health check probe failed: %v", err)
connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
break
+ } else if failedChecks > 0 {
+ failedChecks = 0
+ log.Infof("VPP health check probe OK")
}
}
diff --git a/core/request_handler.go b/core/request_handler.go
index 27ff4fc..8f793f5 100644
--- a/core/request_handler.go
+++ b/core/request_handler.go
@@ -18,6 +18,7 @@ import (
"errors"
"fmt"
"sync/atomic"
+ "time"
logger "github.com/sirupsen/logrus"
@@ -26,6 +27,7 @@ import (
var (
ErrNotConnected = errors.New("not connected to VPP, ignoring the request")
+ ErrProbeTimeout = errors.New("probe reply not received within timeout period")
)
// watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
@@ -98,7 +100,16 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re
}
// send the request to VPP
- c.vpp.SendMsg(chMeta.id, data)
+ err = c.vpp.SendMsg(chMeta.id, data)
+ if err != nil {
+ err = fmt.Errorf("unable to send the messge: %v", err)
+ log.WithFields(logger.Fields{
+ "context": chMeta.id,
+ "msg_id": msgID,
+ }).Error(err)
+ sendReply(ch, &api.VppReply{Error: err})
+ return err
+ }
if req.Multipart {
// send a control ping to determine end of the multipart response
@@ -166,6 +177,11 @@ func msgCallback(context uint32, msgID uint16, data []byte) {
Data: data,
LastReplyReceived: lastReplyReceived,
})
+
+ // store actual time of this reply
+ conn.lastReplyLock.Lock()
+ conn.lastReply = time.Now()
+ conn.lastReplyLock.Unlock()
}
// sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
@@ -174,8 +190,8 @@ func sendReply(ch *api.Channel, reply *api.VppReply) {
select {
case ch.ReplyChan <- reply:
// reply sent successfully
- default:
- // unable to write into the channel without blocking
+ case <-time.After(time.Millisecond * 100):
+ // receiver still not ready
log.WithFields(logger.Fields{
"channel": ch,
"msg_id": reply.MessageID,