aboutsummaryrefslogtreecommitdiffstats
path: root/test/bfd.py
blob: ebe4e5f95f208969800c2ea73f06891184a80c7d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
""" BFD protocol implementation """

from random import randint
from socket import AF_INET, AF_INET6, inet_pton
from scapy.all import bind_layers
from scapy.layers.inet import UDP
from scapy.packet import Packet
from scapy.fields import (
    BitField,
    BitEnumField,
    XByteField,
    FlagsField,
    ConditionalField,
    StrField,
)
from vpp_object import VppObject
from util import NumericConstant
from vpp_papi import VppEnum

BFD_UDP_SH_PORT = 3784
BFD_UDP_MH_PORT = 4784


class BFDDiagCode(NumericConstant):
    """BFD Diagnostic Code"""

    no_diagnostic = 0
    control_detection_time_expired = 1
    echo_function_failed = 2
    neighbor_signaled_session_down = 3
    forwarding_plane_reset = 4
    path_down = 5
    concatenated_path_down = 6
    administratively_down = 7
    reverse_concatenated_path_down = 8

    desc_dict = {
        no_diagnostic: "No diagnostic",
        control_detection_time_expired: "Control Detection Time Expired",
        echo_function_failed: "Echo Function Failed",
        neighbor_signaled_session_down: "Neighbor Signaled Session Down",
        forwarding_plane_reset: "Forwarding Plane Reset",
        path_down: "Path Down",
        concatenated_path_down: "Concatenated Path Down",
        administratively_down: "Administratively Down",
        reverse_concatenated_path_down: "Reverse Concatenated Path Down",
    }


class BFDState(NumericConstant):
    """BFD State"""

    admin_down = 0
    down = 1
    init = 2
    up = 3

    desc_dict = {
        admin_down: "AdminDown",
        down: "Down",
        init: "Init",
        up: "Up",
    }


class BFDAuthType(NumericConstant):
    """BFD Authentication Type"""

    no_auth = 0
    simple_pwd = 1
    keyed_md5 = 2
    meticulous_keyed_md5 = 3
    keyed_sha1 = 4
    meticulous_keyed_sha1 = 5

    desc_dict = {
        no_auth: "No authentication",
        simple_pwd: "Simple Password",
        keyed_md5: "Keyed MD5",
        meticulous_keyed_md5: "Meticulous Keyed MD5",
        keyed_sha1: "Keyed SHA1",
        meticulous_keyed_sha1: "Meticulous Keyed SHA1",
    }


def bfd_is_auth_used(pkt):
    """is packet authenticated?"""
    return "A" in pkt.sprintf("%BFD.flags%")


def bfd_is_simple_pwd_used(pkt):
    """is simple password authentication used?"""
    return bfd_is_auth_used(pkt) and pkt.auth_type == BFDAuthType.simple_pwd


def bfd_is_sha1_used(pkt):
    """is sha1 authentication used?"""
    return bfd_is_auth_used(pkt) and pkt.auth_type in (
        BFDAuthType.keyed_sha1,
        BFDAuthType.meticulous_keyed_sha1,
    )


def bfd_is_md5_used(pkt):
    """is md5 authentication used?"""
    return bfd_is_auth_used(pkt) and pkt.auth_type in (
        BFDAuthType.keyed_md5,
        BFDAuthType.meticulous_keyed_md5,
    )


def bfd_is_md5_or_sha1_used(pkt):
    """is md5 or sha1 used?"""
    return bfd_is_md5_used(pkt) or bfd_is_sha1_used(pkt)


