From 061e24913927c85cac21c79b206f409592a0f6a9 Mon Sep 17 00:00:00 2001 From: Ondrej Fabry Date: Thu, 31 May 2018 16:06:41 +0200 Subject: Improve handling of probes on timeouts Change-Id: If94059586d4be739d6c8ae7843cfaf3bc90a5323 Signed-off-by: Ondrej Fabry --- core/core.go | 79 ++++++++++++++++++++++++++++++++++--------------- core/request_handler.go | 22 ++++++++++++-- 2 files changed, 74 insertions(+), 27 deletions(-) diff --git a/core/core.go b/core/core.go index 6714f6f..4782ba1 100644 --- a/core/core.go +++ b/core/core.go @@ -55,7 +55,7 @@ const ( Connected ConnectionState = iota // Disconnected connection state means that the connection to VPP has been lost. - Disconnected = iota + Disconnected ) // ConnectionEvent is a notification about change in the VPP connection state. @@ -85,6 +85,9 @@ type Connection struct { maxChannelID uint32 // maximum used client ID pingReqID uint16 // ID if the ControlPing message pingReplyID uint16 // ID of the ControlPingReply message + + lastReplyLock sync.Mutex // lock for the last reply + lastReply time.Time // time of the last received reply from VPP } // channelMetadata contains core-local metadata of an API channel. @@ -271,17 +274,19 @@ func (c *Connection) connectLoop(connChan chan ConnectionEvent) { // it continues with connectLoop and tries to reconnect. func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { // create a separate API channel for health check probes - ch, err := conn.NewAPIChannel() + ch, err := conn.NewAPIChannelBuffered(1, 1) if err != nil { - log.Error("Error by creating health check API channel, health check will be disabled:", err) + log.Error("Failed to create health check API channel, health check will be disabled:", err) return } - failedChecks := 0 - // send health check probes until an error occurs + var sinceLastReply time.Duration + var failedChecks int + + // send health check probes until an error or timeout occurs for { - // wait for healthCheckProbeInterval - <-time.After(healthCheckProbeInterval) + // sleep until next health check probe period + time.Sleep(healthCheckProbeInterval) if atomic.LoadUint32(&c.connected) == 0 { // Disconnect has been called in the meantime, return the healthcheck - reconnect loop @@ -289,30 +294,56 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { return } - // send the control ping - ch.ReqChan <- &api.VppRequest{Message: msgControlPing} - - // expect response within timeout period + // try draining probe replies from previous request before sending next one select { - case vppReply := <-ch.ReplyChan: - err = vppReply.Error - case <-time.After(healthCheckReplyTimeout): - err = errors.New("probe reply not received within the timeout period") + case <-ch.ReplyChan: + log.Debug("drained old probe reply from reply channel") + default: } - if err != nil { - failedChecks++ - log.Warnf("VPP health check failed (%d. time): %v", failedChecks, err) - } else { - failedChecks = 0 + // send the control ping request + ch.ReqChan <- &api.VppRequest{Message: msgControlPing} + + for { + // expect response within timeout period + select { + case vppReply := <-ch.ReplyChan: + err = vppReply.Error + + case <-time.After(healthCheckReplyTimeout): + err = ErrProbeTimeout + + // check if time since last reply from any other + // channel is less than health check reply timeout + conn.lastReplyLock.Lock() + sinceLastReply = time.Since(c.lastReply) + conn.lastReplyLock.Unlock() + + if sinceLastReply < healthCheckReplyTimeout { + log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply) + continue + } + } + break } - if failedChecks > healthCheckThreshold { - // in case of error, break & disconnect - log.Errorf("Number of VPP health check fails exceeded treshold (%d)", healthCheckThreshold, err) - // signal disconnected event via channel + if err == ErrProbeTimeout { + failedChecks++ + log.Warnf("VPP health check probe timed out after %v (%d. timeout)", healthCheckReplyTimeout, failedChecks) + if failedChecks > healthCheckThreshold { + // in case of exceeded treshold disconnect + log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", healthCheckThreshold) + connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected} + break + } + } else if err != nil { + // in case of error disconnect + log.Errorf("VPP health check probe failed: %v", err) connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected} break + } else if failedChecks > 0 { + failedChecks = 0 + log.Infof("VPP health check probe OK") } } diff --git a/core/request_handler.go b/core/request_handler.go index 27ff4fc..8f793f5 100644 --- a/core/request_handler.go +++ b/core/request_handler.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "sync/atomic" + "time" logger "github.com/sirupsen/logrus" @@ -26,6 +27,7 @@ import ( var ( ErrNotConnected = errors.New("not connected to VPP, ignoring the request") + ErrProbeTimeout = errors.New("probe reply not received within timeout period") ) // watchRequests watches for requests on the request API channel and forwards them as messages to VPP. @@ -98,7 +100,16 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re } // send the request to VPP - c.vpp.SendMsg(chMeta.id, data) + err = c.vpp.SendMsg(chMeta.id, data) + if err != nil { + err = fmt.Errorf("unable to send the messge: %v", err) + log.WithFields(logger.Fields{ + "context": chMeta.id, + "msg_id": msgID, + }).Error(err) + sendReply(ch, &api.VppReply{Error: err}) + return err + } if req.Multipart { // send a control ping to determine end of the multipart response @@ -166,6 +177,11 @@ func msgCallback(context uint32, msgID uint16, data []byte) { Data: data, LastReplyReceived: lastReplyReceived, }) + + // store actual time of this reply + conn.lastReplyLock.Lock() + conn.lastReply = time.Now() + conn.lastReplyLock.Unlock() } // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise @@ -174,8 +190,8 @@ func sendReply(ch *api.Channel, reply *api.VppReply) { select { case ch.ReplyChan <- reply: // reply sent successfully - default: - // unable to write into the channel without blocking + case <-time.After(time.Millisecond * 100): + // receiver still not ready log.WithFields(logger.Fields{ "channel": ch, "msg_id": reply.MessageID, -- cgit 1.2.3-korg