aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
authorKlement Sekera <ksekera@cisco.com>2017-01-09 07:43:48 +0100
committerDamjan Marion <dmarion.lists@gmail.com>2017-02-02 12:04:30 +0000
commitb17dd9607ee8ecba5ae3ef69c7b4915b57de292a (patch)
treeaa8d22ac4a9ea3adc2f96c4d754273af505950d5 /test
parent402ed3128512efc091a560729ce1e772a86e9f74 (diff)
BFD: SHA1 authentication
Add authentication support to BFD feature. Out of three existing authentication types, implement SHA1 (sole RFC requirement). Simple password is insecure and MD5 is discouraged by the RFC, so ignore those. Add/change APIs to allow configuring BFD authentication keys and their usage with BFD sessions. Change-Id: Ifb0fb5b19c2e72196d84c1cde919bd4c074ea415 Signed-off-by: Klement Sekera <ksekera@cisco.com>
Diffstat (limited to 'test')
-rw-r--r--test/bfd.py265
-rw-r--r--test/framework.py15
-rw-r--r--test/test_bfd.py733
-rw-r--r--test/vpp_papi_provider.py77
4 files changed, 978 insertions, 112 deletions
diff --git a/test/bfd.py b/test/bfd.py
index 51716813..475a1707 100644
--- a/test/bfd.py
+++ b/test/bfd.py
@@ -1,3 +1,4 @@
+from random import randint
from socket import AF_INET, AF_INET6
from scapy.all import *
from scapy.packet import *
@@ -53,11 +54,57 @@ class BFDState(NumericConstant):
NumericConstant.__init__(self, value)
+class BFDAuthType(NumericConstant):
+ """ BFD Authentication Type """
+ no_auth = 0
+ simple_pwd = 1
+ keyed_md5 = 2
+ meticulous_keyed_md5 = 3
+ keyed_sha1 = 4
+ meticulous_keyed_sha1 = 5
+
+ desc_dict = {
+ no_auth: "No authentication",
+ simple_pwd: "Simple Password",
+ keyed_md5: "Keyed MD5",
+ meticulous_keyed_md5: "Meticulous Keyed MD5",
+ keyed_sha1: "Keyed SHA1",
+ meticulous_keyed_sha1: "Meticulous Keyed SHA1",
+ }
+
+ def __init__(self, value):
+ NumericConstant.__init__(self, value)
+
+
+def bfd_is_auth_used(pkt):
+ return "A" in pkt.sprintf("%BFD.flags%")
+
+
+def bfd_is_simple_pwd_used(pkt):
+ return bfd_is_auth_used(pkt) and pkt.auth_type == BFDAuthType.simple_pwd
+
+
+def bfd_is_sha1_used(pkt):
+ return bfd_is_auth_used(pkt) and pkt.auth_type in \
+ (BFDAuthType.keyed_sha1, BFDAuthType.meticulous_keyed_sha1)
+
+
+def bfd_is_md5_used(pkt):
+ return bfd_is_auth_used(pkt) and pkt.auth_type in \
+ (BFDAuthType.keyed_md5, BFDAuthType.meticulous_keyed_md5)
+
+
+def bfd_is_md5_or_sha1_used(pkt):
+ return bfd_is_md5_used(pkt) or bfd_is_sha1_used(pkt)
+
+
class BFD(Packet):
udp_dport = 3784 #: BFD destination port per RFC 5881
udp_sport_min = 49152 #: BFD source port min value per RFC 5881
udp_sport_max = 65535 #: BFD source port max value per RFC 5881
+ bfd_pkt_len = 24 # : length of BFD pkt without authentication section
+ sha1_auth_len = 28 # : length of authentication section if SHA1 used
name = "BFD"
@@ -65,14 +112,27 @@ class BFD(Packet):
BitField("version", 1, 3),
BitEnumField("diag", 0, 5, BFDDiagCode.desc_dict),
BitEnumField("state", 0, 2, BFDState.desc_dict),
- FlagsField("flags", 0, 6, ['P', 'F', 'C', 'A', 'D', 'M']),
+ FlagsField("flags", 0, 6, ['M', 'D', 'A', 'C', 'F', 'P']),
XByteField("detect_mult", 0),
- XByteField("length", 24),
+ BitField("length", bfd_pkt_len, 8),
BitField("my_discriminator", 0, 32),
BitField("your_discriminator", 0, 32),
BitField("desired_min_tx_interval", 0, 32),
BitField("required_min_rx_interval", 0, 32),
- BitField("required_min_echo_rx_interval", 0, 32)]
+ BitField("required_min_echo_rx_interval", 0, 32),
+ ConditionalField(
+ BitEnumField("auth_type", 0, 8, BFDAuthType.desc_dict),
+ bfd_is_auth_used),
+ ConditionalField(BitField("auth_len", 0, 8), bfd_is_auth_used),
+ ConditionalField(BitField("auth_key_id", 0, 8), bfd_is_auth_used),
+ ConditionalField(BitField("auth_reserved", 0, 8),
+ bfd_is_md5_or_sha1_used),
+ ConditionalField(
+ BitField("auth_seq_num", 0, 32), bfd_is_md5_or_sha1_used),
+ ConditionalField(StrField("auth_key_hash", "0" * 16), bfd_is_md5_used),
+ ConditionalField(
+ StrField("auth_key_hash", "0" * 20), bfd_is_sha1_used),
+ ]
def mysummary(self):
return self.sprintf("BFD(my_disc=%BFD.my_discriminator%,"
@@ -82,9 +142,78 @@ class BFD(Packet):
bind_layers(UDP, BFD, dport=BFD.udp_dport)
+class VppBFDAuthKey(VppObject):
+ """ Represents BFD authentication key in VPP """
+
+ def __init__(self, test, conf_key_id, auth_type, key):
+ self._test = test
+ self._key = key
+ self._auth_type = auth_type
+ test.assertIn(auth_type, BFDAuthType.desc_dict)
+ self._conf_key_id = conf_key_id
+
+ @property
+ def test(self):
+ """ Test which created this key """
+ return self._test
+
+ @property
+ def auth_type(self):
+ """ Authentication type for this key """
+ return self._auth_type
+
+ @property
+ def key(self):
+ return self._key
+
+ @property
+ def conf_key_id(self):
+ return self._conf_key_id
+
+ def add_vpp_config(self):
+ self.test.vapi.bfd_auth_set_key(
+ self._conf_key_id, self._auth_type, self._key)
+ self._test.registry.register(self, self.test.logger)
+
+ def get_bfd_auth_keys_dump_entry(self):
+ """ get the entry in the auth keys dump corresponding to this key """
+ result = self.test.vapi.bfd_auth_keys_dump()
+ for k in result:
+ if k.conf_key_id == self._conf_key_id:
+ return k
+ return None
+
+ def query_vpp_config(self):
+ return self.get_bfd_auth_keys_dump_entry() is not None
+
+ def remove_vpp_config(self):
+ self.test.vapi.bfd_auth_del_key(self._conf_key_id)
+
+ def object_id(self):
+ return "bfd-auth-key-%s" % self._conf_key_id
+
+ def __str__(self):
+ return self.object_id()
+
+
class VppBFDUDPSession(VppObject):
""" Represents BFD UDP session in VPP """
+ def __init__(self, test, interface, peer_addr, local_addr=None, af=AF_INET,
+ desired_min_tx=100000, required_min_rx=100000, detect_mult=3,
+ sha1_key=None, bfd_key_id=None):
+ self._test = test
+ self._interface = interface
+ self._af = af
+ self._local_addr = local_addr
+ self._peer_addr = peer_addr
+ self._peer_addr_n = socket.inet_pton(af, peer_addr)
+ self._desired_min_tx = desired_min_tx
+ self._required_min_rx = required_min_rx
+ self._detect_mult = detect_mult
+ self._sha1_key = sha1_key
+ self._bfd_key_id = bfd_key_id if bfd_key_id else randint(0, 255)
+
@property
def test(self):
""" Test which created this session """
@@ -101,13 +230,6 @@ class VppBFDUDPSession(VppObject):
return self._af
@property
- def bs_index(self):
- """ BFD session index from VPP """
- if self._bs_index is not None:
- return self._bs_index
- raise NotConfiguredException("not configured")
-
- @property
def local_addr(self):
""" BFD session local address (VPP address) """
if self._local_addr is None:
@@ -141,19 +263,27 @@ class VppBFDUDPSession(VppObject):
""" BFD session peer address - raw, suitable for API """
return self._peer_addr_n
- @property
- def state(self):
- """ BFD session state """
+ def get_bfd_udp_session_dump_entry(self):
result = self.test.vapi.bfd_udp_session_dump()
- session = None
for s in result:
+ self.test.logger.debug("session entry: %s" % str(s))
if s.sw_if_index == self.interface.sw_if_index:
if self.af == AF_INET \
and s.is_ipv6 == 0 \
and self.interface.local_ip4n == s.local_addr[:4] \
and self.interface.remote_ip4n == s.peer_addr[:4]:
- session = s
- break
+ return s
+ if self.af == AF_INET6 \
+ and s.is_ipv6 == 1 \
+ and self.interface.local_ip6n == s.local_addr \
+ and self.interface.remote_ip6n == s.peer_addr:
+ return s
+ return None
+
+ @property
+ def state(self):
+ """ BFD session state """
+ session = self.get_bfd_udp_session_dump_entry()
if session is None:
raise Exception("Could not find BFD session in VPP response: %s" %
repr(result))
@@ -171,61 +301,78 @@ class VppBFDUDPSession(VppObject):
def detect_mult(self):
return self._detect_mult
- def __init__(self, test, interface, peer_addr, local_addr=None, af=AF_INET,
- desired_min_tx=100000, required_min_rx=100000, detect_mult=3):
- self._test = test
- self._interface = interface
- self._af = af
- self._local_addr = local_addr
- self._peer_addr = peer_addr
- self._peer_addr_n = socket.inet_pton(af, peer_addr)
- self._bs_index = None
- self._desired_min_tx = desired_min_tx
- self._required_min_rx = required_min_rx
- self._detect_mult = detect_mult
+ @property
+ def sha1_key(self):
+ return self._sha1_key
+
+ @property
+ def bfd_key_id(self):
+ return self._bfd_key_id
+
+ def activate_auth(self, key, bfd_key_id=None, delayed=False):
+ self._bfd_key_id = bfd_key_id if bfd_key_id else randint(0, 255)
+ self._sha1_key = key
+ is_ipv6 = 1 if AF_INET6 == self.af else 0
+ conf_key_id = self._sha1_key.conf_key_id
+ is_delayed = 1 if delayed else 0
+ self.test.vapi.bfd_udp_auth_activate(self._interface.sw_if_index,
+ self.local_addr_n,
+ self.peer_addr_n,
+ is_ipv6=is_ipv6,
+ bfd_key_id=self._bfd_key_id,
+ conf_key_id=conf_key_id,
+ is_delayed=is_delayed)
+
+ def deactivate_auth(self, delayed=False):
+ self._bfd_key_id = None
+ self._sha1_key = None
+ is_delayed = 1 if delayed else 0
+ is_ipv6 = 1 if AF_INET6 == self.af else 0
+ self.test.vapi.bfd_udp_auth_deactivate(self._interface.sw_if_index,
+ self.local_addr_n,
+ self.peer_addr_n,
+ is_ipv6=is_ipv6,
+ is_delayed=is_delayed)
def add_vpp_config(self):
is_ipv6 = 1 if AF_INET6 == self.af else 0
- result = self.test.vapi.bfd_udp_add(
- self._interface.sw_if_index,
- self.desired_min_tx,
- self.required_min_rx,
- self.detect_mult,
- self.local_addr_n,
- self.peer_addr_n,
- is_ipv6=is_ipv6)
- self._bs_index = result.bs_index
+ bfd_key_id = self._bfd_key_id if self._sha1_key else None
+ conf_key_id = self._sha1_key.conf_key_id if self._sha1_key else None
+ self.test.vapi.bfd_udp_add(self._interface.sw_if_index,
+ self.desired_min_tx,
+ self.required_min_rx,
+ self.detect_mult,
+ self.local_addr_n,
+ self.peer_addr_n,
+ is_ipv6=is_ipv6,
+ bfd_key_id=bfd_key_id,
+ conf_key_id=conf_key_id)
self._test.registry.register(self, self.test.logger)
def query_vpp_config(self):
- result = self.test.vapi.bfd_udp_session_dump()
- session = None
- for s in result:
- if s.sw_if_index == self.interface.sw_if_index:
- if self.af == AF_INET \
- and s.is_ipv6 == 0 \
- and self.interface.local_ip4n == s.local_addr[:4] \
- and self.interface.remote_ip4n == s.peer_addr[:4]:
- session = s
- break
- if session is None:
- return False
- return True
+ session = self.get_bfd_udp_session_dump_entry()
+ return session is not None
def remove_vpp_config(self):
- if self._bs_index is not None:
- is_ipv6 = 1 if AF_INET6 == self._af else 0
- self.test.vapi.bfd_udp_del(
- self._interface.sw_if_index,
- self.local_addr_n,
- self.peer_addr_n,
- is_ipv6=is_ipv6)
+ is_ipv6 = 1 if AF_INET6 == self._af else 0
+ self.test.vapi.bfd_udp_del(self._interface.sw_if_index,
+ self.local_addr_n,
+ self.peer_addr_n,
+ is_ipv6=is_ipv6)
def object_id(self):
- return "bfd-udp-%d" % self.bs_index
+ return "bfd-udp-%s-%s-%s-%s" % (self._interface.sw_if_index,
+ self.local_addr,
+ self.peer_addr,
+ self.af)
def __str__(self):
return self.object_id()
def admin_up(self):
- self.test.vapi.bfd_session_set_flags(self.bs_index, 1)
+ is_ipv6 = 1 if AF_INET6 == self._af else 0
+ self.test.vapi.bfd_udp_session_set_flags(1,
+ self._interface.sw_if_index,
+ self.local_addr_n,
+ self.peer_addr_n,
+ is_ipv6=is_ipv6)
diff --git a/test/framework.py b/test/framework.py
index 02935604..8ceb33c3 100644
--- a/test/framework.py
+++ b/test/framework.py
@@ -89,8 +89,8 @@ class VppTestCase(unittest.TestCase):
if dl == "core":
if resource.getrlimit(resource.RLIMIT_CORE)[0] <= 0:
# give a heads up if this is actually useless
- cls.logger.critical("WARNING: core size limit is set 0, core "
- "files will NOT be created")
+ print(colorize("WARNING: core size limit is set 0, core files "
+ "will NOT be created", RED))
cls.debug_core = True
elif dl == "gdb":
cls.debug_gdb = True
@@ -533,12 +533,11 @@ class VppTestCase(unittest.TestCase):
self.assertEqual(real_value, expected_value, msg)
- def assert_in_range(
- self,
- real_value,
- expected_min,
- expected_max,
- name=None):
+ def assert_in_range(self,
+ real_value,
+ expected_min,
+ expected_max,
+ name=None):
if name is None:
msg = None
else:
diff --git a/test/test_bfd.py b/test/test_bfd.py
index d047b5a3..5f861477 100644
--- a/test/test_bfd.py
+++ b/test/test_bfd.py
@@ -1,6 +1,8 @@
#!/usr/bin/env python
import unittest
+import hashlib
+import binascii
import time
from random import randint
from bfd import *
@@ -10,6 +12,22 @@ from util import ppp
us_in_sec = 1000000
+class AuthKeyFactory(object):
+ """Factory class for creating auth keys with unique conf key ID"""
+
+ def __init__(self):
+ self._conf_key_ids = {}
+
+ def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
+ conf_key_id = randint(0, 0xFFFFFFFF)
+ while conf_key_id in self._conf_key_ids:
+ conf_key_id = randint(0, 0xFFFFFFFF)
+ self._conf_key_ids[conf_key_id] = 1
+ key = str(bytearray([randint(0, 255) for j in range(randint(1, 20))]))
+ return VppBFDAuthKey(test=test, auth_type=auth_type,
+ conf_key_id=conf_key_id, key=key)
+
+
class BFDAPITestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD) - API"""
@@ -21,19 +39,23 @@ class BFDAPITestCase(VppTestCase):
cls.create_pg_interfaces(range(2))
for i in cls.pg_interfaces:
i.config_ip4()
+ i.config_ip6()
i.resolve_arp()
except Exception:
super(BFDAPITestCase, cls).tearDownClass()
raise
+ def setUp(self):
+ super(BFDAPITestCase, self).setUp()
+ self.factory = AuthKeyFactory()
+
def test_add_bfd(self):
""" create a BFD session """
session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
session.add_vpp_config()
self.logger.debug("Session state is %s" % str(session.state))
session.remove_vpp_config()
- session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
session.add_vpp_config()
self.logger.debug("Session state is %s" % str(session.state))
session.remove_vpp_config()
@@ -48,25 +70,155 @@ class BFDAPITestCase(VppTestCase):
session.remove_vpp_config()
- def test_add_two(self):
- """ create two BFD sessions """
- session1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
- session1.add_vpp_config()
- session2 = VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4)
- session2.add_vpp_config()
- self.assertNotEqual(session1.bs_index, session2.bs_index,
- "Different BFD sessions share bs_index (%s)" %
- session1.bs_index)
+ def test_add_bfd6(self):
+ """ create IPv6 BFD session """
+ session = VppBFDUDPSession(
+ self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
+ session.add_vpp_config()
+ self.logger.debug("Session state is %s" % str(session.state))
+ session.remove_vpp_config()
+ session.add_vpp_config()
+ self.logger.debug("Session state is %s" % str(session.state))
+ session.remove_vpp_config()
+
+ def test_add_sha1_keys(self):
+ """ add SHA1 keys """
+ key_count = 10
+ keys = [self.factory.create_random_key(
+ self) for i in range(0, key_count)]
+ for key in keys:
+ self.assertFalse(key.query_vpp_config())
+ for key in keys:
+ key.add_vpp_config()
+ for key in keys:
+ self.assertTrue(key.query_vpp_config())
+ # remove randomly
+ indexes = range(key_count)
+ random.shuffle(indexes)
+ removed = []
+ for i in indexes:
+ key = keys[i]
+ key.remove_vpp_config()
+ removed.append(i)
+ for j in range(key_count):
+ key = keys[j]
+ if j in removed:
+ self.assertFalse(key.query_vpp_config())
+ else:
+ self.assertTrue(key.query_vpp_config())
+ # should be removed now
+ for key in keys:
+ self.assertFalse(key.query_vpp_config())
+ # add back and remove again
+ for key in keys:
+ key.add_vpp_config()
+ for key in keys:
+ self.assertTrue(key.query_vpp_config())
+ for key in keys:
+ key.remove_vpp_config()
+ for key in keys:
+ self.assertFalse(key.query_vpp_config())
+
+ def test_add_bfd_sha1(self):
+ """ create a BFD session (SHA1) """
+ key = self.factory.create_random_key(self)
+ key.add_vpp_config()
+ session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
+ sha1_key=key)
+ session.add_vpp_config()
+ self.logger.debug("Session state is %s" % str(session.state))
+ session.remove_vpp_config()
+ session.add_vpp_config()
+ self.logger.debug("Session state is %s" % str(session.state))
+ session.remove_vpp_config()
+
+ def test_double_add_sha1(self):
+ """ create the same BFD session twice (negative case) (SHA1) """
+ key = self.factory.create_random_key(self)
+ key.add_vpp_config()
+ session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
+ sha1_key=key)
+ session.add_vpp_config()
+ with self.assertRaises(Exception):
+ session.add_vpp_config()
+
+ def test_add_authenticated_with_nonexistent_key(self):
+ """ create BFD session using non-existent SHA1 (negative case) """
+ session = VppBFDUDPSession(
+ self, self.pg0, self.pg0.remote_ip4,
+ sha1_key=self.factory.create_random_key(self))
+ with self.assertRaises(Exception):
+ session.add_vpp_config()
+
+ def test_shared_sha1_key(self):
+ """ share single SHA1 key between multiple BFD sessions """
+ key = self.factory.create_random_key(self)
+ key.add_vpp_config()
+ sessions = [
+ VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
+ sha1_key=key),
+ VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
+ sha1_key=key, af=AF_INET6),
+ VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
+ sha1_key=key),
+ VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
+ sha1_key=key, af=AF_INET6)]
+ for s in sessions:
+ s.add_vpp_config()
+ removed = 0
+ for s in sessions:
+ e = key.get_bfd_auth_keys_dump_entry()
+ self.assert_equal(e.use_count, len(sessions) - removed,
+ "Use count for shared key")
+ s.remove_vpp_config()
+ removed += 1
+ e = key.get_bfd_auth_keys_dump_entry()
+ self.assert_equal(e.use_count, len(sessions) - removed,
+ "Use count for shared key")
+
+ def test_activate_auth(self):
+ """ activate SHA1 authentication """
+ key = self.factory.create_random_key(self)
+ key.add_vpp_config()
+ session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
+ session.add_vpp_config()
+ session.activate_auth(key)
+
+ def test_deactivate_auth(self):
+ """ deactivate SHA1 authentication """
+ key = self.factory.create_random_key(self)
+ key.add_vpp_config()
+ session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
+ session.add_vpp_config()
+ session.activate_auth(key)
+ session.deactivate_auth()
+
+ def test_change_key(self):
+ key1 = self.factory.create_random_key(self)
+ key2 = self.factory.create_random_key(self)
+ while key2.conf_key_id == key1.conf_key_id:
+ key2 = self.factory.create_random_key(self)
+ key1.add_vpp_config()
+ key2.add_vpp_config()
+ session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
+ sha1_key=key1)
+ session.add_vpp_config()
+ session.activate_auth(key2)
class BFDTestSession(object):
""" BFD session as seen from test framework side """
- def __init__(self, test, interface, af, detect_mult=3):
+ def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
+ bfd_key_id=None, our_seq_number=0xFFFFFFFF - 4):
self.test = test
self.af = af
+ self.sha1_key = sha1_key
+ self.bfd_key_id = bfd_key_id
self.interface = interface
self.udp_sport = 50000
+ self.our_seq_number = our_seq_number
+ self.vpp_seq_number = None
self.bfd_values = {
'my_discriminator': 0,
'desired_min_tx_interval': 100000,
@@ -74,10 +226,25 @@ class BFDTestSession(object):
'diag': BFDDiagCode.no_diagnostic,
}
+ def inc_seq_num(self):
+ if self.our_seq_number == 0xFFFFFFFF:
+ self.our_seq_number = 0
+ else:
+ self.our_seq_number += 1
+
def update(self, **kwargs):
self.bfd_values.update(kwargs)
def create_packet(self):
+ if self.sha1_key:
+ bfd = BFD(flags="A")
+ bfd.auth_type = self.sha1_key.auth_type
+ bfd.auth_len = BFD.sha1_auth_len
+ bfd.auth_key_id = self.bfd_key_id
+ bfd.auth_seq_num = self.our_seq_number
+ bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
+ else:
+ bfd = BFD()
if self.af == AF_INET6:
packet = (Ether(src=self.interface.remote_mac,
dst=self.interface.local_mac) /
@@ -85,7 +252,7 @@ class BFDTestSession(object):
dst=self.interface.local_ip6,
hlim=255) /
UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
- BFD())
+ bfd)
else:
packet = (Ether(src=self.interface.remote_mac,
dst=self.interface.local_mac) /
@@ -93,11 +260,17 @@ class BFDTestSession(object):
dst=self.interface.local_ip4,
ttl=255) /
UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
- BFD())
+ bfd)
self.test.logger.debug("BFD: Creating packet")
for name, value in self.bfd_values.iteritems():
self.test.logger.debug("BFD: setting packet.%s=%s", name, value)
packet[BFD].setfieldval(name, value)
+ if self.sha1_key:
+ hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
+ "\0" * (20 - len(self.sha1_key.key))
+ self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
+ hashlib.sha1(hash_material).hexdigest())
+ packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
return packet
def send_packet(self):
@@ -106,13 +279,60 @@ class BFDTestSession(object):
self.test.pg0.add_stream([p])
self.test.pg_start()
- def verify_packet(self, packet):
+ def verify_sha1_auth(self, packet):
+ """ Verify correctness of authentication in BFD layer. """
+ bfd = packet[BFD]
+ self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
+ self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
+ BFDAuthType)
+ self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
+ self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
+ if self.vpp_seq_number is None:
+ self.vpp_seq_number = bfd.auth_seq_num
+ self.test.logger.debug("Received initial sequence number: %s" %
+ self.vpp_seq_number)
+ else:
+ recvd_seq_num = bfd.auth_seq_num
+ self.test.logger.debug("Received followup sequence number: %s" %
+ recvd_seq_num)
+ if self.vpp_seq_number < 0xffffffff:
+ if self.sha1_key.auth_type == \
+ BFDAuthType.meticulous_keyed_sha1:
+ self.test.assert_equal(recvd_seq_num,
+ self.vpp_seq_number + 1,
+ "BFD sequence number")
+ else:
+ self.test.assert_in_range(recvd_seq_num,
+ self.vpp_seq_number,
+ self.vpp_seq_number + 1,
+ "BFD sequence number")
+ else:
+ if self.sha1_key.auth_type == \
+ BFDAuthType.meticulous_keyed_sha1:
+ self.test.assert_equal(recvd_seq_num, 0,
+ "BFD sequence number")
+ else:
+ self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
+ "BFD sequence number not one of "
+ "(%s, 0)" % self.vpp_seq_number)
+ self.vpp_seq_number = recvd_seq_num
+ # last 20 bytes represent the hash - so replace them with the key,
+ # pad the result with zeros and hash the result
+ hash_material = bfd.original[:-20] + self.sha1_key.key + \
+ "\0" * (20 - len(self.sha1_key.key))
+ expected_hash = hashlib.sha1(hash_material).hexdigest()
+ self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
+ expected_hash, "Auth key hash")
+
+ def verify_bfd(self, packet):
""" Verify correctness of BFD layer. """
bfd = packet[BFD]
self.test.assert_equal(bfd.version, 1, "BFD version")
self.test.assert_equal(bfd.your_discriminator,
self.bfd_values['my_discriminator'],
"BFD - your discriminator")
+ if self.sha1_key:
+ self.verify_sha1_auth(packet)
class BFDCommonCode:
@@ -122,9 +342,9 @@ class BFDCommonCode:
self.vapi.collect_events() # clear the event queue
if not self.vpp_dead:
self.vapi.want_bfd_events(enable_disable=0)
- self.vpp_session.remove_vpp_config()
def bfd_session_up(self):
+ """ Bring BFD session up """
self.pg_enable_capture([self.pg0])
self.logger.info("BFD: Waiting for slow hello")
p, timeout = self.wait_for_bfd_packet(2)
@@ -139,6 +359,18 @@ class BFDCommonCode:
self.verify_event(e, expected_state=BFDState.up)
self.logger.info("BFD: Session is Up")
self.test_session.update(state=BFDState.up)
+ self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
+
+ def bfd_session_down(self):
+ """ Bring BFD session down """
+ self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
+ self.test_session.update(state=BFDState.down)
+ self.test_session.send_packet()
+ self.logger.info("BFD: Waiting for event")
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
+ self.verify_event(e, expected_state=BFDState.down)
+ self.logger.info("BFD: Session is Down")
+ self.assert_equal(self.vpp_session.state, BFDState.down, BFDState)
def verify_ip(self, packet):
""" Verify correctness of IP layer. """
@@ -166,12 +398,9 @@ class BFDCommonCode:
""" Verify correctness of event values. """
e = event
self.logger.debug("BFD: Event: %s" % repr(e))
- self.assert_equal(e.bs_index, self.vpp_session.bs_index,
- "BFD session index")
- self.assert_equal(
- e.sw_if_index,
- self.vpp_session.interface.sw_if_index,
- "BFD interface index")
+ self.assert_equal(e.sw_if_index,
+ self.vpp_session.interface.sw_if_index,
+ "BFD interface index")
is_ipv6 = 0
if self.vpp_session.af == AF_INET6:
is_ipv6 = 1
@@ -199,7 +428,7 @@ class BFDCommonCode:
before = time.time()
p = self.pg0.wait_for_packet(timeout=timeout)
after = time.time()
- self.logger.debug(ppp("Got packet:", p))
+ self.logger.debug(ppp("BFD: Got packet:", p))
bfd = p[BFD]
if bfd is None:
raise Exception(ppp("Unexpected or invalid BFD packet:", p))
@@ -207,20 +436,9 @@ class BFDCommonCode:
raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
self.verify_ip(p)
self.verify_udp(p)
- self.test_session.verify_packet(p)
+ self.test_session.verify_bfd(p)
return p, after - before
- def test_session_up(self):
- """ bring BFD session up """
- self.bfd_session_up()
-
- def test_hold_up(self):
- """ hold BFD session up """
- self.bfd_session_up()
- for i in range(5):
- self.wait_for_bfd_packet()
- self.test_session.send_packet()
-
class BFD4TestCase(VppTestCase, BFDCommonCode):
"""Bidirectional Forwarding Detection (BFD)"""
@@ -231,7 +449,6 @@ class BFD4TestCase(VppTestCase, BFDCommonCode):
try:
cls.create_pg_interfaces([0])
cls.pg0.config_ip4()
- cls.pg0.generate_remote_hosts()
cls.pg0.configure_ipv4_neighbors()
cls.pg0.admin_up()
cls.pg0.resolve_arp()
@@ -242,6 +459,7 @@ class BFD4TestCase(VppTestCase, BFDCommonCode):
def setUp(self):
super(BFD4TestCase, self).setUp()
+ self.factory = AuthKeyFactory()
self.vapi.want_bfd_events()
try:
self.vpp_session = VppBFDUDPSession(self, self.pg0,
@@ -255,7 +473,25 @@ class BFD4TestCase(VppTestCase, BFDCommonCode):
def tearDown(self):
BFDCommonCode.tearDown(self)
- super(BFD4TestCase, self).tearDown()
+ VppTestCase.tearDown(self)
+
+ def test_session_up(self):
+ """ bring BFD session up """
+ self.bfd_session_up()
+
+ def test_session_down(self):
+ """ bring BFD session down """
+ self.bfd_session_up()
+ self.bfd_session_down()
+
+ def test_hold_up(self):
+ """ hold BFD session up """
+ self.bfd_session_up()
+ for i in range(5):
+ self.wait_for_bfd_packet()
+ self.test_session.send_packet()
+ self.assert_equal(len(self.vapi.collect_events()), 0,
+ "number of bfd events")
def test_slow_timer(self):
""" verify slow periodic control frames while session down """
@@ -367,6 +603,7 @@ class BFD6TestCase(VppTestCase, BFDCommonCode):
def setUp(self):
super(BFD6TestCase, self).setUp()
+ self.factory = AuthKeyFactory()
self.vapi.want_bfd_events()
try:
self.vpp_session = VppBFDUDPSession(self, self.pg0,
@@ -382,7 +619,429 @@ class BFD6TestCase(VppTestCase, BFDCommonCode):
def tearDown(self):
BFDCommonCode.tearDown(self)
- super(BFD6TestCase, self).tearDown()
+ VppTestCase.tearDown(self)
+
+ def test_session_up(self):
+ """ bring BFD session up """
+ self.bfd_session_up()
+
+ def test_hold_up(self):
+ """ hold BFD session up """
+ self.bfd_session_up()
+ for i in range(5):
+ self.wait_for_bfd_packet()
+ self.test_session.send_packet()
+ self.assert_equal(len(self.vapi.collect_events()), 0,
+ "number of bfd events")
+ self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
+
+
+class BFDSHA1TestCase(VppTestCase, BFDCommonCode):
+ """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
+
+ @classmethod
+ def setUpClass(cls):
+ super(BFDSHA1TestCase, cls).setUpClass()
+ try:
+ cls.create_pg_interfaces([0])
+ cls.pg0.config_ip4()
+ cls.pg0.admin_up()
+ cls.pg0.resolve_arp()
+
+ except Exception:
+ super(BFDSHA1TestCase, cls).tearDownClass()
+ raise
+
+ def setUp(self):
+ super(BFDSHA1TestCase, self).setUp()
+ self.factory = AuthKeyFactory()
+ self.vapi.want_bfd_events()
+
+ def tearDown(self):
+ BFDCommonCode.tearDown(self)
+ VppTestCase.tearDown(self)
+
+ def test_session_up(self):
+ """ bring BFD session up """
+ key = self.factory.create_random_key(self)
+ key.add_vpp_config()
+ self.vpp_session = VppBFDUDPSession(self, self.pg0,
+ self.pg0.remote_ip4,
+ sha1_key=key)
+ self.vpp_session.add_vpp_config()
+ self.vpp_session.admin_up()
+ self.test_session = BFDTestSession(
+ self, self.pg0, AF_INET, sha1_key=key,
+ bfd_key_id=self.vpp_session.bfd_key_id)
+ self.bfd_session_up()
+
+ def test_hold_up(self):
+ """ hold BFD session up """
+ key = self.factory.create_random_key(self)
+ key.add_vpp_config()
+ self.vpp_session = VppBFDUDPSession(self, self.pg0,
+ self.pg0.remote_ip4,
+ sha1_key=key)
+ self.vpp_session.add_vpp_config()
+ self.vpp_session.admin_up()
+ self.test_session = BFDTestSession(
+ self, self.pg0, AF_INET, sha1_key=key,
+ bfd_key_id=self.vpp_session.bfd_key_id)
+ self.bfd_session_up()
+ for i in range(5):
+ self.wait_for_bfd_packet()
+ self.test_session.send_packet()
+ self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
+
+ def test_hold_up_meticulous(self):
+ """ hold BFD session up - meticulous auth """
+ key = self.factory.create_random_key(
+ self, BFDAuthType.meticulous_keyed_sha1)
+ key.add_vpp_config()
+ self.vpp_session = VppBFDUDPSession(self, self.pg0,
+ self.pg0.remote_ip4, sha1_key=key)
+ self.vpp_session.add_vpp_config()
+ self.vpp_session.admin_up()
+ self.test_session = BFDTestSession(
+ self, self.pg0, AF_INET, sha1_key=key,
+ bfd_key_id=self.vpp_session.bfd_key_id)
+ self.bfd_session_up()
+ for i in range(5):
+ self.wait_for_bfd_packet()
+ self.test_session.inc_seq_num()
+ self.test_session.send_packet()
+ self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
+
+ def test_send_bad_seq_number(self):
+ """ session is not kept alive by msgs with bad seq numbers"""
+ key = self.factory.create_random_key(
+ self, BFDAuthType.meticulous_keyed_sha1)
+ key.add_vpp_config()
+ self.vpp_session = VppBFDUDPSession(self, self.pg0,
+ self.pg0.remote_ip4, sha1_key=key)
+ self.vpp_session.add_vpp_config()
+ self.vpp_session.admin_up()
+ self.test_session = BFDTestSession(
+ self, self.pg0, AF_INET, sha1_key=key,
+ bfd_key_id=self.vpp_session.bfd_key_id)
+ self.bfd_session_up()
+ self.wait_for_bfd_packet()
+ self.test_session.send_packet()
+ self.assert_equal(len(self.vapi.collect_events()), 0,
+ "number of bfd events")
+ self.wait_for_bfd_packet()
+ self.test_session.send_packet()
+ self.assert_equal(len(self.vapi.collect_events()), 0,
+ "number of bfd events")
+ self.wait_for_bfd_packet()
+ self.test_session.send_packet()
+ self.wait_for_bfd_packet()
+ self.test_session.send_packet()
+ e = self.vapi.collect_events()
+ # session should be down now, because the sequence numbers weren't
+ # updated
+ self.assert_equal(len(e), 1, "number of bfd events")
+ self.verify_event(e[0], expected_state=BFDState.down)
+
+ def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
+ legitimate_test_session,
+ rogue_test_session,
+ rogue_bfd_values=None):
+ """ execute a rogue session interaction scenario
+
+ 1. create vpp session, add config
+ 2. bring the legitimate session up
+ 3. copy the bfd values from legitimate session to rogue session
+ 4. apply rogue_bfd_values to rogue session
+ 5. set rogue session state to down
+ 6. send message to take the session down from the rogue session
+ 7. assert that the legitimate session is unaffected
+ """
+
+ self.vpp_session = vpp_bfd_udp_session
+ self.vpp_session.add_vpp_config()
+ self.vpp_session.admin_up()
+ self.test_session = legitimate_test_session
+ # bring vpp session up
+ self.bfd_session_up()
+ # send packet from rogue session
+ rogue_test_session.bfd_values = self.test_session.bfd_values.copy()
+ if rogue_bfd_values:
+ rogue_test_session.update(**rogue_bfd_values)
+ rogue_test_session.update(state=BFDState.down)
+ rogue_test_session.send_packet()
+ self.wait_for_bfd_packet()
+ self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
+
+ def test_mismatch_auth(self):
+ """ session is not brought down by unauthenticated msg """
+ key = self.factory.create_random_key(self)
+ key.add_vpp_config()
+ vpp_session = VppBFDUDPSession(
+ self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
+ legitimate_test_session = BFDTestSession(
+ self, self.pg0, AF_INET, sha1_key=key,
+ bfd_key_id=vpp_session.bfd_key_id)
+ rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
+ self.execute_rogue_session_scenario(vpp_session,
+ legitimate_test_session,
+ rogue_test_session)
+
+ def test_mismatch_bfd_key_id(self):
+ """ session is not brought down by msg with non-existent key-id """
+ key = self.factory.create_random_key(self)
+ key.add_vpp_config()
+ vpp_session = VppBFDUDPSession(
+ self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
+ # pick a different random bfd key id
+ x = randint(0, 255)
+ while x == vpp_session.bfd_key_id:
+ x = randint(0, 255)
+ legitimate_test_session = BFDTestSession(
+ self, self.pg0, AF_INET, sha1_key=key,
+ bfd_key_id=vpp_session.bfd_key_id)
+ rogue_test_session = BFDTestSession(
+ self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
+ self.execute_rogue_session_scenario(vpp_session,
+ legitimate_test_session,
+ rogue_test_session)
+
+ def test_mismatched_auth_type(self):
+ """ session is not brought down by msg with wrong auth type """
+ key = self.factory.create_random_key(self)
+ key.add_vpp_config()
+ vpp_session = VppBFDUDPSession(
+ self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
+ legitimate_test_session = BFDTestSession(
+ self, self.pg0, AF_INET, sha1_key=key,
+ bfd_key_id=vpp_session.bfd_key_id)
+ rogue_test_session = BFDTestSession(
+ self, self.pg0, AF_INET, sha1_key=key,
+ bfd_key_id=vpp_session.bfd_key_id)
+ self.execute_rogue_session_scenario(
+ vpp_session, legitimate_test_session, rogue_test_session,
+ {'auth_type': BFDAuthType.keyed_md5})
+
+ def test_restart(self):
+ """ simulate remote peer restart and resynchronization """
+ key = self.factory.create_random_key(
+ self, BFDAuthType.meticulous_keyed_sha1)
+ key.add_vpp_config()
+ self.vpp_session = VppBFDUDPSession(self, self.pg0,
+ self.pg0.remote_ip4, sha1_key=key)
+ self.vpp_session.add_vpp_config()
+ self.vpp_session.admin_up()
+ self.test_session = BFDTestSession(
+ self, self.pg0, AF_INET, sha1_key=key,
+ bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
+ self.bfd_session_up()
+ # now we need to not respond for 2*detection_time (4 packets)
+ self.wait_for_bfd_packet()
+ self.assert_equal(len(self.vapi.collect_events()), 0,
+ "number of bfd events")
+ self.wait_for_bfd_packet()
+ self.assert_equal(len(self.vapi.collect_events()), 0,
+ "number of bfd events")
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
+ self.verify_event(e, expected_state=BFDState.down)
+ self.test_session.update(state=BFDState.down)
+ self.wait_for_bfd_packet()
+ self.assert_equal(len(self.vapi.collect_events()), 0,
+ "number of bfd events")
+ self.wait_for_bfd_packet()
+ self.assert_equal(len(self.vapi.collect_events()), 0,
+ "number of bfd events")
+ # reset sequence number
+ self.test_session.our_seq_number = 0
+ self.bfd_session_up()
+
+
+class BFDAuthOnOffTestCase(VppTestCase, BFDCommonCode):
+ """Bidirectional Forwarding Detection (BFD) (changing auth) """
+
+ @classmethod
+ def setUpClass(cls):
+ super(BFDAuthOnOffTestCase, cls).setUpClass()
+ try:
+ cls.create_pg_interfaces([0])
+ cls.pg0.config_ip4()
+ cls.pg0.admin_up()
+ cls.pg0.resolve_arp()
+
+ except Exception:
+ super(BFDAuthOnOffTestCase, cls).tearDownClass()
+ raise
+
+ def setUp(self):
+ super(BFDAuthOnOffTestCase, self).setUp()
+ self.factory = AuthKeyFactory()
+ self.vapi.want_bfd_events()
+
+ def tearDown(self):
+ BFDCommonCode.tearDown(self)
+ VppTestCase.tearDown(self)
+
+ def test_auth_on_immediate(self):
+ """ turn auth on without disturbing session state (immediate) """
+ key = self.factory.create_random_key(self)
+ key.add_vpp_config()
+ self.vpp_session = VppBFDUDPSession(self, self.pg0,
+ self.pg0.remote_ip4)
+ self.vpp_session.add_vpp_config()
+ self.vpp_session.admin_up()
+ self.test_session = BFDTestSession(self, self.pg0, AF_INET)
+ self.bfd_session_up()
+ for i in range(5):
+ self.wait_for_bfd_packet()
+ self.test_session.send_packet()
+ self.vpp_session.activate_auth(key)
+ self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
+ self.test_session.sha1_key = key
+ for i in range(5):
+ self.wait_for_bfd_packet()
+ self.test_session.send_packet()
+ self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
+ self.assert_equal(len(self.vapi.collect_events()), 0,
+ "number of bfd events")
+
+ def test_auth_off_immediate(self):
+ """ turn auth off without disturbing session state (immediate) """
+ key = self.factory.create_random_key(self)
+ key.add_vpp_config()
+ self.vpp_session = VppBFDUDPSession(self, self.pg0,
+ self.pg0.remote_ip4, sha1_key=key)
+ self.vpp_session.add_vpp_config()
+ self.vpp_session.admin_up()
+ self.test_session = BFDTestSession(
+ self, self.pg0, AF_INET, sha1_key=key,
+ bfd_key_id=self.vpp_session.bfd_key_id)
+ self.bfd_session_up()
+ for i in range(5):
+ self.wait_for_bfd_packet()
+ self.test_session.send_packet()
+ self.vpp_session.deactivate_auth()
+ self.test_session.bfd_key_id = None
+ self.test_session.sha1_key = None
+ for i in range(5):
+ self.wait_for_bfd_packet()
+ self.test_session.send_packet()
+ self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
+ self.assert_equal(len(self.vapi.collect_events()), 0,
+ "number of bfd events")
+
+ def test_auth_change_key_immediate(self):
+ """ change auth key without disturbing session state (immediate) """
+ key1 = self.factory.create_random_key(self)
+ key1.add_vpp_config()
+ key2 = self.factory.create_random_key(self)
+ key2.add_vpp_config()
+ self.vpp_session = VppBFDUDPSession(self, self.pg0,
+ self.pg0.remote_ip4, sha1_key=key1)
+ self.vpp_session.add_vpp_config()
+ self.vpp_session.admin_up()
+ self.test_session = BFDTestSession(
+ self, self.pg0, AF_INET, sha1_key=key1,
+ bfd_key_id=self.vpp_session.bfd_key_id)
+ self.bfd_session_up()
+ for i in range(5):
+ self.wait_for_bfd_packet()
+ self.test_session.send_packet()
+ self.vpp_session.activate_auth(key2)
+ self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
+ self.test_session.sha1_key = key2
+ for i in range(5):
+ self.wait_for_bfd_packet()
+ self.test_session.send_packet()
+ self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
+ self.assert_equal(len(self.vapi.collect_events()), 0,
+ "number of bfd events")
+
+ def test_auth_on_delayed(self):
+ """ turn auth on without disturbing session state (delayed) """
+ key = self.factory.create_random_key(self)
+ key.add_vpp_config()
+ self.vpp_session = VppBFDUDPSession(self, self.pg0,
+ self.pg0.remote_ip4)
+ self.vpp_session.add_vpp_config()
+ self.vpp_session.admin_up()
+ self.test_session = BFDTestSession(self, self.pg0, AF_INET)
+ self.bfd_session_up()
+ for i in range(5):
+ self.wait_for_bfd_packet()
+ self.test_session.send_packet()
+ self.vpp_session.activate_auth(key, delayed=True)
+ for i in range(5):
+ self.wait_for_bfd_packet()
+ self.test_session.send_packet()
+ self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
+ self.test_session.sha1_key = key
+ self.test_session.send_packet()
+ for i in range(5):
+ self.wait_for_bfd_packet()
+ self.test_session.send_packet()
+ self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
+ self.assert_equal(len(self.vapi.collect_events()), 0,
+ "number of bfd events")
+
+ def test_auth_off_delayed(self):
+ """ turn auth off without disturbing session state (delayed) """
+ key = self.factory.create_random_key(self)
+ key.add_vpp_config()
+ self.vpp_session = VppBFDUDPSession(self, self.pg0,
+ self.pg0.remote_ip4, sha1_key=key)
+ self.vpp_session.add_vpp_config()
+ self.vpp_session.admin_up()
+ self.test_session = BFDTestSession(
+ self, self.pg0, AF_INET, sha1_key=key,
+ bfd_key_id=self.vpp_session.bfd_key_id)
+ self.bfd_session_up()
+ for i in range(5):
+ self.wait_for_bfd_packet()
+ self.test_session.send_packet()
+ self.vpp_session.deactivate_auth(delayed=True)
+ for i in range(5):
+ self.wait_for_bfd_packet()
+ self.test_session.send_packet()
+ self.test_session.bfd_key_id = None
+ self.test_session.sha1_key = None
+ self.test_session.send_packet()
+ for i in range(5):
+ self.wait_for_bfd_packet()
+ self.test_session.send_packet()
+ self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
+ self.assert_equal(len(self.vapi.collect_events()), 0,
+ "number of bfd events")
+
+ def test_auth_change_key_delayed(self):
+ """ change auth key without disturbing session state (delayed) """
+ key1 = self.factory.create_random_key(self)
+ key1.add_vpp_config()
+ key2 = self.factory.create_random_key(self)
+ key2.add_vpp_config()
+ self.vpp_session = VppBFDUDPSession(self, self.pg0,
+ self.pg0.remote_ip4, sha1_key=key1)
+ self.vpp_session.add_vpp_config()
+ self.vpp_session.admin_up()
+ self.test_session = BFDTestSession(
+ self, self.pg0, AF_INET, sha1_key=key1,
+ bfd_key_id=self.vpp_session.bfd_key_id)
+ self.bfd_session_up()
+ for i in range(5):
+ self.wait_for_bfd_packet()
+ self.test_session.send_packet()
+ self.vpp_session.activate_auth(key2, delayed=True)
+ for i in range(5):
+ self.wait_for_bfd_packet()
+ self.test_session.send_packet()
+ self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
+ self.test_session.sha1_key = key2
+ self.test_session.send_packet()
+ for i in range(5):
+ self.wait_for_bfd_packet()
+ self.test_session.send_packet()
+ self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
+ self.assert_equal(len(self.vapi.collect_events()), 0,
+ "number of bfd events")
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)
diff --git a/test/vpp_papi_provider.py b/test/vpp_papi_provider.py
index 90c954dc..72c18e6c 100644
--- a/test/vpp_papi_provider.py
+++ b/test/vpp_papi_provider.py
@@ -992,16 +992,57 @@ class VppPapiProvider(object):
self.api(self.papi.control_ping)
def bfd_udp_add(self, sw_if_index, desired_min_tx, required_min_rx,
- detect_mult, local_addr, peer_addr, is_ipv6=0):
- return self.api(self.papi.bfd_udp_add,
+ detect_mult, local_addr, peer_addr, is_ipv6=0,
+ bfd_key_id=None, conf_key_id=None):
+ if bfd_key_id is None:
+ return self.api(self.papi.bfd_udp_add,
+ {
+ 'sw_if_index': sw_if_index,
+ 'desired_min_tx': desired_min_tx,
+ 'required_min_rx': required_min_rx,
+ 'local_addr': local_addr,
+ 'peer_addr': peer_addr,
+ 'is_ipv6': is_ipv6,
+ 'detect_mult': detect_mult,
+ })
+ else:
+ return self.api(self.papi.bfd_udp_add,
+ {
+ 'sw_if_index': sw_if_index,
+ 'desired_min_tx': desired_min_tx,
+ 'required_min_rx': required_min_rx,
+ 'local_addr': local_addr,
+ 'peer_addr': peer_addr,
+ 'is_ipv6': is_ipv6,
+ 'detect_mult': detect_mult,
+ 'is_authenticated': 1,
+ 'bfd_key_id': bfd_key_id,
+ 'conf_key_id': conf_key_id,
+ })
+
+ def bfd_udp_auth_activate(self, sw_if_index, local_addr, peer_addr,
+ is_ipv6=0, bfd_key_id=None, conf_key_id=None,
+ is_delayed=False):
+ return self.api(self.papi.bfd_udp_auth_activate,
{
'sw_if_index': sw_if_index,
- 'desired_min_tx': desired_min_tx,
- 'required_min_rx': required_min_rx,
'local_addr': local_addr,
'peer_addr': peer_addr,
'is_ipv6': is_ipv6,
- 'detect_mult': detect_mult,
+ 'is_delayed': 1 if is_delayed else 0,
+ 'bfd_key_id': bfd_key_id,
+ 'conf_key_id': conf_key_id,
+ })
+
+ def bfd_udp_auth_deactivate(self, sw_if_index, local_addr, peer_addr,
+ is_ipv6=0, is_delayed=False):
+ return self.api(self.papi.bfd_udp_auth_deactivate,
+ {
+ 'sw_if_index': sw_if_index,
+ 'local_addr': local_addr,
+ 'peer_addr': peer_addr,
+ 'is_ipv6': is_ipv6,
+ 'is_delayed': 1 if is_delayed else 0,
})
def bfd_udp_del(self, sw_if_index, local_addr, peer_addr, is_ipv6=0):
@@ -1016,10 +1057,14 @@ class VppPapiProvider(object):
def bfd_udp_session_dump(self):
return self.api(self.papi.bfd_udp_session_dump, {})
- def bfd_session_set_flags(self, bs_idx, admin_up_down):
- return self.api(self.papi.bfd_session_set_flags, {
- 'bs_index': bs_idx,
+ def bfd_udp_session_set_flags(self, admin_up_down, sw_if_index, local_addr,
+ peer_addr, is_ipv6=0):
+ return self.api(self.papi.bfd_udp_session_set_flags, {
'admin_up_down': admin_up_down,
+ 'sw_if_index': sw_if_index,
+ 'local_addr': local_addr,
+ 'peer_addr': peer_addr,
+ 'is_ipv6': is_ipv6,
})
def want_bfd_events(self, enable_disable=1):
@@ -1028,6 +1073,22 @@ class VppPapiProvider(object):
'pid': os.getpid(),
})
+ def bfd_auth_set_key(self, conf_key_id, auth_type, key):
+ return self.api(self.papi.bfd_auth_set_key, {
+ 'conf_key_id': conf_key_id,
+ 'auth_type': auth_type,
+ 'key': key,
+ 'key_len': len(key),
+ })
+
+ def bfd_auth_del_key(self, conf_key_id):
+ return self.api(self.papi.bfd_auth_del_key, {
+ 'conf_key_id': conf_key_id,
+ })
+
+ def bfd_auth_keys_dump(self):
+ return self.api(self.papi.bfd_auth_keys_dump, {})
+
def classify_add_del_table(
self,
is_add,