class BFD(Packet):
    """BFD protocol layer for scapy"""

    udp_dport = 3784  #: BFD destination port per RFC 5881
    udp_dport_echo = 3785  # : BFD destination port for ECHO per RFC 5881
    udp_sport_min = 49152  #: BFD source port min value per RFC 5881
    udp_sport_max = 65535  #: BFD source port max value per RFC 5881
    bfd_pkt_len = 24  # : length of BFD pkt without authentication section
    sha1_auth_len = 28  # : length of authentication section if SHA1 used

    name = "BFD"

    fields_desc = [
        BitField("version", 1, 3),
        BitEnumField("diag", 0, 5, BFDDiagCode.desc_dict),
        BitEnumField("state", 0, 2, BFDState.desc_dict),
        FlagsField("flags", 0, 6, ["M", "D", "A", "C", "F", "P"]),
        XByteField("detect_mult", 0),
        BitField("length", bfd_pkt_len, 8),
        BitField("my_discriminator", 0, 32),
        BitField("your_discriminator", 0, 32),
        BitField("desired_min_tx_interval", 0, 32),
        BitField("required_min_rx_interval", 0, 32),
        BitField("required_min_echo_rx_interval", 0, 32),
        ConditionalField(
            BitEnumField("auth_type", 0, 8, BFDAuthType.desc_dict), bfd_is_auth_used
        ),
        ConditionalField(BitField("auth_len", 0, 8), bfd_is_auth_used),
        ConditionalField(BitField("auth_key_id", 0, 8), bfd_is_auth_used),
        ConditionalField(BitField("auth_reserved", 0, 8), bfd_is_md5_or_sha1_used),
        ConditionalField(BitField("auth_seq_num", 0, 32), bfd_is_md5_or_sha1_used),
        ConditionalField(StrField("auth_key_hash", "0" * 16), bfd_is_md5_used),
        ConditionalField(StrField("auth_key_hash", "0" * 20), bfd_is_sha1_used),
    ]

    def mysummary(self):
        return self.sprintf(
            "BFD(my_disc=%BFD.my_discriminator%, your_disc=%BFD.your_discriminator%)"
        )


# glue the BFD packet class to scapy parser
bind_layers(UDP, BFD, dport=BFD_UDP_SH_PORT)
bind_layers(UDP, BFD, dport=BFD_UDP_MH_PORT)


class BFD_vpp_echo(Packet):
    """BFD echo packet as used by VPP (non-rfc, as rfc doesn't define one)"""

    udp_dport = 3785  #: BFD echo destination port per RFC 5881
    name = "BFD_VPP_ECHO"

    fields_desc = [
        BitField("discriminator", 0, 32),
        BitField("expire_time_clocks", 0, 64),
        BitField("checksum", 0, 64),
    ]

    def mysummary(self):
        return self.sprintf(
            "BFD_VPP_ECHO(disc=%BFD_VPP_ECHO.discriminator%,"
            "expire_time_clocks=%BFD_VPP_ECHO.expire_time_clocks%)"
        )


# glue the BFD echo packet class to scapy parser
bind_layers(UDP, BFD_vpp_echo, dport=BFD_vpp_echo.udp_dport)


class VppBFDAuthKey(VppObject):
    """Represents BFD authentication key in VPP"""

    def __init__(self, test, conf_key_id, auth_type, key):
        self._test = test
        self._key = key
        self._auth_type = auth_type
        test.assertIn(auth_type, BFDAuthType.desc_dict)
        self._conf_key_id = conf_key_id

    @property
    def test(self):
        """Test which created this key"""
        return self._test

    @property
    def auth_type(self):
        """Authentication type for this key"""
        return self._auth_type

    @property
    def key(self):
        """key data"""
        return self._key

    @key.setter
    def key(self, value):
        self._key = value

    @property
    def conf_key_id(self):
        """configuration key ID"""
        return self._conf_key_id

    def add_vpp_config(self):
        self.test.vapi.bfd_auth_set_key(
            conf_key_id=self._conf_key_id,
            auth_type=self._auth_type,
            key=self._key,
            key_len=len(self._key),
        )
        self._test.registry.register(self, self.test.logger)

    def get_bfd_auth_keys_dump_entry(self):
        """get the entry in the auth keys dump corresponding to this key"""
        result = self.test.vapi.bfd_auth_keys_dump()
        for k in result:
            if k.conf_key_id == self._conf_key_id:
                return k
        return None

    def query_vpp_config(self):
        return self.get_bfd_auth_keys_dump_entry() is not None

    def remove_vpp_config(self):
        self.test.vapi.bfd_auth_del_key(conf_key_id=self._conf_key_id)

    def object_id(self):
        return "bfd-auth-key-%s" % self._conf_key_id


