#!/usr/bin/env python3 """ BFD tests """ from __future__ import division import binascii from collections import namedtuple import hashlib import ipaddress import reprlib import time import unittest from random import randint, shuffle, getrandbits from socket import AF_INET, AF_INET6, inet_ntop from struct import pack, unpack import scapy.compat from scapy.layers.inet import UDP, IP from scapy.layers.inet6 import IPv6 from scapy.layers.l2 import Ether, GRE from scapy.packet import Raw from config import config from bfd import ( VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, BFDDiagCode, BFDState, BFD_vpp_echo, ) from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204, tag_fixme_debian11 from framework import is_distro_ubuntu2204, is_distro_debian11 from framework import VppTestCase, VppTestRunner from framework import tag_run_solo from util import ppp from vpp_ip import DpoProto from vpp_ip_route import VppIpRoute, VppRoutePath from vpp_lo_interface import VppLoInterface from vpp_papi_provider import UnexpectedApiReturnValueError, CliFailedCommandError from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc from vpp_gre_interface import VppGreInterface from vpp_papi import VppEnum USEC_IN_SEC = 1000000 class AuthKeyFactory(object): """Factory class for creating auth keys with unique conf key ID""" def __init__(self): self._conf_key_ids = {} def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1): """create a random key with unique conf key id""" conf_key_id = randint(0, 0xFFFFFFFF) while conf_key_id in self._conf_key_ids: conf_key_id = randint(0, 0xFFFFFFFF) self._conf_key_ids[conf_key_id] = 1 key = scapy.compat.raw( bytearray([randint(0, 255) for _ in range(randint(1, 20))]) ) return VppBFDAuthKey( test=test, auth_type=auth_type, conf_key_id=conf_key_id, key=key ) class BFDAPITestCase(VppTestCase): """Bidirectional Forwarding Detection (BFD) - API""" pg0 = None pg1 = None @classmethod def setUpClass(cls): super(BFDAPITestCase, cls).setUpClass() cls.vapi.cli("set log class bfd level debug") try: cls.create_pg_interfaces(range(2)) for i in cls.pg_interfaces: i.config_ip4() i.config_ip6() i.resolve_arp() except Exception: super(BFDAPITestCase, cls).tearDownClass() raise @classmethod def tearDownClass(cls): super(BFDAPITestCase, cls).tearDownClass() def setUp(self): super(BFDAPITestCase, self).setUp() self.factory = AuthKeyFactory() def test_add_bfd(self): """create a BFD session""" session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4) session.add_vpp_config() self.logger.debug("Session state is %s", session.state) session.remove_vpp_config() session.add_vpp_config() self.logger.debug("Session state is %s", session.state) session.remove_vpp_config() def test_double_add(self): """create the same BFD session twice (negative case)""" session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4) session.add_vpp_config() with self.vapi.assert_negative_api_retval(): session.add_vpp_config() session.remove_vpp_config() def test_add_bfd6(self): """create IPv6 BFD session""" session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6) session.add_vpp_config() self.logger.debug("Session state is %s", session.state) session.remove_vpp_config() session.add_vpp_config() self.logger.debug("Session state is %s", session.state) session.remove_vpp_config() def test_mod_bfd(self): """modify BFD session parameters""" session = VppBFDUDPSession( self, self.pg0, self.pg0.remote_ip4, desired_min_tx=50000, required_min_rx=10000, detect_mult=1, ) session.add_vpp_config() s = session.get_bfd_udp_session_dump_entry() self.assert_equal( session.desired_min_tx, s.desired_min_tx, "desired min transmit interval" ) self.assert_equal( session.required_min_rx, s.required_min_rx, "required min receive interval" ) self.assert_equal(session.detect_mult, s.detect_mult, "detect mult") session.modify_parameters( desired_min_tx=session.desired_min_tx * 2, required_min_rx=session.required_min_rx * 2, detect_mult=session.detect_mult * 2, ) s = session.get_bfd_udp_session_dump_entry() self.assert_equal( session.desired_min_tx, s.desired_min_tx, "desired min transmit interval" ) self.assert_equal( session.required_min_rx, s.required_min_rx, "required min receive interval" ) self.assert_equal(session.detect_mult, s.detect_mult, "detect mult") def test_upd_bfd(self): """Create/Modify w/ Update BFD session parameters""" session = VppBFDUDPSession( self, self.pg0, self.pg0.remote_ip4, desired_min_tx=50000, required_min_rx=10000, detect_mult=1, ) session.upd_vpp_config() s = session.get_bfd_udp_session_dump_entry() self.assert_equal( session.desired_min_tx, s.desired_min_tx, "desired min transmit interval" ) self.assert_equal( session.required_min_rx, s.required_min_rx, "required min receive interval" ) self.assert_equal(session.detect_mult, s.detect_mult, "detect mult") session.upd_vpp_config( desired_min_tx=session.desired_min_tx * 2, required_min_rx=session.required_min_rx * 2, detect_mult=session.detect_mult * 2, ) s = session.get_bfd_udp_session_dump_entry() self.assert_equal( session.desired_min_tx, s.desired_min_tx, "desired min transmit interval" ) self.assert_equal( session.required_min_rx, s.required_min_rx, "required min receive interval" ) self.assert_equal(session.detect_mult, s.detect_mult, "detect mult") def test_add_sha1_keys(self): """add SHA1 keys""" key_count = 10 keys = [self.factory.create_random_key(self) for i in range(0, key_count)] for key in keys: self.assertFalse(key.query_vpp_config()) for key in keys: key.add_vpp_config() for key in keys: self.assertTrue(key.query_vpp_config()) # remove randomly indexes = list(range(key_count)) shuffle(indexes) removed = [] for i in indexes: key = keys[i] key.remove_vpp_config() removed.append(i) for j in range(key_count): key = keys[j] if j in removed: self.assertFalse(key.query_vpp_config()) else: self.assertTrue(key.query_vpp_config()) # should be removed now for key in keys: self.assertFalse(key.query_vpp_config()) # add back and remove again for key in keys: key.add_vpp_config() for key in keys: self.assertTrue(key.query_vpp_config()) for key in keys: key.remove_vpp_config() for key in keys: self.assertFalse(key.query_vpp_config()) def test_add_bfd_sha1(self): """create a BFD session (SHA1)""" key = self.factory.create_random_key(self) key.add_vpp_config() session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4, sha1_key=key) session.add_vpp_config() self.logger.debug("Session state is %s", session.state) session.remove_vpp_config() session.add_vpp_config() self.logger.debug("Session state is %s", session.state) session.remove_vpp_config() def test_double_add_sha1(self): """create the same BFD session twice (negative case) (SHA1)""" key = self.factory.create_random_key(self) key.add_vpp_config() session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4, sha1_key=key) session.add_vpp_config() with self.assertRaises(Exception): session.add_vpp_config() def test_add_auth_nonexistent_key(self): """create BFD session using non-existent SHA1 (negative case)""" session = VppBFDUDPSession( self, self.pg0, self.pg0.remote_ip4, sha1_key=self.factory.create_random_key(self), ) with self.assertRaises(Exception): session.add_vpp_config() def test_shared_sha1_key(self): """single SHA1 key shared by multiple BFD sessions""" key = self.factory.create_random_key(self) key.add_vpp_config() sessions = [ VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4, sha1_key=key), VppBFDUDPSession( self, self.pg0, self.pg0.remote_ip6, sha1_key=key, af=AF_INET6 ), VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4, sha1_key=key), VppBFDUDPSession( self, self.pg1, self.pg1.remote_ip6, sha1_key=key, af=AF_INET6 ), ] for s in sessions: s.add_vpp_config() removed = 0 for s in sessions: e = key.get_bfd_auth_keys_dump_entry() self.assert_equal( e.use_count, len(sessions) - removed, "Use count for shared key" ) s.remove_vpp_config() removed += 1 e = key.get_bfd_auth_keys_dump_entry() self.assert_equal( e.use_count, len(sessions) - removed, "Use count for shared key" ) def test_activate_auth(self): """activate SHA1 authentication""" key = self.factory.create_random_key(self) key.add_vpp_config() session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4) session.add_vpp_config() session.activate_auth(key) def test_deactivate_auth(self): """deactivate SHA1 authentication""" key = self.factory.create_random_key(self) key.add_vpp_config() session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4) session.add_vpp_config() session.activate_auth(key) session.deactivate_auth() def test_change_key(self): """change SHA1 key""" key1 = self.factory.create_random_key(self) key2 = self.factory.create_random_key(self) while key2.conf_key_id == key1.conf_key_id: key2 = self.factory.create_random_key(self) key1.add_vpp_config() key2.add_vpp_config() session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4, sha1_key=key1) session.add_vpp_config() session.activate_auth(key2) def test_set_del_udp_echo_source(self): """set/del udp echo source""" self.create_loopback_interfaces(1) self.loopback0 = self.lo_interfaces[0] self.loopback0.admin_up() echo_source = self.vapi.bfd_udp_get_echo_source() self.assertFalse(echo_source.is_set) self.assertFalse(echo_source.have_usable_ip4) self.assertFalse(echo_source.have_usable_ip6) self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index) echo_source = self.vapi.bfd_udp_get_echo_source() self.assertTrue(echo_source.is_set) self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index) self.assertFalse(echo_source.have_usable_ip4) self.assertFalse(echo_source.have_usable_ip6) self.loopback0.config_ip4() echo_ip4 = ipaddress.IPv4Address( int(ipaddress.IPv4Address(self.loopback0.local_ip4)) ^ 1 ).packed echo_source = self.vapi.bfd_udp_get_echo_source() self.assertTrue(echo_source.is_set) self.assertEqual(echo_source.sw_if_index, sel
<!DOCTYPE HTML>
<html lang="en-US">
<head>
<meta charset="UTF-8">
<meta http-equiv="refresh" content="0; url=https://docs.fd.io/csit/master/trending/index.html">
<title>Page Redirection</title>
</head>
<body>
If you are not redirected automatically, follow this <a href='https://docs.fd.io/csit/master/trending/index.html'>link to the Trending</a>.
</body>
</html>