aboutsummaryrefslogtreecommitdiffstats
path: root/core/connection.go
blob: 2c05333cbc0613745c58415e387aa5fdca5b9ee2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
// Copyright (c) 2017 Cisco and/or its affiliates.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package core

import (
	"errors"
	"fmt"
	"path"
	"reflect"
	"sync"
	"sync/atomic"
	"time"

	logger "github.com/sirupsen/logrus"

	"go.fd.io/govpp/adapter"
	"go.fd.io/govpp/api"
	"go.fd.io/govpp/codec"
)

const (
	DefaultReconnectInterval    = time.Second / 2 // default interval between reconnect attempts
	DefaultMaxReconnectAttempts = 3               // default maximum number of reconnect attempts
)

var (
	RequestChanBufSize      = 100 // default size of the request channel buffer
	ReplyChanBufSize        = 100 // default size of the reply channel buffer
	NotificationChanBufSize = 100 // default size of the notification channel buffer
)

var (
	HealthCheckProbeInterval = time.Second            // default health check probe interval
	HealthCheckReplyTimeout  = time.Millisecond * 250 // timeout for reply to a health check probe
	HealthCheckThreshold     = 2                      // number of failed health checks until the error is reported
	DefaultReplyTimeout      = time.Second            // default timeout for replies from VPP
)

// ConnectionState represents the current state of the connection to VPP.
type ConnectionState int

const (
	// Connected represents state in which the connection has been successfully established.
	Connected ConnectionState = iota

	// NotResponding represents a state where the VPP socket accepts messages but replies are received with delay,
	// or not at all. GoVPP treats this state internally the same as disconnected.
	NotResponding

	// Disconnected represents state in which the VPP socket is closed and the connection is considered dropped.
	Disconnected

	// Failed represents state in which the reconnecting failed after exceeding maximum number of attempts.
	Failed
)

func (s ConnectionState) String() string {
	switch s {
	case Connected:
		return "Connected"
	case NotResponding:
		return "NotResponding"
	case Disconnected:
		return "Disconnected"
	case Failed:
		return "Failed"
	default:
		return fmt.Sprintf("UnknownState(%d)", s)
	}
}

// ConnectionEvent is a notification about change in the VPP connection state.
type ConnectionEvent struct {
	// Timestamp holds the time when the event has been created.
	Timestamp time.Time

	// State holds the new state of the connection at the time when the event has been created.
	State ConnectionState

	// Error holds error if any encountered.
	Error error
}

// Connection represents a shared memory connection to VPP via vppAdapter.
type Connection struct {
	vppClient adapter.VppAPI // VPP binary API client

	maxAttempts int           // interval for reconnect attempts
	recInterval time.Duration // maximum number of reconnect attempts

	vppConnected uint32 // non-zero if the adapter is connected to VPP

	connChan        chan ConnectionEvent // connection status events are sent to this channel
	healthCheckDone chan struct{}        // used to terminate health check loop

	codec        MessageCodec                      // message codec
	msgIDs       map[string]uint16                 // map of message IDs indexed by message name + CRC
	msgMapByPath map[string]map[uint16]api.Message // map of messages indexed by message ID which are indexed by path

	channelsLock  sync.RWMutex        // lock for the channels map and the channel ID
	nextChannelID uint16              // next potential channel ID (the real limit is 2^15)
	channels      map[uint16]*Channel // map of all API channels indexed by the channel ID

	subscriptionsLock sync.RWMutex                  // lock for the subscriptions map
	subscriptions     map[uint16][]*subscriptionCtx // map od all notification subscriptions indexed by message ID

	pingReqID   uint16 // ID if the ControlPing message
	pingReplyID uint16 // ID of the ControlPingReply message

	lastReplyLock sync.Mutex // lock for the last reply
	lastReply     time.Time  // time of the last received reply from VPP

	msgControlPing      api.Message
	msgControlPingReply api.Message

	apiTrace *trace // API tracer (disabled by default)
}

func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) *Connection {
	if attempts == 0 {
		attempts = DefaultMaxReconnectAttempts
	}
	if interval == 0 {
		interval = DefaultReconnectInterval
	}

	c := &Connection{
		vppClient:           binapi,
		maxAttempts:         attempts,
		recInterval:         interval,
		connChan:            make(chan ConnectionEvent, NotificationChanBufSize),
		healthCheckDone:     make(chan struct{}),
		codec:               codec.DefaultCodec,
		msgIDs:              make(map[string]uint16),
		msgMapByPath:        make(map[string]map[uint16]api.Message),
		channels:            make(map[uint16]*Channel),
		subscriptions:       make(map[uint16][]*subscriptionCtx),
		msgControlPing:      msgControlPing,
		msgControlPingReply: msgControlPingReply,
		apiTrace: &trace{
			list: make([]*api.Record, 0),
			mux:  &sync.Mutex{},
		},
	}
	binapi.SetMsgCallback(c.msgCallback)
	return c
}

