summaryrefslogtreecommitdiffstats
path: root/test/test_bfd.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_bfd.py')
-rw-r--r--test/test_bfd.py122
1 files changed, 103 insertions, 19 deletions
diff --git a/test/test_bfd.py b/test/test_bfd.py
index 0ba0b46dee8..64e9301ac5f 100644
--- a/test/test_bfd.py
+++ b/test/test_bfd.py
@@ -8,6 +8,7 @@ import binascii
import time
from random import randint, shuffle
from socket import AF_INET, AF_INET6
+from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import UDP, IP
from scapy.layers.inet6 import IPv6
@@ -836,7 +837,6 @@ class BFD4TestCase(VppTestCase):
def test_no_periodic_if_remote_demand(self):
""" no periodic frames outside poll sequence if remote demand set """
- self.test_session.update(detect_mult=10)
bfd_session_up(self)
demand = self.test_session.create_packet()
demand[BFD].flags = "D"
@@ -846,7 +846,7 @@ class BFD4TestCase(VppTestCase):
self.test_session.desired_min_tx) \
/ USEC_IN_SEC
count = 0
- for dummy in range(self.test_session.detect_mult):
+ for dummy in range(self.test_session.detect_mult * 2):
time.sleep(transmit_time)
self.test_session.send_packet(demand)
try:
@@ -861,6 +861,48 @@ class BFD4TestCase(VppTestCase):
self.assert_equal(count, 0, "number of packets received")
self.assert_equal(len(events), 0, "number of events received")
+ def test_echo_looped_back(self):
+ """ echo packets looped back """
+ # don't need a session in this case..
+ self.vpp_session.remove_vpp_config()
+ self.pg0.enable_capture()
+ echo_packet_count = 10
+ # random source port low enough to increment a few times..
+ udp_sport_tx = randint(1, 50000)
+ udp_sport_rx = udp_sport_tx
+ echo_packet = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4,
+ dst=self.pg0.local_ip4) /
+ UDP(dport=BFD.udp_dport_echo) /
+ Raw("this should be looped back"))
+ for dummy in range(echo_packet_count):
+ self.sleep(.01, "delay between echo packets")
+ echo_packet[UDP].sport = udp_sport_tx
+ udp_sport_tx += 1
+ self.logger.debug(ppp("Sending packet:", echo_packet))
+ self.pg0.add_stream(echo_packet)
+ self.pg_start()
+ for dummy in range(echo_packet_count):
+ p = self.pg0.wait_for_packet(1)
+ self.logger.debug(ppp("Got packet:", p))
+ ether = p[Ether]
+ self.assert_equal(self.pg0.remote_mac,
+ ether.dst, "Destination MAC")
+ self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
+ ip = p[IP]
+ self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
+ self.assert_equal(self.pg0.local_ip4, ip.src, "Destination IP")
+ udp = p[UDP]
+ self.assert_equal(udp.dport, BFD.udp_dport_echo,
+ "UDP destination port")
+ self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
+ udp_sport_rx += 1
+ self.assertTrue(p.haslayer(Raw) and p[Raw] == echo_packet[Raw],
+ "Received packet is not the echo packet sent")
+ self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
+ "ECHO packet identifier for test purposes)")
+
class BFD6TestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD) (IPv6) """
@@ -914,13 +956,55 @@ class BFD6TestCase(VppTestCase):
def test_hold_up(self):
""" hold BFD session up """
bfd_session_up(self)
- for dummy in range(self.test_session.detect_mult*2):
+ for dummy in range(self.test_session.detect_mult * 2):
wait_for_bfd_packet(self)
self.test_session.send_packet()
self.assert_equal(len(self.vapi.collect_events()), 0,
"number of bfd events")
self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
+ def test_echo_looped_back(self):
+ """ echo packets looped back """
+ # don't need a session in this case..
+ self.vpp_session.remove_vpp_config()
+ self.pg0.enable_capture()
+ echo_packet_count = 10
+ # random source port low enough to increment a few times..
+ udp_sport_tx = randint(1, 50000)
+ udp_sport_rx = udp_sport_tx
+ echo_packet = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IPv6(src=self.pg0.remote_ip6,
+ dst=self.pg0.local_ip6) /
+ UDP(dport=BFD.udp_dport_echo) /
+ Raw("this should be looped back"))
+ for dummy in range(echo_packet_count):
+ self.sleep(.01, "delay between echo packets")
+ echo_packet[UDP].sport = udp_sport_tx
+ udp_sport_tx += 1
+ self.logger.debug(ppp("Sending packet:", echo_packet))
+ self.pg0.add_stream(echo_packet)
+ self.pg_start()
+ for dummy in range(echo_packet_count):
+ p = self.pg0.wait_for_packet(1)
+ self.logger.debug(ppp("Got packet:", p))
+ ether = p[Ether]
+ self.assert_equal(self.pg0.remote_mac,
+ ether.dst, "Destination MAC")
+ self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
+ ip = p[IPv6]
+ self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
+ self.assert_equal(self.pg0.local_ip6, ip.src, "Destination IP")
+ udp = p[UDP]
+ self.assert_equal(udp.dport, BFD.udp_dport_echo,
+ "UDP destination port")
+ self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
+ udp_sport_rx += 1
+ self.assertTrue(p.haslayer(Raw) and p[Raw] == echo_packet[Raw],
+ "Received packet is not the echo packet sent")
+ self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
+ "ECHO packet identifier for test purposes)")
+
class BFDSHA1TestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
@@ -982,7 +1066,7 @@ class BFDSHA1TestCase(VppTestCase):
self, self.pg0, AF_INET, sha1_key=key,
bfd_key_id=self.vpp_session.bfd_key_id)
bfd_session_up(self)
- for dummy in range(self.test_session.detect_mult*2):
+ for dummy in range(self.test_session.detect_mult * 2):
wait_for_bfd_packet(self)
self.test_session.send_packet()
self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
@@ -1195,14 +1279,14 @@ class BFDAuthOnOffTestCase(VppTestCase):
self.vpp_session.admin_up()
self.test_session = BFDTestSession(self, self.pg0, AF_INET)
bfd_session_up(self)
- for dummy in range(self.test_session.detect_mult*2):
+ for dummy in range(self.test_session.detect_mult * 2):
p = wait_for_bfd_packet(self)
self.assert_equal(p[BFD].state, BFDState.up, BFDState)
self.test_session.send_packet()
self.vpp_session.activate_auth(key)
self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
self.test_session.sha1_key = key
- for dummy in range(self.test_session.detect_mult*2):
+ for dummy in range(self.test_session.detect_mult * 2):
p = wait_for_bfd_packet(self)
self.assert_equal(p[BFD].state, BFDState.up, BFDState)
self.test_session.send_packet()
@@ -1223,7 +1307,7 @@ class BFDAuthOnOffTestCase(VppTestCase):
bfd_key_id=self.vpp_session.bfd_key_id)
bfd_session_up(self)
# self.vapi.want_bfd_events(enable_disable=0)
- for dummy in range(self.test_session.detect_mult*2):
+ for dummy in range(self.test_session.detect_mult * 2):
p = wait_for_bfd_packet(self)
self.assert_equal(p[BFD].state, BFDState.up, BFDState)
self.test_session.inc_seq_num()
@@ -1231,7 +1315,7 @@ class BFDAuthOnOffTestCase(VppTestCase):
self.vpp_session.deactivate_auth()
self.test_session.bfd_key_id = None
self.test_session.sha1_key = None
- for dummy in range(self.test_session.detect_mult*2):
+ for dummy in range(self.test_session.detect_mult * 2):
p = wait_for_bfd_packet(self)
self.assert_equal(p[BFD].state, BFDState.up, BFDState)
self.test_session.inc_seq_num()
@@ -1254,14 +1338,14 @@ class BFDAuthOnOffTestCase(VppTestCase):
self, self.pg0, AF_INET, sha1_key=key1,
bfd_key_id=self.vpp_session.bfd_key_id)
bfd_session_up(self)
- for dummy in range(self.test_session.detect_mult*2):
+ for dummy in range(self.test_session.detect_mult * 2):
p = wait_for_bfd_packet(self)
self.assert_equal(p[BFD].state, BFDState.up, BFDState)
self.test_session.send_packet()
self.vpp_session.activate_auth(key2)
self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
self.test_session.sha1_key = key2
- for dummy in range(self.test_session.detect_mult*2):
+ for dummy in range(self.test_session.detect_mult * 2):
p = wait_for_bfd_packet(self)
self.assert_equal(p[BFD].state, BFDState.up, BFDState)
self.test_session.send_packet()
@@ -1279,18 +1363,18 @@ class BFDAuthOnOffTestCase(VppTestCase):
self.vpp_session.admin_up()
self.test_session = BFDTestSession(self, self.pg0, AF_INET)
bfd_session_up(self)
- for dummy in range(self.test_session.detect_mult*2):
+ for dummy in range(self.test_session.detect_mult * 2):
wait_for_bfd_packet(self)
self.test_session.send_packet()
self.vpp_session.activate_auth(key, delayed=True)
- for dummy in range(self.test_session.detect_mult*2):
+ for dummy in range(self.test_session.detect_mult * 2):
p = wait_for_bfd_packet(self)
self.assert_equal(p[BFD].state, BFDState.up, BFDState)
self.test_session.send_packet()
self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
self.test_session.sha1_key = key
self.test_session.send_packet()
- for dummy in range(self.test_session.detect_mult*2):
+ for dummy in range(self.test_session.detect_mult * 2):
p = wait_for_bfd_packet(self)
self.assert_equal(p[BFD].state, BFDState.up, BFDState)
self.test_session.send_packet()
@@ -1310,19 +1394,19 @@ class BFDAuthOnOffTestCase(VppTestCase):
self, self.pg0, AF_INET, sha1_key=key,
bfd_key_id=self.vpp_session.bfd_key_id)
bfd_session_up(self)
- for dummy in range(self.test_session.detect_mult*2):
+ for dummy in range(self.test_session.detect_mult * 2):
p = wait_for_bfd_packet(self)
self.assert_equal(p[BFD].state, BFDState.up, BFDState)
self.test_session.send_packet()
self.vpp_session.deactivate_auth(delayed=True)
- for dummy in range(self.test_session.detect_mult*2):
+ for dummy in range(self.test_session.detect_mult * 2):
p = wait_for_bfd_packet(self)
self.assert_equal(p[BFD].state, BFDState.up, BFDState)
self.test_session.send_packet()
self.test_session.bfd_key_id = None
self.test_session.sha1_key = None
self.test_session.send_packet()
- for dummy in range(self.test_session.detect_mult*2):
+ for dummy in range(self.test_session.detect_mult * 2):
p = wait_for_bfd_packet(self)
self.assert_equal(p[BFD].state, BFDState.up, BFDState)
self.test_session.send_packet()
@@ -1344,19 +1428,19 @@ class BFDAuthOnOffTestCase(VppTestCase):
self, self.pg0, AF_INET, sha1_key=key1,
bfd_key_id=self.vpp_session.bfd_key_id)
bfd_session_up(self)
- for dummy in range(self.test_session.detect_mult*2):
+ for dummy in range(self.test_session.detect_mult * 2):
p = wait_for_bfd_packet(self)
self.assert_equal(p[BFD].state, BFDState.up, BFDState)
self.test_session.send_packet()
self.vpp_session.activate_auth(key2, delayed=True)
- for dummy in range(self.test_session.detect_mult*2):
+ for dummy in range(self.test_session.detect_mult * 2):
p = wait_for_bfd_packet(self)
self.assert_equal(p[BFD].state, BFDState.up, BFDState)
self.test_session.send_packet()
self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
self.test_session.sha1_key = key2
self.test_session.send_packet()
- for dummy in range(self.test_session.detect_mult*2):
+ for dummy in range(self.test_session.detect_mult * 2):
p = wait_for_bfd_packet(self)
self.assert_equal(p[BFD].state, BFDState.up, BFDState)
self.test_session.send_packet()