From 6b350c65fe0ec845cecf58bfb41ffc63dc9c04f7 Mon Sep 17 00:00:00 2001 From: Ondrej Fabry Date: Thu, 23 Aug 2018 22:51:56 +0200 Subject: Simplify subscribing to events and fix events - there is no need for sending subscription requests through channels, since all the messages are registered and no communication with VPP is needed Change-Id: Ibc29957be02a32e26309f66c369a071559b822a9 Signed-off-by: Ondrej Fabry --- core/connection.go | 235 ++++++++++++++++++++++++----------------------------- 1 file changed, 104 insertions(+), 131 deletions(-) (limited to 'core/connection.go') diff --git a/core/connection.go b/core/connection.go index c77358f..7d014ce 100644 --- a/core/connection.go +++ b/core/connection.go @@ -29,42 +29,19 @@ import ( "git.fd.io/govpp.git/codec" ) -const ( - requestChannelBufSize = 100 // default size of the request channel buffer - replyChannelBufSize = 100 // default size of the reply channel buffer - notificationChannelBufSize = 100 // default size of the notification channel buffer - - defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout +var ( + RequestChanBufSize = 100 // default size of the request channel buffer + ReplyChanBufSize = 100 // default size of the reply channel buffer + NotificationChanBufSize = 100 // default size of the notification channel buffer ) var ( - healthCheckInterval = time.Second * 1 // default health check interval - healthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check - healthCheckThreshold = 1 // number of failed health checks until the error is reported + HealthCheckProbeInterval = time.Second * 1 // default health check probe interval + HealthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe + HealthCheckThreshold = 1 // number of failed health checks until the error is reported + DefaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP ) -// SetHealthCheckProbeInterval sets health check probe interval. -// Beware: Function is not thread-safe. It is recommended to setup this parameter -// before connecting to vpp. -func SetHealthCheckProbeInterval(interval time.Duration) { - healthCheckInterval = interval -} - -// SetHealthCheckReplyTimeout sets timeout for reply to a health check probe. -// If reply arrives after the timeout, check is considered as failed. -// Beware: Function is not thread-safe. It is recommended to setup this parameter -// before connecting to vpp. -func SetHealthCheckReplyTimeout(timeout time.Duration) { - healthCheckReplyTimeout = timeout -} - -// SetHealthCheckThreshold sets the number of failed healthProbe checks until the error is reported. -// Beware: Function is not thread-safe. It is recommended to setup this parameter -// before connecting to vpp. -func SetHealthCheckThreshold(threshold int) { - healthCheckThreshold = threshold -} - // ConnectionState represents the current state of the connection to VPP. type ConnectionState int @@ -104,10 +81,10 @@ type Connection struct { maxChannelID uint32 // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations) channelsLock sync.RWMutex // lock for the channels map - channels map[uint16]*channel // map of all API channels indexed by the channel ID + channels map[uint16]*Channel // map of all API channels indexed by the channel ID - notifSubscriptionsLock sync.RWMutex // lock for the subscriptions map - notifSubscriptions map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID + subscriptionsLock sync.RWMutex // lock for the subscriptions map + subscriptions map[uint16][]*subscriptionCtx // map od all notification subscriptions indexed by message ID pingReqID uint16 // ID if the ControlPing message pingReplyID uint16 // ID of the ControlPingReply message @@ -116,18 +93,30 @@ type Connection struct { lastReply time.Time // time of the last received reply from VPP } +func newConnection(vpp adapter.VppAdapter) *Connection { + c := &Connection{ + vpp: vpp, + codec: &codec.MsgCodec{}, + msgIDs: make(map[string]uint16), + msgMap: make(map[uint16]api.Message), + channels: make(map[uint16]*Channel), + subscriptions: make(map[uint16][]*subscriptionCtx), + } + vpp.SetMsgCallback(c.msgCallback) + return c +} + // Connect connects to VPP using specified VPP adapter and returns the connection handle. // This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed. func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) { // create new connection handle - c, err := newConnection(vppAdapter) + c, err := createConnection(vppAdapter) if err != nil { return nil, err } // blocking attempt to connect to VPP - err = c.connectVPP() - if err != nil { + if err := c.connectVPP(); err != nil { return nil, err } @@ -140,13 +129,13 @@ func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) { // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect. func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) { // create new connection handle - c, err := newConnection(vppAdapter) + c, err := createConnection(vppAdapter) if err != nil { return nil, nil, err } // asynchronously attempt to connect to VPP - connChan := make(chan ConnectionEvent, notificationChannelBufSize) + connChan := make(chan ConnectionEvent, NotificationChanBufSize) go c.connectLoop(connChan) return c, connChan, nil @@ -168,7 +157,7 @@ func (c *Connection) Disconnect() { } // newConnection returns new connection handle. -func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) { +func createConnection(vppAdapter adapter.VppAdapter) (*Connection, error) { connLock.Lock() defer connLock.Unlock() @@ -176,15 +165,7 @@ func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) { return nil, errors.New("only one connection per process is supported") } - conn = &Connection{ - vpp: vppAdapter, - codec: &codec.MsgCodec{}, - channels: make(map[uint16]*channel), - msgIDs: make(map[string]uint16), - msgMap: make(map[uint16]api.Message), - notifSubscriptions: make(map[uint16][]*api.NotifSubscription), - } - conn.vpp.SetMsgCallback(conn.msgCallback) + conn = newConnection(vppAdapter) return conn, nil } @@ -211,8 +192,72 @@ func (c *Connection) connectVPP() error { return nil } -func getMsgNameWithCrc(x api.Message) string { - return x.GetMessageName() + "_" + x.GetCrcString() +func (c *Connection) NewAPIChannel() (api.Channel, error) { + return c.newAPIChannel(RequestChanBufSize, ReplyChanBufSize) +} + +func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) { + return c.newAPIChannel(reqChanBufSize, replyChanBufSize) +} + +// NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core. +// It allows to specify custom buffer sizes for the request and reply Go channels. +func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*Channel, error) { + if c == nil { + return nil, errors.New("nil connection passed in") + } + + // create new channel + chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff) + channel := newChannel(chID, c, c.codec, c, reqChanBufSize, replyChanBufSize) + + // store API channel within the client + c.channelsLock.Lock() + c.channels[chID] = channel + c.channelsLock.Unlock() + + // start watching on the request channel + go c.watchRequests(channel) + + return channel, nil +} + +// releaseAPIChannel releases API channel that needs to be closed. +func (c *Connection) releaseAPIChannel(ch *Channel) { + log.WithFields(logger.Fields{ + "channel": ch.id, + }).Debug("API channel released") + + // delete the channel from channels map + c.channelsLock.Lock() + delete(c.channels, ch.id) + c.channelsLock.Unlock() +} + +// GetMessageID returns message identifier of given API message. +func (c *Connection) GetMessageID(msg api.Message) (uint16, error) { + if c == nil { + return 0, errors.New("nil connection passed in") + } + + if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok { + return msgID, nil + } + + return 0, fmt.Errorf("unknown message: %s (%s)", msg.GetMessageName(), msg.GetCrcString()) +} + +// LookupByID looks up message name and crc by ID. +func (c *Connection) LookupByID(msgID uint16) (api.Message, error) { + if c == nil { + return nil, errors.New("nil connection passed in") + } + + if msg, ok := c.msgMap[msgID]; ok { + return msg, nil + } + + return nil, fmt.Errorf("unknown message ID: %d", msgID) } // retrieveMessageIDs retrieves IDs for all registered messages and stores them in map @@ -268,32 +313,6 @@ func (c *Connection) retrieveMessageIDs() (err error) { return nil } -// GetMessageID returns message identifier of given API message. -func (c *Connection) GetMessageID(msg api.Message) (uint16, error) { - if c == nil { - return 0, errors.New("nil connection passed in") - } - - if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok { - return msgID, nil - } - - return 0, fmt.Errorf("unknown message: %s (%s)", msg.GetMessageName(), msg.GetCrcString()) -} - -// LookupByID looks up message name and crc by ID. -func (c *Connection) LookupByID(msgID uint16) (api.Message, error) { - if c == nil { - return nil, errors.New("nil connection passed in") - } - - if msg, ok := c.msgMap[msgID]; ok { - return msg, nil - } - - return nil, fmt.Errorf("unknown message ID: %d", msgID) -} - // disconnectVPP disconnects from VPP in case it is connected. func (c *Connection) disconnectVPP() { if atomic.CompareAndSwapUint32(&c.connected, 1, 0) { @@ -341,7 +360,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { // send health check probes until an error or timeout occurs for { // sleep until next health check probe period - time.Sleep(healthCheckInterval) + time.Sleep(HealthCheckProbeInterval) if atomic.LoadUint32(&c.connected) == 0 { // Disconnect has been called in the meantime, return the healthcheck - reconnect loop @@ -365,7 +384,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { case vppReply := <-ch.replyChan: err = vppReply.err - case <-time.After(healthCheckReplyTimeout): + case <-time.After(HealthCheckReplyTimeout): err = ErrProbeTimeout // check if time since last reply from any other @@ -374,7 +393,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { sinceLastReply = time.Since(c.lastReply) c.lastReplyLock.Unlock() - if sinceLastReply < healthCheckReplyTimeout { + if sinceLastReply < HealthCheckReplyTimeout { log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply) continue } @@ -384,10 +403,10 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { if err == ErrProbeTimeout { failedChecks++ - log.Warnf("VPP health check probe timed out after %v (%d. timeout)", healthCheckReplyTimeout, failedChecks) - if failedChecks > healthCheckThreshold { + log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks) + if failedChecks > HealthCheckThreshold { // in case of exceeded failed check treshold, assume VPP disconnected - log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", healthCheckThreshold) + log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", HealthCheckThreshold) connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected} break } @@ -411,52 +430,6 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { c.connectLoop(connChan) } -func (c *Connection) NewAPIChannel() (api.Channel, error) { - return c.newAPIChannel(requestChannelBufSize, replyChannelBufSize) -} - -func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) { - return c.newAPIChannel(reqChanBufSize, replyChanBufSize) -} - -// NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core. -// It allows to specify custom buffer sizes for the request and reply Go channels. -func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*channel, error) { - if c == nil { - return nil, errors.New("nil connection passed in") - } - - chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff) - ch := &channel{ - id: chID, - replyTimeout: defaultReplyTimeout, - msgDecoder: c.codec, - msgIdentifier: c, - reqChan: make(chan *vppRequest, reqChanBufSize), - replyChan: make(chan *vppReply, replyChanBufSize), - notifSubsChan: make(chan *subscriptionRequest, reqChanBufSize), - notifSubsReplyChan: make(chan error, replyChanBufSize), - } - - // store API channel within the client - c.channelsLock.Lock() - c.channels[chID] = ch - c.channelsLock.Unlock() - - // start watching on the request channel - go c.watchRequests(ch) - - return ch, nil -} - -// releaseAPIChannel releases API channel that needs to be closed. -func (c *Connection) releaseAPIChannel(ch *channel) { - log.WithFields(logger.Fields{ - "channel": ch.id, - }).Debug("API channel released") - - // delete the channel from channels map - c.channelsLock.Lock() - delete(c.channels, ch.id) - c.channelsLock.Unlock() +func getMsgNameWithCrc(x api.Message) string { + return x.GetMessageName() + "_" + x.GetCrcString() } -- cgit 1.2.3-korg