aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_punt.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_punt.py')
-rw-r--r--[-rwxr-xr-x]test/test_punt.py676
1 files changed, 366 insertions, 310 deletions
diff --git a/test/test_punt.py b/test/test_punt.py
index 3471a3f7dd5..8ee43f1ed73 100755..100644
--- a/test/test_punt.py
+++ b/test/test_punt.py
@@ -1,33 +1,29 @@
#!/usr/bin/env python3
-import binascii
import random
import socket
import os
import threading
-import struct
import copy
import fcntl
import time
-
-from struct import unpack, unpack_from
+import errno
try:
import unittest2 as unittest
except ImportError:
import unittest
-from util import ppp, ppc
-from re import compile
-import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether
+from scapy.layers.l2 import Dot1Q
from scapy.layers.inet import IP, UDP, ICMP
from scapy.layers.ipsec import ESP
import scapy.layers.inet6 as inet6
-from scapy.layers.inet6 import IPv6, ICMPv6DestUnreach
+from scapy.layers.inet6 import IPv6
from scapy.contrib.ospf import OSPF_Hdr, OSPFv3_Hello
-from framework import tag_fixme_vpp_workers
-from framework import VppTestCase, VppTestRunner
+from framework import VppTestCase
+from asfframework import VppTestRunner, tag_fixme_vpp_workers
+from vpp_sub_interface import VppDot1QSubint
from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath
@@ -38,7 +34,7 @@ NUM_PKTS = 67
class serverSocketThread(threading.Thread):
- """ Socket server thread"""
+ """Socket server thread"""
def __init__(self, threadID, sockName):
threading.Thread.__init__(self)
@@ -60,7 +56,7 @@ class serverSocketThread(threading.Thread):
# Ethernet
self.rx_pkts.append(Ether(data[8:]))
except IOError as e:
- if e.errno == 11:
+ if e.errno == errno.EAGAIN:
# nothing to receive, stop running or sleep a little
if self.stop_running:
break
@@ -90,7 +86,7 @@ class serverSocketThread(threading.Thread):
class TestPuntSocket(VppTestCase):
- """ Punt Socket """
+ """Punt Socket"""
ports = [1111, 2222, 3333, 4444]
sock_servers = list()
@@ -108,8 +104,13 @@ class TestPuntSocket(VppTestCase):
@classmethod
def setUpConstants(cls):
- cls.extra_vpp_punt_config = [
- "punt", "{", "socket", cls.tempdir+"/socket_punt", "}"]
+ cls.extra_vpp_config = [
+ "punt",
+ "{",
+ "socket",
+ cls.tempdir + "/socket_punt",
+ "}",
+ ]
super(TestPuntSocket, cls).setUpConstants()
def setUp(self):
@@ -137,25 +138,21 @@ class TestPuntSocket(VppTestCase):
return rx_pkts
def verify_port(self, pr, vpr):
- self.assertEqual(vpr.punt.type, pr['type'])
- self.assertEqual(vpr.punt.punt.l4.port,
- pr['punt']['l4']['port'])
- self.assertEqual(vpr.punt.punt.l4.protocol,
- pr['punt']['l4']['protocol'])
- self.assertEqual(vpr.punt.punt.l4.af,
- pr['punt']['l4']['af'])
+ self.assertEqual(vpr.punt.type, pr["type"])
+ self.assertEqual(vpr.punt.punt.l4.port, pr["punt"]["l4"]["port"])
+ self.assertEqual(vpr.punt.punt.l4.protocol, pr["punt"]["l4"]["protocol"])
+ self.assertEqual(vpr.punt.punt.l4.af, pr["punt"]["l4"]["af"])
def verify_exception(self, pr, vpr):
- self.assertEqual(vpr.punt.type, pr['type'])
- self.assertEqual(vpr.punt.punt.exception.id,
- pr['punt']['exception']['id'])
+ self.assertEqual(vpr.punt.type, pr["type"])
+ self.assertEqual(vpr.punt.punt.exception.id, pr["punt"]["exception"]["id"])
def verify_ip_proto(self, pr, vpr):
- self.assertEqual(vpr.punt.type, pr['type'])
- self.assertEqual(vpr.punt.punt.ip_proto.af,
- pr['punt']['ip_proto']['af'])
- self.assertEqual(vpr.punt.punt.ip_proto.protocol,
- pr['punt']['ip_proto']['protocol'])
+ self.assertEqual(vpr.punt.type, pr["type"])
+ self.assertEqual(vpr.punt.punt.ip_proto.af, pr["punt"]["ip_proto"]["af"])
+ self.assertEqual(
+ vpr.punt.punt.ip_proto.protocol, pr["punt"]["ip_proto"]["protocol"]
+ )
def verify_udp_pkts(self, rxs, n_rx, port):
n_match = 0
@@ -167,12 +164,12 @@ class TestPuntSocket(VppTestCase):
def set_port(pr, port):
- pr['punt']['l4']['port'] = port
+ pr["punt"]["l4"]["port"] = port
return pr
def set_reason(pr, reason):
- pr['punt']['exception']['id'] = reason
+ pr["punt"]["exception"]["id"] = reason
return pr
@@ -180,15 +177,7 @@ def mk_vpp_cfg4():
pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
af_ip4 = VppEnum.vl_api_address_family_t.ADDRESS_IP4
udp_proto = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP
- punt_l4 = {
- 'type': pt_l4,
- 'punt': {
- 'l4': {
- 'af': af_ip4,
- 'protocol': udp_proto
- }
- }
- }
+ punt_l4 = {"type": pt_l4, "punt": {"l4": {"af": af_ip4, "protocol": udp_proto}}}
return punt_l4
@@ -196,20 +185,12 @@ def mk_vpp_cfg6():
pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
af_ip6 = VppEnum.vl_api_address_family_t.ADDRESS_IP6
udp_proto = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP
- punt_l4 = {
- 'type': pt_l4,
- 'punt': {
- 'l4': {
- 'af': af_ip6,
- 'protocol': udp_proto
- }
- }
- }
+ punt_l4 = {"type": pt_l4, "punt": {"l4": {"af": af_ip6, "protocol": udp_proto}}}
return punt_l4
class TestIP4PuntSocket(TestPuntSocket):
- """ Punt Socket for IPv4 UDP """
+ """Punt Socket for IPv4 UDP"""
@classmethod
def setUpClass(cls):
@@ -233,7 +214,7 @@ class TestIP4PuntSocket(TestPuntSocket):
i.admin_down()
def test_punt_socket_dump(self):
- """ Punt socket registration/deregistration"""
+ """Punt socket registration/deregistration"""
pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
af_ip4 = VppEnum.vl_api_address_family_t.ADDRESS_IP4
@@ -247,10 +228,12 @@ class TestIP4PuntSocket(TestPuntSocket):
#
punt_l4 = mk_vpp_cfg4()
- self.vapi.punt_socket_register(set_port(punt_l4, 1111),
- "%s/socket_punt_1111" % self.tempdir)
- self.vapi.punt_socket_register(set_port(punt_l4, 2222),
- "%s/socket_punt_2222" % self.tempdir)
+ self.vapi.punt_socket_register(
+ set_port(punt_l4, 1111), "%s/socket_punt_1111" % self.tempdir
+ )
+ self.vapi.punt_socket_register(
+ set_port(punt_l4, 2222), "%s/socket_punt_2222" % self.tempdir
+ )
punts = self.vapi.punt_socket_dump(type=pt_l4)
self.assertEqual(len(punts), 2)
self.verify_port(set_port(punt_l4, 1111), punts[0])
@@ -266,10 +249,12 @@ class TestIP4PuntSocket(TestPuntSocket):
#
# configure a punt socket again
#
- self.vapi.punt_socket_register(set_port(punt_l4, 1111),
- "%s/socket_punt_1111" % self.tempdir)
- self.vapi.punt_socket_register(set_port(punt_l4, 3333),
- "%s/socket_punt_3333" % self.tempdir)
+ self.vapi.punt_socket_register(
+ set_port(punt_l4, 1111), "%s/socket_punt_1111" % self.tempdir
+ )
+ self.vapi.punt_socket_register(
+ set_port(punt_l4, 3333), "%s/socket_punt_3333" % self.tempdir
+ )
punts = self.vapi.punt_socket_dump(type=pt_l4)
self.assertEqual(len(punts), 3)
@@ -285,17 +270,18 @@ class TestIP4PuntSocket(TestPuntSocket):
self.assertEqual(len(punts), 0)
def test_punt_socket_traffic_single_port_single_socket(self):
- """ Punt socket traffic single port single socket"""
+ """Punt socket traffic single port single socket"""
port = self.ports[0]
pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
punt_l4 = set_port(mk_vpp_cfg4(), port)
- p = (Ether(src=self.pg0.remote_mac,
- dst=self.pg0.local_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
- UDP(sport=9876, dport=port) /
- Raw(b'\xa5' * 100))
+ p = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
+ / UDP(sport=9876, dport=port)
+ / Raw(b"\xa5" * 100)
+ )
pkts = p * self.nr_packets
@@ -305,18 +291,17 @@ class TestIP4PuntSocket(TestPuntSocket):
#
# expect ICMP - port unreachable for all packets
#
- rx = self.send_and_expect(self.pg0, pkts, self.pg0)
+ rx = self.send_and_expect_some(self.pg0, pkts, self.pg0)
for p in rx:
- self.assertEqual(int(p[IP].proto), 1) # ICMP
+ self.assertEqual(int(p[IP].proto), 1) # ICMP
self.assertEqual(int(p[ICMP].code), 3) # unreachable
#
# configure a punt socket
#
self.socket_client_create("%s/socket_%d" % (self.tempdir, port))
- self.vapi.punt_socket_register(punt_l4, "%s/socket_%d" %
- (self.tempdir, port))
+ self.vapi.punt_socket_register(punt_l4, "%s/socket_%d" % (self.tempdir, port))
punts = self.vapi.punt_socket_dump(type=pt_l4)
self.assertEqual(len(punts), 1)
@@ -334,13 +319,13 @@ class TestIP4PuntSocket(TestPuntSocket):
punts = self.vapi.punt_socket_dump(type=pt_l4)
self.assertEqual(len(punts), 0)
- rx = self.send_and_expect(self.pg0, pkts, self.pg0)
+ rx = self.send_and_expect_some(self.pg0, pkts, self.pg0)
for p in rx:
- self.assertEqual(int(p[IP].proto), 1) # ICMP
+ self.assertEqual(int(p[IP].proto), 1) # ICMP
self.assertEqual(int(p[ICMP].code), 3) # unreachable
def test_punt_socket_traffic_multi_ports_multi_sockets(self):
- """ Punt socket traffic multi ports and multi sockets"""
+ """Punt socket traffic multi ports and multi sockets"""
punt_l4 = mk_vpp_cfg4()
@@ -354,38 +339,40 @@ class TestIP4PuntSocket(TestPuntSocket):
# choose port from port list
cfgs[port] = {}
- pkt = (Ether(src=self.pg0.remote_mac,
- dst=self.pg0.local_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
- UDP(sport=9876, dport=port) /
- Raw(b'\xa5' * 100))
- cfgs[port]['pkts'] = pkt * self.nr_packets
- cfgs[port]['port'] = port
- cfgs[port]['vpp'] = copy.deepcopy(set_port(punt_l4, port))
+ pkt = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
+ / UDP(sport=9876, dport=port)
+ / Raw(b"\xa5" * 100)
+ )
+ cfgs[port]["pkts"] = pkt * self.nr_packets
+ cfgs[port]["port"] = port
+ cfgs[port]["vpp"] = copy.deepcopy(set_port(punt_l4, port))
# configure punt sockets
- cfgs[port]['sock'] = self.socket_client_create(
- "%s/socket_%d" % (self.tempdir, port))
+ cfgs[port]["sock"] = self.socket_client_create(
+ "%s/socket_%d" % (self.tempdir, port)
+ )
self.vapi.punt_socket_register(
- cfgs[port]['vpp'],
- "%s/socket_%d" % (self.tempdir, port))
+ cfgs[port]["vpp"], "%s/socket_%d" % (self.tempdir, port)
+ )
#
# send the packets that get punted
#
for cfg in cfgs.values():
- self.send_and_assert_no_replies(self.pg0, cfg['pkts'])
+ self.send_and_assert_no_replies(self.pg0, cfg["pkts"])
#
# test that we got the excepted packets on the expected socket
#
for cfg in cfgs.values():
- rx = cfg['sock'].close()
- self.verify_udp_pkts(rx, len(cfg['pkts']), cfg['port'])
- self.vapi.punt_socket_deregister(cfg['vpp'])
+ rx = cfg["sock"].close()
+ self.verify_udp_pkts(rx, len(cfg["pkts"]), cfg["port"])
+ self.vapi.punt_socket_deregister(cfg["vpp"])
def test_punt_socket_traffic_multi_ports_single_socket(self):
- """ Punt socket traffic multi ports and single socket"""
+ """Punt socket traffic multi ports and single socket"""
pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
punt_l4 = mk_vpp_cfg4()
@@ -396,11 +383,12 @@ class TestIP4PuntSocket(TestPuntSocket):
pkts = []
for port in self.ports:
# choose port from port list
- pkt = (Ether(src=self.pg0.remote_mac,
- dst=self.pg0.local_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
- UDP(sport=9876, dport=port) /
- Raw(b'\xa5' * 100))
+ pkt = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
+ / UDP(sport=9876, dport=port)
+ / Raw(b"\xa5" * 100)
+ )
pkts += pkt * self.nr_packets
#
@@ -408,8 +396,9 @@ class TestIP4PuntSocket(TestPuntSocket):
#
self.socket_client_create("%s/socket_multi" % self.tempdir)
for p in self.ports:
- self.vapi.punt_socket_register(set_port(punt_l4, p),
- "%s/socket_multi" % self.tempdir)
+ self.vapi.punt_socket_register(
+ set_port(punt_l4, p), "%s/socket_multi" % self.tempdir
+ )
punts = self.vapi.punt_socket_dump(type=pt_l4)
self.assertEqual(len(punts), len(self.ports))
@@ -428,7 +417,7 @@ class TestIP4PuntSocket(TestPuntSocket):
class TestIP6PuntSocket(TestPuntSocket):
- """ Punt Socket for IPv6 UDP """
+ """Punt Socket for IPv6 UDP"""
@classmethod
def setUpClass(cls):
@@ -452,7 +441,7 @@ class TestIP6PuntSocket(TestPuntSocket):
i.admin_down()
def test_punt_socket_dump(self):
- """ Punt socket registration """
+ """Punt socket registration"""
pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
af_ip6 = VppEnum.vl_api_address_family_t.ADDRESS_IP6
@@ -460,15 +449,7 @@ class TestIP6PuntSocket(TestPuntSocket):
#
# configure a punt socket
#
- punt_l4 = {
- 'type': pt_l4,
- 'punt': {
- 'l4': {
- 'af': af_ip6,
- 'protocol': udp_proto
- }
- }
- }
+ punt_l4 = {"type": pt_l4, "punt": {"l4": {"af": af_ip6, "protocol": udp_proto}}}
punts = self.vapi.punt_socket_dump(type=pt_l4)
self.assertEqual(len(punts), 0)
@@ -476,10 +457,12 @@ class TestIP6PuntSocket(TestPuntSocket):
#
# configure a punt socket
#
- self.vapi.punt_socket_register(set_port(punt_l4, 1111),
- "%s/socket_1111" % self.tempdir)
- self.vapi.punt_socket_register(set_port(punt_l4, 2222),
- "%s/socket_2222" % self.tempdir)
+ self.vapi.punt_socket_register(
+ set_port(punt_l4, 1111), "%s/socket_1111" % self.tempdir
+ )
+ self.vapi.punt_socket_register(
+ set_port(punt_l4, 2222), "%s/socket_2222" % self.tempdir
+ )
punts = self.vapi.punt_socket_dump(type=pt_l4)
self.assertEqual(len(punts), 2)
self.verify_port(set_port(punt_l4, 1111), punts[0])
@@ -495,8 +478,9 @@ class TestIP6PuntSocket(TestPuntSocket):
#
# configure a punt socket again
#
- self.vapi.punt_socket_register(set_port(punt_l4, 1111),
- "%s/socket_1111" % self.tempdir)
+ self.vapi.punt_socket_register(
+ set_port(punt_l4, 1111), "%s/socket_1111" % self.tempdir
+ )
punts = self.vapi.punt_socket_dump(type=pt_l4)
self.assertEqual(len(punts), 2)
@@ -510,28 +494,29 @@ class TestIP6PuntSocket(TestPuntSocket):
self.assertEqual(len(punts), 0)
def test_punt_socket_traffic_single_port_single_socket(self):
- """ Punt socket traffic single port single socket"""
+ """Punt socket traffic single port single socket"""
port = self.ports[0]
pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
af_ip6 = VppEnum.vl_api_address_family_t.ADDRESS_IP6
udp_proto = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP
punt_l4 = {
- 'type': pt_l4,
- 'punt': {
- 'l4': {
- 'af': af_ip6,
- 'protocol': udp_proto,
- 'port': port,
+ "type": pt_l4,
+ "punt": {
+ "l4": {
+ "af": af_ip6,
+ "protocol": udp_proto,
+ "port": port,
}
- }
+ },
}
- p = (Ether(src=self.pg0.remote_mac,
- dst=self.pg0.local_mac) /
- IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
- inet6.UDP(sport=9876, dport=port) /
- Raw(b'\xa5' * 100))
+ p = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
+ / inet6.UDP(sport=9876, dport=port)
+ / Raw(b"\xa5" * 100)
+ )
pkts = p * self.nr_packets
@@ -555,8 +540,7 @@ class TestIP6PuntSocket(TestPuntSocket):
# configure a punt socket
#
self.socket_client_create("%s/socket_%d" % (self.tempdir, port))
- self.vapi.punt_socket_register(punt_l4, "%s/socket_%d" %
- (self.tempdir, port))
+ self.vapi.punt_socket_register(punt_l4, "%s/socket_%d" % (self.tempdir, port))
punts = self.vapi.punt_socket_dump(type=pt_l4)
self.assertEqual(len(punts), 1)
@@ -586,7 +570,7 @@ class TestIP6PuntSocket(TestPuntSocket):
# self.pg0.get_capture(nr_packets)
def test_punt_socket_traffic_multi_ports_multi_sockets(self):
- """ Punt socket traffic multi ports and multi sockets"""
+ """Punt socket traffic multi ports and multi sockets"""
punt_l4 = mk_vpp_cfg6()
@@ -600,50 +584,52 @@ class TestIP6PuntSocket(TestPuntSocket):
# choose port from port list
cfgs[port] = {}
- pkt = (Ether(src=self.pg0.remote_mac,
- dst=self.pg0.local_mac) /
- IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
- UDP(sport=9876, dport=port) /
- Raw(b'\xa5' * 100))
- cfgs[port]['pkts'] = pkt * self.nr_packets
- cfgs[port]['port'] = port
- cfgs[port]['vpp'] = copy.deepcopy(set_port(punt_l4, port))
+ pkt = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
+ / UDP(sport=9876, dport=port)
+ / Raw(b"\xa5" * 100)
+ )
+ cfgs[port]["pkts"] = pkt * self.nr_packets
+ cfgs[port]["port"] = port
+ cfgs[port]["vpp"] = copy.deepcopy(set_port(punt_l4, port))
# configure punt sockets
- cfgs[port]['sock'] = self.socket_client_create(
- "%s/socket_%d" % (self.tempdir, port))
+ cfgs[port]["sock"] = self.socket_client_create(
+ "%s/socket_%d" % (self.tempdir, port)
+ )
self.vapi.punt_socket_register(
- cfgs[port]['vpp'],
- "%s/socket_%d" % (self.tempdir, port))
+ cfgs[port]["vpp"], "%s/socket_%d" % (self.tempdir, port)
+ )
#
# send the packets that get punted
#
for cfg in cfgs.values():
- self.send_and_assert_no_replies(self.pg0, cfg['pkts'])
+ self.send_and_assert_no_replies(self.pg0, cfg["pkts"])
#
# test that we got the excepted packets on the expected socket
#
for cfg in cfgs.values():
- rx = cfg['sock'].close()
- self.verify_udp_pkts(rx, len(cfg['pkts']), cfg['port'])
- self.vapi.punt_socket_deregister(cfg['vpp'])
+ rx = cfg["sock"].close()
+ self.verify_udp_pkts(rx, len(cfg["pkts"]), cfg["port"])
+ self.vapi.punt_socket_deregister(cfg["vpp"])
def test_punt_socket_traffic_multi_ports_single_socket(self):
- """ Punt socket traffic multi ports and single socket"""
+ """Punt socket traffic multi ports and single socket"""
pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
af_ip6 = VppEnum.vl_api_address_family_t.ADDRESS_IP6
udp_proto = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP
punt_l4 = {
- 'type': pt_l4,
- 'punt': {
- 'l4': {
- 'af': af_ip6,
- 'protocol': udp_proto,
+ "type": pt_l4,
+ "punt": {
+ "l4": {
+ "af": af_ip6,
+ "protocol": udp_proto,
}
- }
+ },
}
#
@@ -652,11 +638,12 @@ class TestIP6PuntSocket(TestPuntSocket):
pkts = []
for port in self.ports:
# choose port from port list
- pkt = (Ether(src=self.pg0.remote_mac,
- dst=self.pg0.local_mac) /
- IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
- UDP(sport=9876, dport=port) /
- Raw(b'\xa5' * 100))
+ pkt = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
+ / UDP(sport=9876, dport=port)
+ / Raw(b"\xa5" * 100)
+ )
pkts += pkt * self.nr_packets
#
@@ -670,8 +657,9 @@ class TestIP6PuntSocket(TestPuntSocket):
#
self.socket_client_create("%s/socket_multi" % self.tempdir)
for p in self.ports:
- self.vapi.punt_socket_register(set_port(punt_l4, p),
- "%s/socket_multi" % self.tempdir)
+ self.vapi.punt_socket_register(
+ set_port(punt_l4, p), "%s/socket_multi" % self.tempdir
+ )
punts = self.vapi.punt_socket_dump(type=pt_l4)
self.assertEqual(len(punts), len(self.ports))
@@ -696,7 +684,7 @@ class TestIP6PuntSocket(TestPuntSocket):
class TestExceptionPuntSocket(TestPuntSocket):
- """ Punt Socket for Exceptions """
+ """Punt Socket for Exceptions"""
@classmethod
def setUpClass(cls):
@@ -721,7 +709,7 @@ class TestExceptionPuntSocket(TestPuntSocket):
i.admin_down()
def test_registration(self):
- """ Punt socket registration/deregistration"""
+ """Punt socket registration/deregistration"""
pt_ex = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_EXCEPTION
@@ -731,17 +719,14 @@ class TestExceptionPuntSocket(TestPuntSocket):
#
# configure a punt socket
#
- punt_ex = {
- 'type': pt_ex,
- 'punt': {
- 'exception': {}
- }
- }
+ punt_ex = {"type": pt_ex, "punt": {"exception": {}}}
- self.vapi.punt_socket_register(set_reason(punt_ex, 1),
- "%s/socket_punt_1" % self.tempdir)
- self.vapi.punt_socket_register(set_reason(punt_ex, 2),
- "%s/socket_punt_2" % self.tempdir)
+ self.vapi.punt_socket_register(
+ set_reason(punt_ex, 1), "%s/socket_punt_1" % self.tempdir
+ )
+ self.vapi.punt_socket_register(
+ set_reason(punt_ex, 2), "%s/socket_punt_2" % self.tempdir
+ )
punts = self.vapi.punt_socket_dump(type=pt_ex)
self.assertEqual(len(punts), 2)
self.verify_exception(set_reason(punt_ex, 1), punts[0])
@@ -757,10 +742,12 @@ class TestExceptionPuntSocket(TestPuntSocket):
#
# configure a punt socket again
#
- self.vapi.punt_socket_register(set_reason(punt_ex, 1),
- "%s/socket_punt_1" % self.tempdir)
- self.vapi.punt_socket_register(set_reason(punt_ex, 3),
- "%s/socket_punt_3" % self.tempdir)
+ self.vapi.punt_socket_register(
+ set_reason(punt_ex, 1), "%s/socket_punt_1" % self.tempdir
+ )
+ self.vapi.punt_socket_register(
+ set_reason(punt_ex, 3), "%s/socket_punt_3" % self.tempdir
+ )
punts = self.vapi.punt_socket_dump(type=pt_ex)
self.assertEqual(len(punts), 3)
@@ -785,25 +772,18 @@ class TestExceptionPuntSocket(TestPuntSocket):
self.assertTrue(rx.haslayer(UDP))
def test_traffic(self):
- """ Punt socket traffic """
+ """Punt socket traffic"""
port = self.ports[0]
pt_ex = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_EXCEPTION
- punt_ex = {
- 'type': pt_ex,
- 'punt': {
- 'exception': {}
- }
- }
+ punt_ex = {"type": pt_ex, "punt": {"exception": {}}}
#
# we're dealing with IPSec tunnels punting for no-such-tunnel
# (SPI=0 goes to ikev2)
#
cfgs = dict()
- cfgs['ipsec4-no-such-tunnel'] = {'spi': 99,
- 'udp': False,
- 'itf': self.pg0}
+ cfgs["ipsec4-no-such-tunnel"] = {"spi": 99, "udp": False, "itf": self.pg0}
#
# find the VPP ID for these punt exception reasin
@@ -814,99 +794,99 @@ class TestExceptionPuntSocket(TestPuntSocket):
print(r.reason.name)
print(key)
if r.reason.name == key:
- cfgs[key]['id'] = r.reason.id
- cfgs[key]['vpp'] = copy.deepcopy(
- set_reason(punt_ex,
- cfgs[key]['id']))
+ cfgs[key]["id"] = r.reason.id
+ cfgs[key]["vpp"] = copy.deepcopy(
+ set_reason(punt_ex, cfgs[key]["id"])
+ )
break
#
# configure punt sockets
#
for cfg in cfgs.values():
- cfg['sock'] = self.socket_client_create("%s/socket_%d" %
- (self.tempdir, cfg['id']))
+ cfg["sock"] = self.socket_client_create(
+ "%s/socket_%d" % (self.tempdir, cfg["id"])
+ )
self.vapi.punt_socket_register(
- cfg['vpp'], "%s/socket_%d" % (self.tempdir, cfg['id']))
+ cfg["vpp"], "%s/socket_%d" % (self.tempdir, cfg["id"])
+ )
#
# create packet streams for 'no-such-tunnel' exception
#
for cfg in cfgs.values():
- pkt = (Ether(src=cfg['itf'].remote_mac,
- dst=cfg['itf'].local_mac) /
- IP(src=cfg['itf'].remote_ip4,
- dst=cfg['itf'].local_ip4))
- if (cfg['udp']):
+ pkt = Ether(src=cfg["itf"].remote_mac, dst=cfg["itf"].local_mac) / IP(
+ src=cfg["itf"].remote_ip4, dst=cfg["itf"].local_ip4
+ )
+ if cfg["udp"]:
pkt = pkt / UDP(sport=666, dport=4500)
- pkt = (pkt / ESP(spi=cfg['spi'], seq=3) /
- Raw(b'\xa5' * 100))
- cfg['pkts'] = [pkt]
+ pkt = pkt / ESP(spi=cfg["spi"], seq=3) / Raw(b"\xa5" * 100)
+ cfg["pkts"] = [pkt]
#
# send packets for each SPI we expect to be punted
#
for cfg in cfgs.values():
- self.send_and_assert_no_replies(cfg['itf'], cfg['pkts'])
+ self.send_and_assert_no_replies(cfg["itf"], cfg["pkts"])
#
# verify the punted packets arrived on the associated socket
#
for cfg in cfgs.values():
- rx = cfg['sock'].close()
- self.verify_esp_pkts(rx, len(cfg['pkts']),
- cfg['spi'], cfg['udp'])
+ rx = cfg["sock"].close()
+ self.verify_esp_pkts(rx, len(cfg["pkts"]), cfg["spi"], cfg["udp"])
#
# add some tunnels, make sure it still punts
#
tun = VppIpsecInterface(self).add_vpp_config()
- sa_in = VppIpsecSA(self, 11, 11,
- (VppEnum.vl_api_ipsec_integ_alg_t.
- IPSEC_API_INTEG_ALG_SHA1_96),
- b"0123456701234567",
- (VppEnum.vl_api_ipsec_crypto_alg_t.
- IPSEC_API_CRYPTO_ALG_AES_CBC_128),
- b"0123456701234567",
- 50,
- self.pg0.local_ip4,
- self.pg0.remote_ip4).add_vpp_config()
- sa_out = VppIpsecSA(self, 22, 22,
- (VppEnum.vl_api_ipsec_integ_alg_t.
- IPSEC_API_INTEG_ALG_SHA1_96),
- b"0123456701234567",
- (VppEnum.vl_api_ipsec_crypto_alg_t.
- IPSEC_API_CRYPTO_ALG_AES_CBC_128),
- b"0123456701234567",
- 50,
- self.pg0.local_ip4,
- self.pg0.remote_ip4).add_vpp_config()
- protect = VppIpsecTunProtect(self, tun,
- sa_out,
- [sa_in]).add_vpp_config()
+ sa_in = VppIpsecSA(
+ self,
+ 11,
+ 11,
+ (VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96),
+ b"0123456701234567",
+ (VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_128),
+ b"0123456701234567",
+ 50,
+ self.pg0.local_ip4,
+ self.pg0.remote_ip4,
+ ).add_vpp_config()
+ sa_out = VppIpsecSA(
+ self,
+ 22,
+ 22,
+ (VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96),
+ b"0123456701234567",
+ (VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_128),
+ b"0123456701234567",
+ 50,
+ self.pg0.local_ip4,
+ self.pg0.remote_ip4,
+ ).add_vpp_config()
+ protect = VppIpsecTunProtect(self, tun, sa_out, [sa_in]).add_vpp_config()
#
# send packets for each SPI we expect to be punted
#
for cfg in cfgs.values():
- self.send_and_assert_no_replies(cfg['itf'], cfg['pkts'])
+ self.send_and_assert_no_replies(cfg["itf"], cfg["pkts"])
#
# verify the punted packets arrived on the associated socket
#
for cfg in cfgs.values():
- rx = cfg['sock'].close()
- self.verify_esp_pkts(rx, len(cfg['pkts']),
- cfg['spi'], cfg['udp'])
+ rx = cfg["sock"].close()
+ self.verify_esp_pkts(rx, len(cfg["pkts"]), cfg["spi"], cfg["udp"])
#
# socket deregister
#
for cfg in cfgs.values():
- self.vapi.punt_socket_deregister(cfg['vpp'])
+ self.vapi.punt_socket_deregister(cfg["vpp"])
class TestIpProtoPuntSocket(TestPuntSocket):
- """ Punt Socket for IP packets """
+ """Punt Socket for IP packets"""
@classmethod
def setUpClass(cls):
@@ -930,7 +910,7 @@ class TestIpProtoPuntSocket(TestPuntSocket):
i.admin_down()
def test_registration(self):
- """ Punt socket registration/deregistration"""
+ """Punt socket registration/deregistration"""
af_ip4 = VppEnum.vl_api_address_family_t.ADDRESS_IP4
pt_ip = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_IP_PROTO
@@ -944,28 +924,16 @@ class TestIpProtoPuntSocket(TestPuntSocket):
# configure a punt socket
#
punt_ospf = {
- 'type': pt_ip,
- 'punt': {
- 'ip_proto': {
- 'af': af_ip4,
- 'protocol': proto_ospf
- }
- }
+ "type": pt_ip,
+ "punt": {"ip_proto": {"af": af_ip4, "protocol": proto_ospf}},
}
punt_eigrp = {
- 'type': pt_ip,
- 'punt': {
- 'ip_proto': {
- 'af': af_ip4,
- 'protocol': proto_eigrp
- }
- }
+ "type": pt_ip,
+ "punt": {"ip_proto": {"af": af_ip4, "protocol": proto_eigrp}},
}
- self.vapi.punt_socket_register(punt_ospf,
- "%s/socket_punt_1" % self.tempdir)
- self.vapi.punt_socket_register(punt_eigrp,
- "%s/socket_punt_2" % self.tempdir)
+ self.vapi.punt_socket_register(punt_ospf, "%s/socket_punt_1" % self.tempdir)
+ self.vapi.punt_socket_register(punt_eigrp, "%s/socket_punt_2" % self.tempdir)
self.logger.info(self.vapi.cli("sh punt sock reg ip"))
punts = self.vapi.punt_socket_dump(type=pt_ip)
self.assertEqual(len(punts), 2)
@@ -982,8 +950,7 @@ class TestIpProtoPuntSocket(TestPuntSocket):
#
# configure a punt socket again
#
- self.vapi.punt_socket_register(punt_ospf,
- "%s/socket_punt_3" % self.tempdir)
+ self.vapi.punt_socket_register(punt_ospf, "%s/socket_punt_3" % self.tempdir)
punts = self.vapi.punt_socket_dump(type=pt_ip)
self.assertEqual(len(punts), 2)
@@ -1003,7 +970,7 @@ class TestIpProtoPuntSocket(TestPuntSocket):
self.assertTrue(rx.haslayer(OSPF_Hdr))
def test_traffic(self):
- """ Punt socket traffic """
+ """Punt socket traffic"""
af_ip4 = VppEnum.vl_api_address_family_t.ADDRESS_IP4
pt_ip = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_IP_PROTO
@@ -1013,28 +980,23 @@ class TestIpProtoPuntSocket(TestPuntSocket):
# configure a punt socket to capture OSPF packets
#
punt_ospf = {
- 'type': pt_ip,
- 'punt': {
- 'ip_proto': {
- 'af': af_ip4,
- 'protocol': proto_ospf
- }
- }
+ "type": pt_ip,
+ "punt": {"ip_proto": {"af": af_ip4, "protocol": proto_ospf}},
}
#
# create packet streams and configure a punt sockets
#
- pkt = (Ether(src=self.pg0.remote_mac,
- dst=self.pg0.local_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
- OSPF_Hdr() /
- OSPFv3_Hello())
+ pkt = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
+ / OSPF_Hdr()
+ / OSPFv3_Hello()
+ )
pkts = pkt * 7
sock = self.socket_client_create("%s/socket_1" % self.tempdir)
- self.vapi.punt_socket_register(
- punt_ospf, "%s/socket_1" % self.tempdir)
+ self.vapi.punt_socket_register(punt_ospf, "%s/socket_1" % self.tempdir)
#
# send packets for each SPI we expect to be punted
@@ -1049,9 +1011,89 @@ class TestIpProtoPuntSocket(TestPuntSocket):
self.vapi.punt_socket_deregister(punt_ospf)
+class TestDot1QPuntSocket(TestPuntSocket):
+ """Punt Socket for 802.1Q (dot1q)"""
+
+ def setUp(self):
+ super(TestDot1QPuntSocket, self).setUp()
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.resolve_arp()
+
+ def tearDown(self):
+ super(TestDot1QPuntSocket, self).tearDown()
+ for i in self.pg_interfaces:
+ i.unconfig_ip4()
+ i.admin_down()
+
+ def test_dot1q_header_punt(self):
+ """Punt socket traffic with Dot1q header"""
+
+ port = self.ports[0]
+ pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
+ punt_l4 = set_port(mk_vpp_cfg4(), port)
+
+ # VLAN ID
+ vlan_id = 100
+
+ # Create a subinterface with the VLAN ID
+ subif = VppDot1QSubint(self, self.pg0, vlan_id)
+ subif.admin_up()
+ subif.config_ip4()
+
+ # Configure an IP address on the subinterface
+ subif_ip4 = subif.local_ip4
+
+ p = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / Dot1Q(vlan=vlan_id)
+ / IP(src=self.pg0.remote_ip4, dst=subif_ip4)
+ / UDP(sport=9876, dport=port)
+ / Raw(b"\xa5" * 100)
+ )
+
+ pkts = p * self.nr_packets
+
+ # Expect ICMP - port unreachable for all packets
+ rx = self.send_and_expect_some(self.pg0, pkts, self.pg0)
+
+ for p in rx:
+ self.assertEqual(int(p[IP].proto), 1) # ICMP
+ self.assertEqual(int(p[ICMP].code), 3) # unreachable
+
+ # Configure a punt socket
+ self.socket_client_create("%s/socket_%d" % (self.tempdir, port))
+ self.vapi.punt_socket_register(punt_l4, "%s/socket_%d" % (self.tempdir, port))
+ punts = self.vapi.punt_socket_dump(type=pt_l4)
+ self.assertEqual(len(punts), 1)
+
+ # Expect punt socket and no packets on pg0
+ self.send_and_assert_no_replies(self.pg0, pkts)
+ rx = self.socket_client_close()
+ self.logger.info("RXPKT")
+ self.logger.info(rx)
+ self.verify_udp_pkts(rx, len(pkts), port)
+ for pkt in rx:
+ self.assertEqual(pkt[Ether].src, self.pg0.remote_mac)
+ self.assertEqual(pkt[Ether].dst, self.pg0.local_mac)
+ self.assertEqual(pkt[Dot1Q].vlan, 100)
+
+ # Remove punt socket. Expect ICMP - port unreachable for all packets
+ self.vapi.punt_socket_deregister(punt_l4)
+ punts = self.vapi.punt_socket_dump(type=pt_l4)
+ self.assertEqual(len(punts), 0)
+
+ rx = self.send_and_expect_some(self.pg0, pkts, self.pg0)
+ for p in rx:
+ self.assertEqual(int(p[IP].proto), 1) # ICMP
+ self.assertEqual(int(p[ICMP].code), 3) # unreachable
+
+
@tag_fixme_vpp_workers
class TestPunt(VppTestCase):
- """ Exception Punt Test Case """
+ """Exception Punt Test Case"""
@classmethod
def setUpClass(cls):
@@ -1081,7 +1123,7 @@ class TestPunt(VppTestCase):
super(TestPunt, self).tearDown()
def test_punt(self):
- """ Exception Path testing """
+ """Exception Path testing"""
#
# dump the punt registered reasons
@@ -1089,9 +1131,11 @@ class TestPunt(VppTestCase):
#
rs = self.vapi.punt_reason_dump()
- reasons = ["ipsec6-no-such-tunnel",
- "ipsec4-no-such-tunnel",
- "ipsec4-spi-o-udp-0"]
+ reasons = [
+ "ipsec6-no-such-tunnel",
+ "ipsec4-no-such-tunnel",
+ "ipsec4-spi-o-udp-0",
+ ]
for reason in reasons:
found = False
@@ -1106,28 +1150,41 @@ class TestPunt(VppTestCase):
# send ACL deny packets out of pg0 and pg1.
# the ACL is src,dst = 1.1.1.1,1.1.1.2
#
- ip_1_1_1_2 = VppIpRoute(self, "1.1.1.2", 32,
- [VppRoutePath(self.pg3.remote_ip4,
- self.pg3.sw_if_index)])
+ ip_1_1_1_2 = VppIpRoute(
+ self,
+ "1.1.1.2",
+ 32,
+ [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)],
+ )
ip_1_1_1_2.add_vpp_config()
- ip_1_2 = VppIpRoute(self, "1::2", 128,
- [VppRoutePath(self.pg3.remote_ip6,
- self.pg3.sw_if_index,
- proto=DpoProto.DPO_PROTO_IP6)])
+ ip_1_2 = VppIpRoute(
+ self,
+ "1::2",
+ 128,
+ [
+ VppRoutePath(
+ self.pg3.remote_ip6,
+ self.pg3.sw_if_index,
+ proto=DpoProto.DPO_PROTO_IP6,
+ )
+ ],
+ )
ip_1_2.add_vpp_config()
- p4 = (Ether(src=self.pg2.remote_mac,
- dst=self.pg2.local_mac) /
- IP(src="1.1.1.1", dst="1.1.1.2") /
- UDP(sport=1234, dport=1234) /
- Raw(b'\xa5' * 100))
- p6 = (Ether(src=self.pg2.remote_mac,
- dst=self.pg2.local_mac) /
- IPv6(src="1::1", dst="1::2") /
- UDP(sport=1234, dport=1234) /
- Raw(b'\xa5' * 100))
- self.send_and_expect(self.pg2, p4*1, self.pg3)
- self.send_and_expect(self.pg2, p6*1, self.pg3)
+ p4 = (
+ Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
+ / IP(src="1.1.1.1", dst="1.1.1.2")
+ / UDP(sport=1234, dport=1234)
+ / Raw(b"\xa5" * 100)
+ )
+ p6 = (
+ Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
+ / IPv6(src="1::1", dst="1::2")
+ / UDP(sport=1234, dport=1234)
+ / Raw(b"\xa5" * 100)
+ )
+ self.send_and_expect(self.pg2, p4 * 1, self.pg3)
+ self.send_and_expect(self.pg2, p6 * 1, self.pg3)
#
# apply the punting features
@@ -1137,16 +1194,16 @@ class TestPunt(VppTestCase):
#
# dump the punt reasons to learn the IDs assigned
#
- rs = self.vapi.punt_reason_dump(reason={'name': "reason-v4"})
+ rs = self.vapi.punt_reason_dump(reason={"name": "reason-v4"})
r4 = rs[0].reason.id
- rs = self.vapi.punt_reason_dump(reason={'name': "reason-v6"})
+ rs = self.vapi.punt_reason_dump(reason={"name": "reason-v6"})
r6 = rs[0].reason.id
#
# pkts now dropped
#
- self.send_and_assert_no_replies(self.pg2, p4*NUM_PKTS)
- self.send_and_assert_no_replies(self.pg2, p6*NUM_PKTS)
+ self.send_and_assert_no_replies(self.pg2, p4 * NUM_PKTS)
+ self.send_and_assert_no_replies(self.pg2, p6 * NUM_PKTS)
#
# Check state:
@@ -1154,13 +1211,12 @@ class TestPunt(VppTestCase):
# 2 - per-reason counters
# 2, 3 are the index of the assigned punt reason
#
- stats = self.statistics.get_err_counter(
- "/err/punt-dispatch/No registrations")
- self.assertEqual(stats, 2*NUM_PKTS)
+ stats = self.statistics.get_err_counter("/err/punt-dispatch/No registrations")
+ self.assertEqual(stats, 2 * NUM_PKTS)
stats = self.statistics.get_counter("/net/punt")
- self.assertEqual(stats[0][r4]['packets'], NUM_PKTS)
- self.assertEqual(stats[0][r6]['packets'], NUM_PKTS)
+ self.assertEqual(stats[0][r4]["packets"], NUM_PKTS)
+ self.assertEqual(stats[0][r6]["packets"], NUM_PKTS)
#
# use the test CLI to test a client that punts exception
@@ -1169,8 +1225,8 @@ class TestPunt(VppTestCase):
self.vapi.cli("test punt pg0 %s" % self.pg0.remote_ip4)
self.vapi.cli("test punt pg0 %s" % self.pg0.remote_ip6)
- rx4s = self.send_and_expect(self.pg2, p4*NUM_PKTS, self.pg0)
- rx6s = self.send_and_expect(self.pg2, p6*NUM_PKTS, self.pg0)
+ rx4s = self.send_and_expect(self.pg2, p4 * NUM_PKTS, self.pg0)
+ rx6s = self.send_and_expect(self.pg2, p6 * NUM_PKTS, self.pg0)
#
# check the packets come out IP unmodified but destined to pg0 host
@@ -1187,8 +1243,8 @@ class TestPunt(VppTestCase):
self.assertEqual(p6[IPv6].hlim, rx[IPv6].hlim)
stats = self.statistics.get_counter("/net/punt")
- self.assertEqual(stats[0][r4]['packets'], 2*NUM_PKTS)
- self.assertEqual(stats[0][r6]['packets'], 2*NUM_PKTS)
+ self.assertEqual(stats[0][r4]["packets"], 2 * NUM_PKTS)
+ self.assertEqual(stats[0][r6]["packets"], 2 * NUM_PKTS)
#
# add another registration for the same reason to send packets
@@ -1234,8 +1290,8 @@ class TestPunt(VppTestCase):
self.assertEqual(p6[IPv6].hlim, rx[IPv6].hlim)
stats = self.statistics.get_counter("/net/punt")
- self.assertEqual(stats[0][r4]['packets'], 3*NUM_PKTS)
- self.assertEqual(stats[0][r6]['packets'], 3*NUM_PKTS)
+ self.assertEqual(stats[0][r4]["packets"], 3 * NUM_PKTS)
+ self.assertEqual(stats[0][r6]["packets"], 3 * NUM_PKTS)
self.logger.info(self.vapi.cli("show vlib graph punt-dispatch"))
self.logger.info(self.vapi.cli("show punt client"))
@@ -1244,5 +1300,5 @@ class TestPunt(VppTestCase):
self.logger.info(self.vapi.cli("show punt db"))
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)