// Connect connects to VPP API using specified adapter and returns a connection handle.
// This call blocks until it is either connected, or an error occurs.
// Only one connection attempt will be performed.
func Connect(binapi adapter.VppAPI) (*Connection, error) {
	// create new connection handle
	c := newConnection(binapi, DefaultMaxReconnectAttempts, DefaultReconnectInterval)

	// blocking attempt to connect to VPP
	if err := c.connectVPP(); err != nil {
		return nil, err
	}

	return c, nil
}

// AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
// and ConnectionState channel. This call does not block until connection is established, it
// returns immediately. The caller is supposed to watch the returned ConnectionState channel for
// Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
func AsyncConnect(binapi adapter.VppAPI, attempts int, interval time.Duration) (*Connection, chan ConnectionEvent, error) {
	// create new connection handle
	c := newConnection(binapi, attempts, interval)

	// asynchronously attempt to connect to VPP
	go c.connectLoop()

	return c, c.connChan, nil
}

// connectVPP performs blocking attempt to connect to VPP.
func (c *Connection) connectVPP() error {
	log.Debug("Connecting to VPP..")

	// blocking connect
	if err := c.vppClient.Connect(); err != nil {
		return err
	}
	log.Debugf("Connected to VPP")

	if err := c.retrieveMessageIDs(); err != nil {
		if err := c.vppClient.Disconnect(); err != nil {
			log.Debugf("disconnecting vpp client failed: %v", err)
		}
		return fmt.Errorf("VPP is incompatible: %v", err)
	}

	// store connected state
	atomic.StoreUint32(&c.vppConnected, 1)

	return nil
}

// Disconnect disconnects from VPP API and releases all connection-related resources.
func (c *Connection) Disconnect() {
	if c == nil {
		return
	}
	if c.vppClient != nil {
		c.disconnectVPP(true)
	}
}

// disconnectVPP disconnects from VPP in case it is connected. terminate tells
// that disconnectVPP() was called from Close(), so healthCheckLoop() can be
// terminated.
func (c *Connection) disconnectVPP(terminate bool) {
	if atomic.CompareAndSwapUint32(&c.vppConnected, 1, 0) {
		if terminate {
			close(c.healthCheckDone)
		}
		log.Debug("Disconnecting from VPP..")

		if err := c.vppClient.Disconnect(); err != nil {
			log.Debugf("Disconnect from VPP failed: %v", err)
		}
		log.Debug("Disconnected from VPP")
	}
}

func (c *Connection) NewAPIChannel() (api.Channel, error) {
	return c.newAPIChannel(RequestChanBufSize, ReplyChanBufSize)
}

func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
	return c.newAPIChannel(reqChanBufSize, replyChanBufSize)
}

// NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
// It allows to specify custom buffer sizes for the request and reply Go channels.
func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*Channel, error) {
	if c == nil {
		return nil, errors.New("nil connection passed in")
	}

	channel, err := c.newChannel(reqChanBufSize, replyChanBufSize)
	if err != nil {
		return nil, err
	}

	// start watching on the request channel
	go c.watchRequests(channel)

	return channel, nil
}

// releaseAPIChannel releases API channel that needs to be closed.
func (c *Connection) releaseAPIChannel(ch *Channel) {
	log.WithFields(logger.Fields{
		"channel": ch.id,
	}).Debug("API channel released")

	// delete the channel from channels map
	c.channelsLock.Lock()
	delete(c.channels, ch.id)
	c.channelsLock.Unlock()
}

// connectLoop attempts to connect to VPP until it succeeds.
// Then it continues with healthCheckLoop.
func (c *Connection) connectLoop() {
	var reconnectAttempts int

	// loop until connected
	for {
		if err := c.vppClient.WaitReady(); err != nil {
			log.Debugf("wait ready failed: %v", err)
		}
		if err := c.connectVPP(); err == nil {
			// signal connected event
			c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Connected})
			break
		} else if reconnectAttempts < c.maxAttempts {
			reconnectAttempts++
			log.Warnf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
			time.Sleep(c.recInterval)
		} else {
			c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err})
			return
		}
	}

	// we are now connected, continue with health check loop
	c.healthCheckLoop()
}

// healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
// it continues with connectLoop and tries to reconnect.
func (c *Connection) healthCheckLoop() {
	// create a separate API channel for health check probes
	ch, err := c.newAPIChannel(1, 1)
	if err != nil {
		log.Error("Failed to create health check API channel, health check will be disabled:", err)
		return
	}
	defer ch.Close()

	var (
		sinceLastReply time.Duration
		failedChecks   int
	)

	// send health check probes until an error or timeout occurs
	probeInterval := time.NewTicker(HealthCheckProbeInterval)
	defer probeInterval.Stop()

HealthCheck:
	for {
		select {
		case <-c.healthCheckDone:
			// Terminate the health check loop on connection disconnect
			log.Debug("Disconnected on request, exiting health check loop.")
			return
		case <-probeInterval.C:
			// try draining probe replies from previous request before sending next one
			select {
			case <-ch.replyChan:
				log.Debug("drained old probe reply from reply channel")
			default:
			}

			// send the control ping request
			ch.reqChan <- &vppRequest{msg: c.msgControlPing}

			for {
				// expect response within timeout period
				select {
				case vppReply := <-ch.replyChan:
					err = vppReply.err

				case <-time.After(HealthCheckReplyTimeout):
					err = ErrProbeTimeout

					// check if time since last reply from any other
					// channel is less than health check reply timeout
					c.lastReplyLock.Lock()
					sinceLastReply = time.Since(c.lastReply)
					c.lastReplyLock.Unlock()

					if sinceLastReply < HealthCheckReplyTimeout {
						log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
						continue
					}
				}
				break
			}

			if err == ErrProbeTimeout {
				failedChecks++
				log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks)
				if failedChecks > HealthCheckThreshold {
					// in case of exceeded failed check threshold, assume VPP unresponsive
					log.Errorf("VPP does not responding, the health check exceeded threshold for timeouts (>%d)", HealthCheckThreshold)
					c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: NotResponding})
					break HealthCheck
				}
			} else if err != nil {
				// in case of error, assume VPP disconnected
				log.Errorf("VPP health check probe failed: %v", err)
				c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err})
				break HealthCheck
			} else if failedChecks > 0 {
				// in case of success after failed checks, clear failed check counter
				failedChecks = 0
				log.Infof("VPP health check probe OK")
			}
		}
	}

	// cleanup
	c.disconnectVPP(false)

	// we are now disconnected, start connect loop
	c.connectLoop()
}

func getMsgNameWithCrc(x api.Message) string {
	return getMsgID(x.GetMessageName(), x.GetCrcString())
}

func getMsgID(name, crc string) string {
	return name + "_" + crc
}

func getMsgFactory(msg api.Message) func() api.Message {
	return func() api.Message {
		return reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
	}
}

// GetMessageID returns message identifier of given API message.
func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
	if c == nil {
		return 0, errors.New("nil connection passed in")
	}
	pkgPath := c.GetMessagePath(msg)
	msgID, err := c.vppClient.GetMsgID(msg.GetMessageName(), msg.GetCrcString())
	if err != nil {
		return 0, err
	}
	if pathMsgs, pathOk := c.msgMapByPath[pkgPath]; !pathOk {
		c.msgMapByPath[pkgPath] = make(map[uint16]api.Message)
		c.msgMapByPath[pkgPath][msgID] = msg
	} else if _, msgOk := pathMsgs[msgID]; !msgOk {
		c.msgMapByPath[pkgPath][msgID] = msg
	}
	if _, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
		return msgID, nil
	}
	c.msgIDs[getMsgNameWithCrc(msg)] = msgID
	return msgID, nil
}

// LookupByID looks up message name and crc by ID.
func (c *Connection) LookupByID(path string, msgID uint16) (api.Message, error) {
	if c == nil {
		return nil, errors.New("nil connection passed in")
	}
	if msg, ok := c.msgMapByPath[path][msgID]; ok {
		return msg, nil
	}
	return nil, fmt.Errorf("unknown message ID %d for path '%s'", msgID, path)
}

// GetMessagePath returns path for the given message
func (c *Connection) GetMessagePath(msg api.Message) string {
	return path.Dir(reflect.TypeOf(msg).Elem().PkgPath())
}

// retrieveMessageIDs retrieves IDs for all registered messages and stores them in map
func (c *Connection) retrieveMessageIDs() (err error) {
	t := time.Now()

	msgsByPath := api.GetRegisteredMessages()

	var n int
	for pkgPath, msgs := range msgsByPath {
		for _, msg := range msgs {
			msgID, err := c.GetMessageID(msg)
			if err != nil {
				if debugMsgIDs {
					log.Debugf("retrieving message ID for %s.%s failed: %v",
						pkgPath, msg.GetMessageName(), err)
				}
				continue
			}
			n++

			if c.pingReqID == 0 && msg.GetMessageName() == c.msgControlPing.GetMessageName() {
				c.pingReqID = msgID
				c.msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
			} else if c.pingReplyID == 0 && msg.GetMessageName() == c.msgControlPingReply.GetMessageName() {
				c.pingReplyID = msgID
				c.msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
			}

			if debugMsgIDs {
				log.Debugf("message %q (%s) has ID: %d", msg.GetMessageName(), getMsgNameWithCrc(msg), msgID)
			}
		}
		log.WithField("took", time.Since(t)).
			Debugf("retrieved IDs for %d messages (registered %d) from path %s", n, len(msgs), pkgPath)
	}

	return nil
}

func (c *Connection) sendConnEvent(event ConnectionEvent) {
	select {
	case c.connChan <- event:
	default:
		log.Warn("Connection state channel is full, discarding value.")
	}
}

// Trace gives access to the API trace interface
func (c *Connection) Trace() api.Trace {
	return c.apiTrace
}

// trace records api message
func (c *Connection) trace(msg api.Message, chId uint16, t time.Time, isReceived bool) {
	if atomic.LoadInt32(&c.apiTrace.isEnabled) == 0 {
		return
	}
	entry := &api.Record{
		Message:    msg,
		Timestamp:  t,
		IsReceived: isReceived,
		ChannelID:  chId,
	}
	c.apiTrace.mux.Lock()
	c.apiTrace.list = append(c.apiTrace.list, entry)
	c.apiTrace.mux.Unlock()
}