aboutsummaryrefslogtreecommitdiffstats
path: root/test/bfd.py
diff options
context:
space:
mode:
authorKlement Sekera <ksekera@cisco.com>2017-01-09 07:43:48 +0100
committerDamjan Marion <dmarion.lists@gmail.com>2017-02-02 12:04:30 +0000
commitb17dd9607ee8ecba5ae3ef69c7b4915b57de292a (patch)
treeaa8d22ac4a9ea3adc2f96c4d754273af505950d5 /test/bfd.py
parent402ed3128512efc091a560729ce1e772a86e9f74 (diff)
BFD: SHA1 authentication
Add authentication support to BFD feature. Out of three existing authentication types, implement SHA1 (sole RFC requirement). Simple password is insecure and MD5 is discouraged by the RFC, so ignore those. Add/change APIs to allow configuring BFD authentication keys and their usage with BFD sessions. Change-Id: Ifb0fb5b19c2e72196d84c1cde919bd4c074ea415 Signed-off-by: Klement Sekera <ksekera@cisco.com>
Diffstat (limited to 'test/bfd.py')
-rw-r--r--test/bfd.py265
1 files changed, 206 insertions, 59 deletions
diff --git a/test/bfd.py b/test/bfd.py
index 5171681369c..475a1707b40 100644
--- a/test/bfd.py
+++ b/test/bfd.py
@@ -1,3 +1,4 @@
+from random import randint
from socket import AF_INET, AF_INET6
from scapy.all import *
from scapy.packet import *
@@ -53,11 +54,57 @@ class BFDState(NumericConstant):
NumericConstant.__init__(self, value)
+class BFDAuthType(NumericConstant):
+ """ BFD Authentication Type """
+ no_auth = 0
+ simple_pwd = 1
+ keyed_md5 = 2
+ meticulous_keyed_md5 = 3
+ keyed_sha1 = 4
+ meticulous_keyed_sha1 = 5
+
+ desc_dict = {
+ no_auth: "No authentication",
+ simple_pwd: "Simple Password",
+ keyed_md5: "Keyed MD5",
+ meticulous_keyed_md5: "Meticulous Keyed MD5",
+ keyed_sha1: "Keyed SHA1",
+ meticulous_keyed_sha1: "Meticulous Keyed SHA1",
+ }
+
+ def __init__(self, value):
+ NumericConstant.__init__(self, value)
+
+
+def bfd_is_auth_used(pkt):
+ return "A" in pkt.sprintf("%BFD.flags%")
+
+
+def bfd_is_simple_pwd_used(pkt):
+ return bfd_is_auth_used(pkt) and pkt.auth_type == BFDAuthType.simple_pwd
+
+
+def bfd_is_sha1_used(pkt):
+ return bfd_is_auth_used(pkt) and pkt.auth_type in \
+ (BFDAuthType.keyed_sha1, BFDAuthType.meticulous_keyed_sha1)
+
+
+def bfd_is_md5_used(pkt):
+ return bfd_is_auth_used(pkt) and pkt.auth_type in \
+ (BFDAuthType.keyed_md5, BFDAuthType.meticulous_keyed_md5)
+
+
+def bfd_is_md5_or_sha1_used(pkt):
+ return bfd_is_md5_used(pkt) or bfd_is_sha1_used(pkt)
+
+
class BFD(Packet):
udp_dport = 3784 #: BFD destination port per RFC 5881
udp_sport_min = 49152 #: BFD source port min value per RFC 5881
udp_sport_max = 65535 #: BFD source port max value per RFC 5881
+ bfd_pkt_len = 24 # : length of BFD pkt without authentication section
+ sha1_auth_len = 28 # : length of authentication section if SHA1 used
name = "BFD"
@@ -65,14 +112,27 @@ class BFD(Packet):
BitField("version", 1, 3),
BitEnumField("diag", 0, 5, BFDDiagCode.desc_dict),
BitEnumField("state", 0, 2, BFDState.desc_dict),
- FlagsField("flags", 0, 6, ['P', 'F', 'C', 'A', 'D', 'M']),
+ FlagsField("flags", 0, 6, ['M', 'D', 'A', 'C', 'F', 'P']),
XByteField("detect_mult", 0),
- XByteField("length", 24),
+ BitField("length", bfd_pkt_len, 8),
BitField("my_discriminator", 0, 32),
BitField("your_discriminator", 0, 32),
BitField("desired_min_tx_interval", 0, 32),
BitField("required_min_rx_interval", 0, 32),
- BitField("required_min_echo_rx_interval", 0, 32)]
+ BitField("required_min_echo_rx_interval", 0, 32),
+ ConditionalField(
+ BitEnumField("auth_type", 0, 8, BFDAuthType.desc_dict),
+ bfd_is_auth_used),
+ ConditionalField(BitField("auth_len", 0, 8), bfd_is_auth_used),
+ ConditionalField(BitField("auth_key_id", 0, 8), bfd_is_auth_used),
+ ConditionalField(BitField("auth_reserved", 0, 8),
+ bfd_is_md5_or_sha1_used),
+ ConditionalField(
+ BitField("auth_seq_num", 0, 32), bfd_is_md5_or_sha1_used),
+ ConditionalField(StrField("auth_key_hash", "0" * 16), bfd_is_md5_used),
+ ConditionalField(
+ StrField("auth_key_hash", "0" * 20), bfd_is_sha1_used),
+ ]
def mysummary(self):
return self.sprintf("BFD(my_disc=%BFD.my_discriminator%,"
@@ -82,9 +142,78 @@ class BFD(Packet):
bind_layers(UDP, BFD, dport=BFD.udp_dport)
+class VppBFDAuthKey(VppObject):
+ """ Represents BFD authentication key in VPP """
+
+ def __init__(self, test, conf_key_id, auth_type, key):
+ self._test = test
+ self._key = key
+ self._auth_type = auth_type
+ test.assertIn(auth_type, BFDAuthType.desc_dict)
+ self._conf_key_id = conf_key_id
+
+ @property
+ def test(self):
+ """ Test which created this key """
+ return self._test
+
+ @property
+ def auth_type(self):
+ """ Authentication type for this key """
+ return self._auth_type
+
+ @property
+ def key(self):
+ return self._key
+
+ @property
+ def conf_key_id(self):
+ return self._conf_key_id
+
+ def add_vpp_config(self):
+ self.test.vapi.bfd_auth_set_key(
+ self._conf_key_id, self._auth_type, self._key)
+ self._test.registry.register(self, self.test.logger)
+
+ def get_bfd_auth_keys_dump_entry(self):
+ """ get the entry in the auth keys dump corresponding to this key """
+ result = self.test.vapi.bfd_auth_keys_dump()
+ for k in result:
+ if k.conf_key_id == self._conf_key_id:
+ return k
+ return None
+
+ def query_vpp_config(self):
+ return self.get_bfd_auth_keys_dump_entry() is not None
+
+ def remove_vpp_config(self):
+ self.test.vapi.bfd_auth_del_key(self._conf_key_id)
+
+ def object_id(self):
+ return "bfd-auth-key-%s" % self._conf_key_id
+
+ def __str__(self):
+ return self.object_id()
+
+
class VppBFDUDPSession(VppObject):
""" Represents BFD UDP session in VPP """
+ def __init__(self, test, interface, peer_addr, local_addr=None, af=AF_INET,
+ desired_min_tx=100000, required_min_rx=100000, detect_mult=3,
+ sha1_key=None, bfd_key_id=None):
+ self._test = test
+ self._interface = interface
+ self._af = af
+ self._local_addr = local_addr
+ self._peer_addr = peer_addr
+ self._peer_addr_n = socket.inet_pton(af, peer_addr)
+ self._desired_min_tx = desired_min_tx
+ self._required_min_rx = required_min_rx
+ self._detect_mult = detect_mult
+ self._sha1_key = sha1_key
+ self._bfd_key_id = bfd_key_id if bfd_key_id else randint(0, 255)
+
@property
def test(self):
""" Test which created this session """
@@ -101,13 +230,6 @@ class VppBFDUDPSession(VppObject):
return self._af
@property
- def bs_index(self):
- """ BFD session index from VPP """
- if self._bs_index is not None:
- return self._bs_index
- raise NotConfiguredException("not configured")
-
- @property
def local_addr(self):
""" BFD session local address (VPP address) """
if self._local_addr is None:
@@ -141,19 +263,27 @@ class VppBFDUDPSession(VppObject):
""" BFD session peer address - raw, suitable for API """
return self._peer_addr_n
- @property
- def state(self):
- """ BFD session state """
+ def get_bfd_udp_session_dump_entry(self):
result = self.test.vapi.bfd_udp_session_dump()
- session = None
for s in result:
+ self.test.logger.debug("session entry: %s" % str(s))
if s.sw_if_index == self.interface.sw_if_index:
if self.af == AF_INET \
and s.is_ipv6 == 0 \
and self.interface.local_ip4n == s.local_addr[:4] \
and self.interface.remote_ip4n == s.peer_addr[:4]:
- session = s
- break
+ return s
+ if self.af == AF_INET6 \
+ and s.is_ipv6 == 1 \
+ and self.interface.local_ip6n == s.local_addr \
+ and self.interface.remote_ip6n == s.peer_addr:
+ return s
+ return None
+
+ @property
+ def state(self):
+ """ BFD session state """
+ session = self.get_bfd_udp_session_dump_entry()
if session is None:
raise Exception("Could not find BFD session in VPP response: %s" %
repr(result))
@@ -171,61 +301,78 @@ class VppBFDUDPSession(VppObject):
def detect_mult(self):
return self._detect_mult
- def __init__(self, test, interface, peer_addr, local_addr=None, af=AF_INET,
- desired_min_tx=100000, required_min_rx=100000, detect_mult=3):
- self._test = test
- self._interface = interface
- self._af = af
- self._local_addr = local_addr
- self._peer_addr = peer_addr
- self._peer_addr_n = socket.inet_pton(af, peer_addr)
- self._bs_index = None
- self._desired_min_tx = desired_min_tx
- self._required_min_rx = required_min_rx
- self._detect_mult = detect_mult
+ @property
+ def sha1_key(self):
+ return self._sha1_key
+
+ @property
+ def bfd_key_id(self):
+ return self._bfd_key_id
+
+ def activate_auth(self, key, bfd_key_id=None, delayed=False):
+ self._bfd_key_id = bfd_key_id if bfd_key_id else randint(0, 255)
+ self._sha1_key = key
+ is_ipv6 = 1 if AF_INET6 == self.af else 0
+ conf_key_id = self._sha1_key.conf_key_id
+ is_delayed = 1 if delayed else 0
+ self.test.vapi.bfd_udp_auth_activate(self._interface.sw_if_index,
+ self.local_addr_n,
+ self.peer_addr_n,
+ is_ipv6=is_ipv6,
+ bfd_key_id=self._bfd_key_id,
+ conf_key_id=conf_key_id,
+ is_delayed=is_delayed)
+
+ def deactivate_auth(self, delayed=False):
+ self._bfd_key_id = None
+ self._sha1_key = None
+ is_delayed = 1 if delayed else 0
+ is_ipv6 = 1 if AF_INET6 == self.af else 0
+ self.test.vapi.bfd_udp_auth_deactivate(self._interface.sw_if_index,
+ self.local_addr_n,
+ self.peer_addr_n,
+ is_ipv6=is_ipv6,
+ is_delayed=is_delayed)
def add_vpp_config(self):
is_ipv6 = 1 if AF_INET6 == self.af else 0
- result = self.test.vapi.bfd_udp_add(
- self._interface.sw_if_index,
- self.desired_min_tx,
- self.required_min_rx,
- self.detect_mult,
- self.local_addr_n,
- self.peer_addr_n,
- is_ipv6=is_ipv6)
- self._bs_index = result.bs_index
+ bfd_key_id = self._bfd_key_id if self._sha1_key else None
+ conf_key_id = self._sha1_key.conf_key_id if self._sha1_key else None
+ self.test.vapi.bfd_udp_add(self._interface.sw_if_index,
+ self.desired_min_tx,
+ self.required_min_rx,
+ self.detect_mult,
+ self.local_addr_n,
+ self.peer_addr_n,
+ is_ipv6=is_ipv6,
+ bfd_key_id=bfd_key_id,
+ conf_key_id=conf_key_id)
self._test.registry.register(self, self.test.logger)
def query_vpp_config(self):
- result = self.test.vapi.bfd_udp_session_dump()
- session = None
- for s in result:
- if s.sw_if_index == self.interface.sw_if_index:
- if self.af == AF_INET \
- and s.is_ipv6 == 0 \
- and self.interface.local_ip4n == s.local_addr[:4] \
- and self.interface.remote_ip4n == s.peer_addr[:4]:
- session = s
- break
- if session is None:
- return False
- return True
+ session = self.get_bfd_udp_session_dump_entry()
+ return session is not None
def remove_vpp_config(self):
- if self._bs_index is not None:
- is_ipv6 = 1 if AF_INET6 == self._af else 0
- self.test.vapi.bfd_udp_del(
- self._interface.sw_if_index,
- self.local_addr_n,
- self.peer_addr_n,
- is_ipv6=is_ipv6)
+ is_ipv6 = 1 if AF_INET6 == self._af else 0
+ self.test.vapi.bfd_udp_del(self._interface.sw_if_index,
+ self.local_addr_n,
+ self.peer_addr_n,
+ is_ipv6=is_ipv6)
def object_id(self):
- return "bfd-udp-%d" % self.bs_index
+ return "bfd-udp-%s-%s-%s-%s" % (self._interface.sw_if_index,
+ self.local_addr,
+ self.peer_addr,
+ self.af)
def __str__(self):
return self.object_id()
def admin_up(self):
- self.test.vapi.bfd_session_set_flags(self.bs_index, 1)
+ is_ipv6 = 1 if AF_INET6 == self._af else 0
+ self.test.vapi.bfd_udp_session_set_flags(1,
+ self._interface.sw_if_index,
+ self.local_addr_n,
+ self.peer_addr_n,
+ is_ipv6=is_ipv6)