class VppBFDUDPSession(VppObject):
    """Represents BFD UDP session in VPP"""

    def __init__(
        self,
        test,
        interface,
        peer_addr,
        multihop=False,
        local_addr=None,
        af=AF_INET,
        desired_min_tx=300000,
        required_min_rx=300000,
        detect_mult=3,
        sha1_key=None,
        bfd_key_id=None,
        is_tunnel=False,
    ):
        self._test = test
        self._multihop = multihop
        self._interface = interface
        self._af = af

        if multihop:
            self._sw_if_index = 0xFFFFFFFF
            self.test.vapi.bfd_udp_enable_multihop()
        else:
            self._sw_if_index = self._interface.sw_if_index

        if local_addr:
            self._local_addr = local_addr
        else:
            if self.af == AF_INET:
                self._local_addr = self.interface.local_ip4
            else:
                self._local_addr = self.interface.local_ip6

        self._peer_addr = peer_addr
        self._desired_min_tx = desired_min_tx
        self._required_min_rx = required_min_rx
        self._detect_mult = detect_mult
        self._sha1_key = sha1_key
        if bfd_key_id is not None:
            self._bfd_key_id = bfd_key_id
        else:
            self._bfd_key_id = randint(0, 255)
        self._is_tunnel = is_tunnel

    @property
    def test(self):
        """Test which created this session"""
        return self._test

    @property
    def interface(self):
        """Interface on which this session lives"""
        return self._interface

    @property
    def af(self):
        """Address family - AF_INET or AF_INET6"""
        return self._af

    @property
    def local_addr(self):
        """BFD session local address (VPP address)"""
        if self._local_addr is None:
            if self.af == AF_INET:
                return self._interface.local_ip4
            elif self.af == AF_INET6:
                return self._interface.local_ip6
            else:
                raise Exception("Unexpected af '%s'" % self.af)
        return self._local_addr

    @property
    def peer_addr(self):
        """BFD session peer address"""
        return self._peer_addr

    def get_bfd_udp_session_dump_entry(self):
        """get the namedtuple entry from bfd udp session dump"""
        result = self.test.vapi.bfd_udp_session_dump()
        for s in result:
            self.test.logger.debug("session entry: %s" % str(s))
            multihop = s.sw_if_index == ~0
            if multihop or s.sw_if_index == self._sw_if_index:
                if self._local_addr == str(s.local_addr) and self._peer_addr == str(
                    s.peer_addr
                ):
                    return s
        return None

    @property
    def state(self):
        """BFD session state"""
        session = self.get_bfd_udp_session_dump_entry()
        if session is None:
            raise Exception("Could not find BFD session in VPP response")
        return session.state

    @property
    def desired_min_tx(self):
        """desired minimum tx interval"""
        return self._desired_min_tx

    @property
    def required_min_rx(self):
        """required minimum rx interval"""
        return self._required_min_rx

    @property
    def detect_mult(self):
        """detect multiplier"""
        return self._detect_mult

    @property
    def sha1_key(self):
        """sha1 key"""
        return self._sha1_key

    @property
    def bfd_key_id(self):
        """bfd key id in use"""
        return self._bfd_key_id

    @property
    def is_tunnel(self):
        return self._is_tunnel

    def activate_auth(self, key, bfd_key_id=None, delayed=False):
        """activate authentication for this session"""
        self._bfd_key_id = bfd_key_id if bfd_key_id else randint(0, 255)
        self._sha1_key = key
        conf_key_id = self._sha1_key.conf_key_id
        is_delayed = 1 if delayed else 0
        self.test.vapi.bfd_udp_auth_activate(
            sw_if_index=self._sw_if_index,
            local_addr=self.local_addr,
            peer_addr=self.peer_addr,
            bfd_key_id=self._bfd_key_id,
            conf_key_id=conf_key_id,
            is_delayed=is_delayed,
        )

    def deactivate_auth(self, delayed=False):
        """deactivate authentication"""
        self._bfd_key_id = None
        self._sha1_key = None
        is_delayed = 1 if delayed else 0
        self.test.vapi.bfd_udp_auth_deactivate(
            sw_if_index=self._sw_if_index,
            local_addr=self.local_addr,
            peer_addr=self.peer_addr,
            is_delayed=is_delayed,
        )

    def modify_parameters(
        self, detect_mult=None, desired_min_tx=None, required_min_rx=None
    ):
        """modify session parameters"""
        if detect_mult:
            self._detect_mult = detect_mult
        if desired_min_tx:
            self._desired_min_tx = desired_min_tx
        if required_min_rx:
            self._required_min_rx = required_min_rx
        self.test.vapi.bfd_udp_mod(
            sw_if_index=self._sw_if_index,
            desired_min_tx=self.desired_min_tx,
            required_min_rx=self.required_min_rx,
            detect_mult=self.detect_mult,
            local_addr=self.local_addr,
            peer_addr=self.peer_addr,
        )

    def add_vpp_config(self):
        bfd_key_id = self._bfd_key_id if self._sha1_key else None
        conf_key_id = self._sha1_key.conf_key_id if self._sha1_key else None
        is_authenticated = True if self._sha1_key else False
        self.test.vapi.bfd_udp_add(
            sw_if_index=self._sw_if_index,
            desired_min_tx=self.desired_min_tx,
            required_min_rx=self.required_min_rx,
            detect_mult=self.detect_mult,
            local_addr=self.local_addr,
            peer_addr=self.peer_addr,
            bfd_key_id=bfd_key_id,
            conf_key_id=conf_key_id,
            is_authenticated=is_authenticated,
        )
        self._test.registry.register(self, self.test.logger)

    def upd_vpp_config(
        self, detect_mult=None, desired_min_tx=None, required_min_rx=None
    ):
        if desired_min_tx:
            self._desired_min_tx = desired_min_tx
        if required_min_rx:
            self._required_min_rx = required_min_rx
        if detect_mult:
            self._detect_mult = detect_mult
        bfd_key_id = self._bfd_key_id if self._sha1_key else None
        conf_key_id = self._sha1_key.conf_key_id if self._sha1_key else None
        is_authenticated = True if self._sha1_key else False
        self.test.vapi.bfd_udp_upd(
            sw_if_index=self._sw_if_index,
            desired_min_tx=self.desired_min_tx,
            required_min_rx=self.required_min_rx,
            detect_mult=self.detect_mult,
            local_addr=self.local_addr,
            peer_addr=self.peer_addr,
            bfd_key_id=bfd_key_id,
            conf_key_id=conf_key_id,
            is_authenticated=is_authenticated,
        )
        self._test.registry.register(self, self.test.logger)

    def query_vpp_config(self):
        session = self.get_bfd_udp_session_dump_entry()
        return session is not None

    def remove_vpp_config(self):
        self.test.vapi.bfd_udp_del(
            sw_if_index=self._sw_if_index,
            local_addr=self.local_addr,
            peer_addr=self.peer_addr,
        )

    def object_id(self):
        return "bfd-udp-%s-%s-%s-%s-%s" % (
            self._multihop,
            self._sw_if_index,
            self.local_addr,
            self.peer_addr,
            self.af,
        )

    def admin_up(self):
        """set bfd session admin-up"""
        self.test.vapi.bfd_udp_session_set_flags(
            flags=VppEnum.vl_api_if_status_flags_t.IF_STATUS_API_FLAG_ADMIN_UP,
            sw_if_index=self._sw_if_index,
            local_addr=self.local_addr,
            peer_addr=self.peer_addr,
        )

    def admin_down(self):
        """set bfd session admin-down"""
        self.test.vapi.bfd_udp_session_set_flags(
            flags=0,
            sw_if_index=self._sw_if_index,
            local_addr=self.local_addr,
            peer_addr=self.peer_addr,
        )