diff options
Diffstat (limited to 'core/core.go')
-rw-r--r-- | core/core.go | 365 |
1 files changed, 164 insertions, 201 deletions
diff --git a/core/core.go b/core/core.go index 5b14a42..2484c81 100644 --- a/core/core.go +++ b/core/core.go @@ -14,14 +14,12 @@ package core -//go:generate binapi-generator --input-file=/usr/share/vpp/api/vpe.api.json --output-dir=./bin_api - import ( "errors" - "fmt" "os" "sync" "sync/atomic" + "time" logger "github.com/Sirupsen/logrus" @@ -31,14 +29,41 @@ import ( ) const ( - requestChannelBufSize = 100 // default size of the request channel buffers - replyChannelBufSize = 100 // default size of the reply channel buffers + requestChannelBufSize = 100 // default size of the request channel buffers + replyChannelBufSize = 100 // default size of the reply channel buffers + notificationChannelBufSize = 100 // default size of the notification channel buffers +) + +const ( + healthCheckProbeInterval = time.Second * 1 // default health check probe interval + healthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe +) + +// ConnectionState holds the current state of the connection to VPP. +type ConnectionState int + +const ( + // Connected connection state means that the connection to VPP has been successfully established. + Connected ConnectionState = iota + + // Disconnected connection state means that the connection to VPP has been lost. + Disconnected = iota ) +// ConnectionEvent is a notification about change in the VPP connection state. +type ConnectionEvent struct { + // Timestamp holds the time when the event has been generated. + Timestamp time.Time + + // State holds the new state of the connection to VPP at the time when the event has been generated. + State ConnectionState +} + // Connection represents a shared memory connection to VPP via vppAdapter. type Connection struct { - vpp adapter.VppAdapter // VPP adapter - codec *MsgCodec // message codec + vpp adapter.VppAdapter // VPP adapter + connected uint32 // non-zero if the adapter is connected to VPP + codec *MsgCodec // message codec msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC msgIDsLock sync.RWMutex // lock for the message IDs map @@ -79,7 +104,57 @@ func SetLogger(l *logger.Logger) { } // Connect connects to VPP using specified VPP adapter and returns the connection handle. +// This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed. func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) { + // create new connection handle + c, err := newConnection(vppAdapter) + if err != nil { + return nil, err + } + + // blocking attempt to connect to VPP + err = c.connectVPP() + if err != nil { + return nil, err + } + + return conn, nil +} + +// AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle +// and ConnectionState channel. This call does not block until connection is established, it +// returns immediately. The caller is supposed to watch the returned ConnectionState channel for +// Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect. +func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) { + // create new connection handle + c, err := newConnection(vppAdapter) + if err != nil { + return nil, nil, err + } + + // asynchronously attempt to connect to VPP + connChan := make(chan ConnectionEvent, notificationChannelBufSize) + go c.connectLoop(connChan) + + return conn, connChan, nil +} + +// Disconnect disconnects from VPP and releases all connection-related resources. +func (c *Connection) Disconnect() { + if c == nil { + return + } + connLock.Lock() + defer connLock.Unlock() + + if c != nil && c.vpp != nil { + c.disconnectVPP() + } + conn = nil +} + +// newConnection returns new connection handle. +func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) { connLock.Lock() defer connLock.Unlock() @@ -93,35 +168,102 @@ func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) { conn.notifSubscriptions = make(map[uint16][]*api.NotifSubscription) conn.vpp.SetMsgCallback(msgCallback) + return conn, nil +} - logger.Debug("Connecting to VPP...") +// connectVPP performs one blocking attempt to connect to VPP. +func (c *Connection) connectVPP() error { + log.Debug("Connecting to VPP...") - err := conn.vpp.Connect() + // blocking connect + err := c.vpp.Connect() if err != nil { - return nil, err + log.Warn(err) + return err } + // store connected state + atomic.StoreUint32(&c.connected, 1) + // store control ping IDs - conn.pingReqID, _ = conn.GetMessageID(&vpe.ControlPing{}) - conn.pingReplyID, _ = conn.GetMessageID(&vpe.ControlPingReply{}) + c.pingReqID, _ = c.GetMessageID(&vpe.ControlPing{}) + c.pingReplyID, _ = c.GetMessageID(&vpe.ControlPingReply{}) - logger.Debug("VPP connected.") + log.Info("Connected to VPP.") + return nil +} - return conn, nil +// disconnectVPP disconnects from VPP in case it is connected. +func (c *Connection) disconnectVPP() { + if atomic.CompareAndSwapUint32(&c.connected, 1, 0) { + c.vpp.Disconnect() + } } -// Disconnect disconnects from VPP. -func (c *Connection) Disconnect() { - if c == nil { +// connectLoop attempts to connect to VPP until it succeeds. +// Then it continues with healthCheckLoop. +func (c *Connection) connectLoop(connChan chan ConnectionEvent) { + // loop until connected + for { + err := c.connectVPP() + if err == nil { + // signal connected event + connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected} + break + } + } + + // we are now connected, continue with health check loop + c.healthCheckLoop(connChan) +} + +// healthCheckLoop checks whether connection to VPP is alive. In case of disconnect, +// it continues with connectLoop and tries to reconnect. +func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { + // create a separate API channel for health check probes + ch, err := conn.NewAPIChannel() + if err != nil { + log.Error("Error by creating health check API channel, health check will be disabled:", err) return } - connLock.Lock() - defer connLock.Unlock() - if c != nil && c.vpp != nil { - c.vpp.Disconnect() + // send health check probes until an error occurs + for { + // wait for healthCheckProbeInterval + <-time.After(healthCheckProbeInterval) + + if atomic.LoadUint32(&c.connected) == 0 { + // Disconnect has been called in the meantime, return the healthcheck - reconnect loop + log.Debug("Disconnected on request, exiting health check loop.") + return + } + + // send the control ping + ch.ReqChan <- &api.VppRequest{Message: &vpe.ControlPing{}} + + // expect response within timeout period + select { + case vppReply := <-ch.ReplyChan: + err = vppReply.Error + case <-time.After(healthCheckReplyTimeout): + err = errors.New("probe reply not received within the timeout period") + } + + // in case of error, break & disconnect + if err != nil { + log.Errorf("VPP health check failed: %v", err) + // signal disconnected event via channel + connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected} + break + } } - conn = nil + + // cleanup + ch.Close() + c.disconnectVPP() + + // we are now disconnected, start connect loop + c.connectLoop(connChan) } // NewAPIChannel returns a new API channel for communication with VPP via govpp core. @@ -163,84 +305,6 @@ func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) return ch, nil } -// watchRequests watches for requests on the request API channel and forwards them as messages to VPP. -func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) { - for { - select { - case req, ok := <-ch.ReqChan: - // new request on the request channel - if !ok { - // after closing the request channel, release API channel and return - c.releaseAPIChannel(ch, chMeta) - return - } - c.processRequest(ch, chMeta, req) - - case req := <-ch.NotifSubsChan: - // new request on the notification subscribe channel - c.processNotifSubscribeRequest(ch, req) - } - } -} - -// processRequest processes a single request received on the request channel. -func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error { - // retrieve message ID - msgID, err := c.GetMessageID(req.Message) - if err != nil { - error := fmt.Errorf("unable to retrieve message ID: %v", err) - log.WithFields(logger.Fields{ - "msg_name": req.Message.GetMessageName(), - "msg_crc": req.Message.GetCrcString(), - }).Errorf("unable to retrieve message ID: %v", err) - sendReply(ch, &api.VppReply{Error: error}) - return error - } - - // encode the message into binary - data, err := c.codec.EncodeMsg(req.Message, msgID) - if err != nil { - error := fmt.Errorf("unable to encode the messge: %v", err) - log.WithFields(logger.Fields{ - "context": chMeta.id, - "msg_id": msgID, - }).Errorf("%v", error) - sendReply(ch, &api.VppReply{Error: error}) - return error - } - - // send the message - log.WithFields(logger.Fields{ - "context": chMeta.id, - "msg_id": msgID, - "msg_size": len(data), - }).Debug("Sending a message to VPP.") - - if req.Multipart { - // expect multipart response - atomic.StoreUint32(&chMeta.multipart, 1) - } - - // send the request to VPP - c.vpp.SendMsg(chMeta.id, data) - - if req.Multipart { - // send a control ping to determine end of the multipart response - ping := &vpe.ControlPing{} - pingData, _ := c.codec.EncodeMsg(ping, c.pingReqID) - - log.WithFields(logger.Fields{ - "context": chMeta.id, - "msg_id": c.pingReqID, - "msg_size": len(pingData), - }).Debug("Sending a control ping to VPP.") - - c.vpp.SendMsg(chMeta.id, pingData) - } - - return nil -} - // releaseAPIChannel releases API channel that needs to be closed. func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) { log.WithFields(logger.Fields{ @@ -252,104 +316,3 @@ func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) delete(c.channels, chMeta.id) c.channelsLock.Unlock() } - -// msgCallback is called whenever any binary API message comes from VPP. -func msgCallback(context uint32, msgID uint16, data []byte) { - connLock.RLock() - defer connLock.RUnlock() - - if conn == nil { - log.Warn("Already disconnected, ignoring the message.") - return - } - - log.WithFields(logger.Fields{ - "context": context, - "msg_id": msgID, - "msg_size": len(data), - }).Debug("Received a message from VPP.") - - if context == 0 || conn.isNotificationMessage(msgID) { - // process the message as a notification - conn.sendNotifications(msgID, data) - return - } - - // match ch according to the context - conn.channelsLock.RLock() - ch, ok := conn.channels[context] - conn.channelsLock.RUnlock() - - if !ok { - log.WithFields(logger.Fields{ - "context": context, - "msg_id": msgID, - }).Error("Context ID not known, ignoring the message.") - return - } - - chMeta := ch.Metadata().(*channelMetadata) - lastReplyReceived := false - // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply - if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) { - lastReplyReceived = true - } - - // send the data to the channel - sendReply(ch, &api.VppReply{ - MessageID: msgID, - Data: data, - LastReplyReceived: lastReplyReceived, - }) -} - -// sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise -// it logs the error and do not send the message. -func sendReply(ch *api.Channel, reply *api.VppReply) { - select { - case ch.ReplyChan <- reply: - // reply sent successfully - default: - // unable to write into the channel without blocking - log.WithFields(logger.Fields{ - "channel": ch, - "msg_id": reply.MessageID, - }).Warn("Unable to send the reply, reciever end not ready.") - } -} - -// GetMessageID returns message identifier of given API message. -func (c *Connection) GetMessageID(msg api.Message) (uint16, error) { - if c == nil { - return 0, errors.New("nil connection passed in") - } - return c.messageNameToID(msg.GetMessageName(), msg.GetCrcString()) -} - -// messageNameToID returns message ID of a message identified by its name and CRC. -func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) { - // try to get the ID from the map - c.msgIDsLock.RLock() - id, ok := c.msgIDs[msgName+msgCrc] - c.msgIDsLock.RUnlock() - if ok { - return id, nil - } - - // get the ID using VPP API - id, err := c.vpp.GetMsgID(msgName, msgCrc) - if err != nil { - error := fmt.Errorf("unable to retrieve message ID: %v", err) - log.WithFields(logger.Fields{ - "msg_name": msgName, - "msg_crc": msgCrc, - }).Errorf("unable to retrieve message ID: %v", err) - return id, error - } - - c.msgIDsLock.Lock() - c.msgIDs[msgName+msgCrc] = id - c.msgIDsLock.Unlock() - - return id, nil -} |