From a3bb834db727a3ac9a1ffcfeae9265e5dead851f Mon Sep 17 00:00:00 2001 From: Ondrej Fabry Date: Wed, 15 Aug 2018 12:59:25 +0200 Subject: Refactor GoVPP Squashed commit of the following: commit 348930db31575e9f59b3834d9fec07411f011e05 Author: Ondrej Fabry Date: Wed Aug 15 11:30:13 2018 +0200 Use debug level for log about different context commit 9fc963c559cea67a41b85c6cdadc322fb3b1fc7c Author: Ondrej Fabry Date: Wed Aug 15 11:22:03 2018 +0200 Remove annoying logs and add env vars for debugging commit fdc9fd624d13feadb602e0d03d58f8a44b7a565f Author: Ondrej Fabry Date: Wed Aug 15 11:18:47 2018 +0200 Fix printing unknown VPPApiError commit 8f968be36a91de4d4a8ea17593ba314f82aa9583 Author: Ondrej Fabry Date: Tue Aug 14 17:25:10 2018 +0200 Refactor entire GoVPP - fix some cases with inconsistent VPP messages, causing messages to be incorrectly identified as event or request - simplify API, remove direct access to internal Go channels - add module name with message to registration of messages - start watching filesystem only when vpe-api file does not exist - simplify code in message codec and remove unneeded parts - retrieve IDs of all registered messages after connect to VPP - define fallback for control ping in core to avoid duplicate registration - add SetLogLevel function to set logger level more easily - remove lot of unused code commit 34dd1b7e10ef0324aa8c4e4cc42375bd6021c6cb Author: Ondrej Fabry Date: Mon Aug 13 11:29:54 2018 +0200 Rename VnetError to VPPApiError commit c6549d6f77847a1367a12ff47fb716e2955e973a Author: Ondrej Fabry Date: Mon Aug 13 10:23:43 2018 +0200 Fix examples and regenerate binapi commit 4612e36b416779771f5efab4fc654c2766d2cb1c Author: Ondrej Fabry Date: Mon Aug 13 09:51:22 2018 +0200 Add parsing and generation for services commit ac9c5280c5aa27e325f327609e2364cc66f3723f Author: Ondrej Fabry Date: Fri Aug 10 14:30:15 2018 +0200 Fix exit status on error and add continue-onerror flag commit 9b3f4ef9fc7c8c62037fa107085eae18a8725314 Author: Ondrej Fabry Date: Thu Aug 9 15:20:56 2018 +0200 Return VnetError when Retval != 0 commit 8fd21a907b5e628ec4d2026215b83d15da96c297 Author: Ondrej Fabry Date: Thu Aug 9 14:59:05 2018 +0200 Add all missing errors from api_errno.h commit 08450f288d3046ebaecf40203174ae342a07f1eb Author: Ondrej Fabry Date: Thu Aug 9 14:29:27 2018 +0200 Update README commit d8dced0728dd62243539be741868fb7d9b8de4cc Author: Ondrej Fabry Date: Thu Aug 9 13:59:59 2018 +0200 Regenerate vpe in core commit 254da7592cdbf634cf7aa46ae36ce7bb6d4ee555 Author: Ondrej Fabry Date: Thu Aug 9 13:37:00 2018 +0200 Add VnetError type for Retvals commit 4475c1087fb53ab4c788e530bc7fef7cfc89d2cd Author: Ondrej Fabry Date: Thu Aug 9 13:36:07 2018 +0200 Add registration API commit 892a3ea5a2c703e2f7c29331663f6a6fa706bff5 Author: Ondrej Fabry Date: Thu Aug 9 13:30:43 2018 +0200 Generate registration for messages and check all IDs on connect commit 389ed03b6e7082260281866c3449d72d72347c99 Author: Ondrej Fabry Date: Thu Aug 9 11:32:41 2018 +0200 Show error for empty adapter (on Darwin/Windows) commit ef1ea040d656ade64242432dc5f06810ed8dcde6 Author: Ondrej Fabry Date: Thu Aug 9 11:31:37 2018 +0200 Improve logged info commit d4adae3b14ed54c8d693060dd857fa9ba5ec8e06 Author: Ondrej Fabry Date: Thu Aug 9 11:27:48 2018 +0200 Update examples commit 63921e1346014701a22639a2611129563bb1eb78 Author: Ondrej Fabry Date: Thu Aug 9 11:02:56 2018 +0200 Generate unions and fix some issues - add comments between sections - define structs on single line if it has no fields - generate unions with setters/getters for each field - fix detection of message type commit 6ab3e3fa590b245898306a6ffaf32c7721eab60c Author: Ondrej Fabry Date: Wed Aug 8 15:37:10 2018 +0200 Refactor binapi-generator - split JSON parsing from code generation - parse and generate enums - parse unions (no generation yet) - change output file suffix to '.ba.go' - cleanup and simplify code - split code into files - add flag for debug mode Change-Id: I58f685e0d4c7a38e9a7b6ea0a1f47792d95d7399 Signed-off-by: Ondrej Fabry --- core/connection.go | 289 ++++++++++++++++++++++++++++++++--------------------- 1 file changed, 174 insertions(+), 115 deletions(-) (limited to 'core/connection.go') diff --git a/core/connection.go b/core/connection.go index a44d0c4..c77358f 100644 --- a/core/connection.go +++ b/core/connection.go @@ -12,13 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api - package core import ( "errors" - "os" + "fmt" + "reflect" "sync" "sync/atomic" "time" @@ -28,115 +27,95 @@ import ( "git.fd.io/govpp.git/adapter" "git.fd.io/govpp.git/api" "git.fd.io/govpp.git/codec" - "git.fd.io/govpp.git/core/bin_api/vpe" -) - -var ( - msgControlPing api.Message = &vpe.ControlPing{} - msgControlPingReply api.Message = &vpe.ControlPingReply{} ) const ( - requestChannelBufSize = 100 // default size of the request channel buffers - replyChannelBufSize = 100 // default size of the reply channel buffers - notificationChannelBufSize = 100 // default size of the notification channel buffers + requestChannelBufSize = 100 // default size of the request channel buffer + replyChannelBufSize = 100 // default size of the reply channel buffer + notificationChannelBufSize = 100 // default size of the notification channel buffer + + defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout ) var ( - healthCheckProbeInterval = time.Second * 1 // default health check probe interval - healthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe - healthCheckThreshold = 1 // number of failed healthProbe until the error is reported + healthCheckInterval = time.Second * 1 // default health check interval + healthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check + healthCheckThreshold = 1 // number of failed health checks until the error is reported ) -// ConnectionState holds the current state of the connection to VPP. +// SetHealthCheckProbeInterval sets health check probe interval. +// Beware: Function is not thread-safe. It is recommended to setup this parameter +// before connecting to vpp. +func SetHealthCheckProbeInterval(interval time.Duration) { + healthCheckInterval = interval +} + +// SetHealthCheckReplyTimeout sets timeout for reply to a health check probe. +// If reply arrives after the timeout, check is considered as failed. +// Beware: Function is not thread-safe. It is recommended to setup this parameter +// before connecting to vpp. +func SetHealthCheckReplyTimeout(timeout time.Duration) { + healthCheckReplyTimeout = timeout +} + +// SetHealthCheckThreshold sets the number of failed healthProbe checks until the error is reported. +// Beware: Function is not thread-safe. It is recommended to setup this parameter +// before connecting to vpp. +func SetHealthCheckThreshold(threshold int) { + healthCheckThreshold = threshold +} + +// ConnectionState represents the current state of the connection to VPP. type ConnectionState int const ( - // Connected connection state means that the connection to VPP has been successfully established. + // Connected represents state in which the connection has been successfully established. Connected ConnectionState = iota - // Disconnected connection state means that the connection to VPP has been lost. + // Disconnected represents state in which the connection has been dropped. Disconnected ) // ConnectionEvent is a notification about change in the VPP connection state. type ConnectionEvent struct { - // Timestamp holds the time when the event has been generated. + // Timestamp holds the time when the event has been created. Timestamp time.Time - // State holds the new state of the connection to VPP at the time when the event has been generated. + // State holds the new state of the connection at the time when the event has been created. State ConnectionState + + // Error holds error if any encountered. + Error error } +var ( + connLock sync.RWMutex // lock for the global connection + conn *Connection // global handle to the Connection (used in the message receive callback) +) + // Connection represents a shared memory connection to VPP via vppAdapter. type Connection struct { vpp adapter.VppAdapter // VPP adapter connected uint32 // non-zero if the adapter is connected to VPP - codec *codec.MsgCodec // message codec - msgIDsLock sync.RWMutex // lock for the message IDs map - msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC + codec *codec.MsgCodec // message codec + msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC + msgMap map[uint16]api.Message // map of messages indexed by message ID + maxChannelID uint32 // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations) channelsLock sync.RWMutex // lock for the channels map channels map[uint16]*channel // map of all API channels indexed by the channel ID notifSubscriptionsLock sync.RWMutex // lock for the subscriptions map notifSubscriptions map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID - maxChannelID uint32 // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations) - pingReqID uint16 // ID if the ControlPing message - pingReplyID uint16 // ID of the ControlPingReply message + pingReqID uint16 // ID if the ControlPing message + pingReplyID uint16 // ID of the ControlPingReply message lastReplyLock sync.Mutex // lock for the last reply lastReply time.Time // time of the last received reply from VPP } -var ( - log *logger.Logger // global logger - conn *Connection // global handle to the Connection (used in the message receive callback) - connLock sync.RWMutex // lock for the global connection -) - -// init initializes global logger, which logs debug level messages to stdout. -func init() { - log = logger.New() - log.Out = os.Stdout - log.Level = logger.DebugLevel -} - -// SetLogger sets global logger to provided one. -func SetLogger(l *logger.Logger) { - log = l -} - -// SetHealthCheckProbeInterval sets health check probe interval. -// Beware: Function is not thread-safe. It is recommended to setup this parameter -// before connecting to vpp. -func SetHealthCheckProbeInterval(interval time.Duration) { - healthCheckProbeInterval = interval -} - -// SetHealthCheckReplyTimeout sets timeout for reply to a health check probe. -// If reply arrives after the timeout, check is considered as failed. -// Beware: Function is not thread-safe. It is recommended to setup this parameter -// before connecting to vpp. -func SetHealthCheckReplyTimeout(timeout time.Duration) { - healthCheckReplyTimeout = timeout -} - -// SetHealthCheckThreshold sets the number of failed healthProbe checks until the error is reported. -// Beware: Function is not thread-safe. It is recommended to setup this parameter -// before connecting to vpp. -func SetHealthCheckThreshold(threshold int) { - healthCheckThreshold = threshold -} - -// SetControlPingMessages sets the messages for ControlPing and ControlPingReply -func SetControlPingMessages(controPing, controlPingReply api.Message) { - msgControlPing = controPing - msgControlPingReply = controlPingReply -} - // Connect connects to VPP using specified VPP adapter and returns the connection handle. // This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed. func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) { @@ -152,7 +131,7 @@ func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) { return nil, err } - return conn, nil + return c, nil } // AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle @@ -170,7 +149,7 @@ func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEv connChan := make(chan ConnectionEvent, notificationChannelBufSize) go c.connectLoop(connChan) - return conn, connChan, nil + return c, connChan, nil } // Disconnect disconnects from VPP and releases all connection-related resources. @@ -178,10 +157,11 @@ func (c *Connection) Disconnect() { if c == nil { return } + connLock.Lock() defer connLock.Unlock() - if c != nil && c.vpp != nil { + if c.vpp != nil { c.disconnectVPP() } conn = nil @@ -201,41 +181,119 @@ func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) { codec: &codec.MsgCodec{}, channels: make(map[uint16]*channel), msgIDs: make(map[string]uint16), + msgMap: make(map[uint16]api.Message), notifSubscriptions: make(map[uint16][]*api.NotifSubscription), } + conn.vpp.SetMsgCallback(conn.msgCallback) - conn.vpp.SetMsgCallback(msgCallback) return conn, nil } -// connectVPP performs one blocking attempt to connect to VPP. +// connectVPP performs blocking attempt to connect to VPP. func (c *Connection) connectVPP() error { - log.Debug("Connecting to VPP...") + log.Debug("Connecting to VPP..") // blocking connect - err := c.vpp.Connect() - if err != nil { - log.Warn(err) + if err := c.vpp.Connect(); err != nil { return err } - // store control ping IDs - if c.pingReqID, err = c.GetMessageID(msgControlPing); err != nil { - c.vpp.Disconnect() - return err - } - if c.pingReplyID, err = c.GetMessageID(msgControlPingReply); err != nil { + log.Debugf("Connected to VPP.") + + if err := c.retrieveMessageIDs(); err != nil { c.vpp.Disconnect() - return err + return fmt.Errorf("VPP is incompatible: %v", err) } // store connected state atomic.StoreUint32(&c.connected, 1) - log.Info("Connected to VPP.") return nil } +func getMsgNameWithCrc(x api.Message) string { + return x.GetMessageName() + "_" + x.GetCrcString() +} + +// retrieveMessageIDs retrieves IDs for all registered messages and stores them in map +func (c *Connection) retrieveMessageIDs() (err error) { + t := time.Now() + + var addMsg = func(msgID uint16, msg api.Message) { + c.msgIDs[getMsgNameWithCrc(msg)] = msgID + c.msgMap[msgID] = msg + } + + msgs := api.GetAllMessages() + + for name, msg := range msgs { + msgID, err := c.vpp.GetMsgID(msg.GetMessageName(), msg.GetCrcString()) + if err != nil { + return err + } + + addMsg(msgID, msg) + + if msg.GetMessageName() == msgControlPing.GetMessageName() { + c.pingReqID = msgID + msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) + } else if msg.GetMessageName() == msgControlPingReply.GetMessageName() { + c.pingReplyID = msgID + msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) + } + + if debugMsgIDs { + log.Debugf("message %q (%s) has ID: %d", name, getMsgNameWithCrc(msg), msgID) + } + } + + log.Debugf("retrieving %d message IDs took %s", len(msgs), time.Since(t)) + + // fallback for control ping when vpe package is not imported + if c.pingReqID == 0 { + c.pingReqID, err = c.vpp.GetMsgID(msgControlPing.GetMessageName(), msgControlPing.GetCrcString()) + if err != nil { + return err + } + addMsg(c.pingReqID, msgControlPing) + } + if c.pingReplyID == 0 { + c.pingReplyID, err = c.vpp.GetMsgID(msgControlPingReply.GetMessageName(), msgControlPingReply.GetCrcString()) + if err != nil { + return err + } + addMsg(c.pingReplyID, msgControlPingReply) + } + + return nil +} + +// GetMessageID returns message identifier of given API message. +func (c *Connection) GetMessageID(msg api.Message) (uint16, error) { + if c == nil { + return 0, errors.New("nil connection passed in") + } + + if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok { + return msgID, nil + } + + return 0, fmt.Errorf("unknown message: %s (%s)", msg.GetMessageName(), msg.GetCrcString()) +} + +// LookupByID looks up message name and crc by ID. +func (c *Connection) LookupByID(msgID uint16) (api.Message, error) { + if c == nil { + return nil, errors.New("nil connection passed in") + } + + if msg, ok := c.msgMap[msgID]; ok { + return msg, nil + } + + return nil, fmt.Errorf("unknown message ID: %d", msgID) +} + // disconnectVPP disconnects from VPP in case it is connected. func (c *Connection) disconnectVPP() { if atomic.CompareAndSwapUint32(&c.connected, 1, 0) { @@ -269,19 +327,21 @@ func (c *Connection) connectLoop(connChan chan ConnectionEvent) { // it continues with connectLoop and tries to reconnect. func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { // create a separate API channel for health check probes - ch, err := conn.newAPIChannelBuffered(1, 1) + ch, err := c.newAPIChannel(1, 1) if err != nil { log.Error("Failed to create health check API channel, health check will be disabled:", err) return } - var sinceLastReply time.Duration - var failedChecks int + var ( + sinceLastReply time.Duration + failedChecks int + ) // send health check probes until an error or timeout occurs for { // sleep until next health check probe period - time.Sleep(healthCheckProbeInterval) + time.Sleep(healthCheckInterval) if atomic.LoadUint32(&c.connected) == 0 { // Disconnect has been called in the meantime, return the healthcheck - reconnect loop @@ -297,22 +357,22 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { } // send the control ping request - ch.reqChan <- &api.VppRequest{Message: msgControlPing} + ch.reqChan <- &vppRequest{msg: msgControlPing} for { // expect response within timeout period select { case vppReply := <-ch.replyChan: - err = vppReply.Error + err = vppReply.err case <-time.After(healthCheckReplyTimeout): err = ErrProbeTimeout // check if time since last reply from any other // channel is less than health check reply timeout - conn.lastReplyLock.Lock() + c.lastReplyLock.Lock() sinceLastReply = time.Since(c.lastReply) - conn.lastReplyLock.Unlock() + c.lastReplyLock.Unlock() if sinceLastReply < healthCheckReplyTimeout { log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply) @@ -326,17 +386,18 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { failedChecks++ log.Warnf("VPP health check probe timed out after %v (%d. timeout)", healthCheckReplyTimeout, failedChecks) if failedChecks > healthCheckThreshold { - // in case of exceeded treshold disconnect + // in case of exceeded failed check treshold, assume VPP disconnected log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", healthCheckThreshold) connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected} break } } else if err != nil { - // in case of error disconnect + // in case of error, assume VPP disconnected log.Errorf("VPP health check probe failed: %v", err) - connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected} + connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err} break } else if failedChecks > 0 { + // in case of success after failed checks, clear failed check counter failedChecks = 0 log.Infof("VPP health check probe OK") } @@ -351,33 +412,31 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { } func (c *Connection) NewAPIChannel() (api.Channel, error) { - return c.newAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize) + return c.newAPIChannel(requestChannelBufSize, replyChannelBufSize) } func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) { - return c.newAPIChannelBuffered(reqChanBufSize, replyChanBufSize) + return c.newAPIChannel(reqChanBufSize, replyChanBufSize) } // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core. // It allows to specify custom buffer sizes for the request and reply Go channels. -func (c *Connection) newAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*channel, error) { +func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*channel, error) { if c == nil { return nil, errors.New("nil connection passed in") } chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff) ch := &channel{ - id: chID, - replyTimeout: defaultReplyTimeout, + id: chID, + replyTimeout: defaultReplyTimeout, + msgDecoder: c.codec, + msgIdentifier: c, + reqChan: make(chan *vppRequest, reqChanBufSize), + replyChan: make(chan *vppReply, replyChanBufSize), + notifSubsChan: make(chan *subscriptionRequest, reqChanBufSize), + notifSubsReplyChan: make(chan error, replyChanBufSize), } - ch.msgDecoder = c.codec - ch.msgIdentifier = c - - // create the communication channels - ch.reqChan = make(chan *api.VppRequest, reqChanBufSize) - ch.replyChan = make(chan *api.VppReply, replyChanBufSize) - ch.notifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize) - ch.notifSubsReplyChan = make(chan error, replyChanBufSize) // store API channel within the client c.channelsLock.Lock() @@ -393,8 +452,8 @@ func (c *Connection) newAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) // releaseAPIChannel releases API channel that needs to be closed. func (c *Connection) releaseAPIChannel(ch *channel) { log.WithFields(logger.Fields{ - "ID": ch.id, - }).Debug("API channel closed.") + "channel": ch.id, + }).Debug("API channel released") // delete the channel from channels map c.channelsLock.Lock() -- cgit 1.2.3-korg