aboutsummaryrefslogtreecommitdiffstats
path: root/core/request_handler.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/request_handler.go')
-rw-r--r--core/request_handler.go22
1 files changed, 19 insertions, 3 deletions
diff --git a/core/request_handler.go b/core/request_handler.go
index 27ff4fc..8f793f5 100644
--- a/core/request_handler.go
+++ b/core/request_handler.go
@@ -18,6 +18,7 @@ import (
"errors"
"fmt"
"sync/atomic"
+ "time"
logger "github.com/sirupsen/logrus"
@@ -26,6 +27,7 @@ import (
var (
ErrNotConnected = errors.New("not connected to VPP, ignoring the request")
+ ErrProbeTimeout = errors.New("probe reply not received within timeout period")
)
// watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
@@ -98,7 +100,16 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re
}
// send the request to VPP
- c.vpp.SendMsg(chMeta.id, data)
+ err = c.vpp.SendMsg(chMeta.id, data)
+ if err != nil {
+ err = fmt.Errorf("unable to send the messge: %v", err)
+ log.WithFields(logger.Fields{
+ "context": chMeta.id,
+ "msg_id": msgID,
+ }).Error(err)
+ sendReply(ch, &api.VppReply{Error: err})
+ return err
+ }
if req.Multipart {
// send a control ping to determine end of the multipart response
@@ -166,6 +177,11 @@ func msgCallback(context uint32, msgID uint16, data []byte) {
Data: data,
LastReplyReceived: lastReplyReceived,
})
+
+ // store actual time of this reply
+ conn.lastReplyLock.Lock()
+ conn.lastReply = time.Now()
+ conn.lastReplyLock.Unlock()
}
// sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
@@ -174,8 +190,8 @@ func sendReply(ch *api.Channel, reply *api.VppReply) {
select {
case ch.ReplyChan <- reply:
// reply sent successfully
- default:
- // unable to write into the channel without blocking
+ case <-time.After(time.Millisecond * 100):
+ // receiver still not ready
log.WithFields(logger.Fields{
"channel": ch,
"msg_id": reply.MessageID,