import unittest
import socket
import struct

from scapy.layers.inet import IP, ICMP, TCP, UDP
from scapy.layers.ipsec import SecurityAssociation, ESP
from scapy.layers.l2 import Ether
from scapy.packet import raw, Raw
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, IPv6ExtHdrHopByHop, \
    IPv6ExtHdrFragment, IPv6ExtHdrDestOpt

from framework import VppTestCase, VppTestRunner
from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
from vpp_papi import VppEnum


class IPsecIPv4Params:
    """ Default IPsec test parameters for the IPv4 address family """

    addr_type = socket.AF_INET
    addr_any = "0.0.0.0"
    addr_bcast = "255.255.255.255"
    addr_len = 32
    is_ipv6 = 0

    def __init__(self):
        """ Populate per-instance SA ids, SPIs, algorithms and keys """
        self.remote_tun_if_host = '1.1.1.1'
        self.remote_tun_if_host6 = '1111::1'

        # tunnel-mode SA ids / SPIs
        self.scapy_tun_sa_id = 100
        self.scapy_tun_spi = 1001
        self.vpp_tun_sa_id = 200
        self.vpp_tun_spi = 1000

        # transport-mode SA ids / SPIs
        self.scapy_tra_sa_id = 300
        self.scapy_tra_spi = 2001
        self.vpp_tra_sa_id = 400
        self.vpp_tra_spi = 2000

        self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
                                 IPSEC_API_INTEG_ALG_SHA1_96)
        self.auth_algo = 'HMAC-SHA1-96'  # scapy name
        self.auth_key = b'C91KUR9GYMm5GfkEvNjX'

        self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
                                  IPSEC_API_CRYPTO_ALG_AES_CBC_128)
        self.crypt_algo = 'AES-CBC'  # scapy name
        self.crypt_key = b'JPjyOWBeVEQiMe7h'
        self.salt = 0
        self.flags = 0
        self.nat_header = None
        self.tun_flags = (VppEnum.vl_api_tunnel_encap_decap_flags_t.
                          TUNNEL_API_ENCAP_DECAP_FLAG_NONE)
        self.dscp = 0


class IPsecIPv6Params:
    """ Default IPsec test parameters for the IPv6 address family """

    addr_type = socket.AF_INET6
    addr_any = "0::0"
    addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
    addr_len = 128
    is_ipv6 = 1

    def __init__(self):
        """ Populate per-instance SA ids, SPIs, algorithms and keys """
        self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
        self.remote_tun_if_host4 = '1.1.1.1'

        # tunnel-mode SA ids / SPIs
        self.scapy_tun_sa_id = 500
        self.scapy_tun_spi = 3001
        self.vpp_tun_sa_id = 600
        self.vpp_tun_spi = 3000

        # transport-mode SA ids / SPIs
        self.scapy_tra_sa_id = 700
        self.scapy_tra_spi = 4001
        self.vpp_tra_sa_id = 800
        self.vpp_tra_spi = 4000

        self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
                                 IPSEC_API_INTEG_ALG_SHA1_96)
        self.auth_algo = 'HMAC-SHA1-96'  # scapy name
        self.auth_key = b'C91KUR9GYMm5GfkEvNjX'

        self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
                                  IPSEC_API_CRYPTO_ALG_AES_CBC_128)
        self.crypt_algo = 'AES-CBC'  # scapy name
        self.crypt_key = b'JPjyOWBeVEQiMe7h'
        self.salt = 0
        self.flags = 0
        self.nat_header = None
        self.tun_flags = (VppEnum.vl_api_tunnel_encap_decap_flags_t.
                          TUNNEL_API_ENCAP_DECAP_FLAG_NONE)
        self.dscp = 0


def mk_scapy_crypt_key(p):
    """ Return the encryption key to hand to scapy for params object p

    For "AES-GCM" the key is p.crypt_key followed by p.salt packed as a
    4-byte unsigned integer in network byte order; for every other
    algorithm p.crypt_key is returned unchanged.
    """
    if p.crypt_algo == "AES-GCM":
        return p.crypt_key + struct.pack("!I", p.salt)
    else:
        return p.crypt_key


def config_tun_params(p, encryption_type, tun_if):
    """ Build the two tunnel-mode scapy SecurityAssociations and store
    them on p

    Sets p.tun_dst / p.tun_src from tun_if's remote / local address for
    p.addr_type, then creates:
      - p.scapy_tun_sa: SPI p.vpp_tun_spi, outer header tun_dst -> tun_src
      - p.vpp_tun_sa:   SPI p.scapy_tun_spi, outer header tun_src -> tun_dst

    :param p: params object (e.g. IPsecIPv4Params / IPsecIPv6Params)
    :param encryption_type: passed through as the first positional
        argument of SecurityAssociation
    :param tun_if: interface providing remote_addr / local_addr mappings
        keyed by address family
    """
    ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
    # extended sequence numbers are enabled iff the USE_ESN flag bit is set
    esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
                             IPSEC_API_SAD_FLAG_USE_ESN))
    p.tun_dst = tun_if.remote_addr[p.addr_type]
    p.tun_src = tun_if.local_addr[p.addr_type]
    crypt_key = mk_scapy_crypt_key(p)
    p.scapy_tun_sa = SecurityAssociation(
        encryption_type, spi=p.vpp_tun_spi,
        crypt_algo=p.crypt_algo,
        crypt_key=crypt_key,
        auth_algo=p.auth_algo, auth_key=p.auth_key,
        tunnel_header=ip_class_by_addr_type[p.addr_type](
            src=p.tun_dst,
            dst=p.tun_src),
        nat_t_header=p.nat_header,
        esn_en=esn_en)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type, spi=p.scapy_tun_spi,
        crypt_algo=p.crypt_algo,
        crypt_key=crypt_key,
        auth_algo=p.auth_algo, auth_key=p.auth_key,
        tunnel_header=ip_class_by_addr_type[p.addr_type](
            dst=p.tun_dst,
            src=p.tun_src),
        nat_t_header=p.nat_header,
        esn_en=esn_en)


def config_tra_params(p, encryption_type):
    """ Build the two transport-mode scapy SecurityAssociations and store
    them on p

    Creates p.scapy_tra_sa (SPI p.vpp_tra_spi) and p.vpp_tra_sa
    (SPI p.scapy_tra_spi); neither carries a tunnel header.

    :param p: params object (e.g. IPsecIPv4Params / IPsecIPv6Params)
    :param encryption_type: passed through as the first positional
        argument of SecurityAssociation
    """
    # extended sequence numbers are enabled iff the USE_ESN flag bit is set
    esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
                             IPSEC_API_SAD_FLAG_USE_ESN))
    crypt_key = mk_scapy_crypt_key(p)
    p.scapy_tra_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tra_spi,
        crypt_algo=p.crypt_algo,
        crypt_key=crypt_key,
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        nat_t_header=p.nat_header,
        esn_en=esn_en)
    p.vpp_tra_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tra_spi,
        crypt_algo=p.crypt_algo,
        crypt_key=crypt_key,
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        nat_t_header=p.nat_header,
        esn_en=esn_en)


class TemplateIpsec(VppTestCase):
    """
    TRANSPORT MODE:

     ------   encrypt   ---
    |tra_if| <-------> |VPP|
     ------   decrypt   ---

    TUNNEL MODE:

     ------   encrypt   ---   plain   ---
    |tun_if| <-------  |VPP| <------ |pg1|
     ------             ---           ---

     ------   decrypt   ---   plain   ---
    |tun_if| ------->  |VPP| ------> |pg1|
     ------             ---           ---
    """
    tun_spd_id = 1
    tra_spd_id = 2

    def ipsec_select_backend(self):
        """ empty method to be overloaded when necessary """
        pass

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec, cls).tearDownClass()

    def setup_params(self):
        """ Create default IPv4/IPv6 params unless already set, and index
        them by address family in self.params """
        if not hasattr(self, 'ipv4_params'):
            self.ipv4_params = IPsecIPv4Params()
        if not hasattr(self, 'ipv6_params'):
            self.ipv6_params = IPsecIPv6Params()
        self.params = {self.ipv4_params.addr_type: self.ipv4_params,
                       self.ipv6_params.addr_type: self.ipv6_params}

    def config_interfaces(self):
        """ Create three pg interfaces, bring each up and configure /
        resolve both IPv4 and IPv6 on it """
        self.create_pg_interfaces(range(3))
        self.interfaces = list(self.pg_interfaces)
        for i in self.interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()

    def setUp(self):
        super(TemplateIpsec, self).setUp()

        self.setup_params()

        self.vpp_esp_protocol = (VppEnum.vl_api_ipsec_proto_t.
                                 IPSEC_API_PROTO_ESP)
        self.vpp_ah_protocol = (VppEnum.vl_api_ipsec_proto_t.
                                IPSEC_API_PROTO_AH)

        self.config_interfaces()

        self.ipsec_select_backend()

    def unconfig_interfaces(self):
        """ Bring every configured interface down and remove its IPv4 and
        IPv6 configuration """
        for i in self.interfaces:
            i.admin_down()
            i.unconfig_ip4()
            i.unconfig_ip6()

    def tearDown(self):
        super(TemplateIpsec, self).tearDown()

        self.unconfig_interfaces()

    def show_commands_at_teardown(self):
        self.logger.info(self.vapi.cli("show hardware"))

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=54):
        """ Return `count` Ethernet frames, each carrying an IPv4/ICMP
        packet with `payload_size` bytes of b'X' encrypted through `sa`

        The Ethernet header goes from sw_intf.remote_mac to
        sw_intf.local_mac.  `p` is accepted but not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=src, dst=dst) /
                           ICMP() / Raw(b'X' * payload_size))
                for _ in range(count)]

    # NOTE(review): the visible source is cut off at this point, in the
    # middle of a method definition that begins "def gen_en"; the rest of
    # this class is not shown and has not been reconstructed.
from vpp_object import VppObject
from vpp_ip import INVALID_INDEX
class PolicerAction:
    """ sse2 qos action

    Pairs an action type with a DSCP value and can serialise itself to
    the dictionary form used when a policer is added.
    """

    def __init__(self, type, dscp):
        """
        :param type: action type value, stored as-is
        :param dscp: DSCP value, stored as-is
        """
        self.type = type
        self.dscp = dscp

    def encode(self):
        """ Return the action as a dict with 'type' and 'dscp' keys """
        return {'type': self.type, 'dscp': self.dscp}
class VppPolicer(VppObject):
    """ Policer

    Test-side representation of a policer: holds its configuration and
    drives the policer_* API calls through the owning test's vapi.
    """

    def __init__(self, test, name, cir, eir, commited_burst, excess_burst,
                 rate_type=0, round_type=0, type=0, color_aware=False,
                 conform_action=None,
                 exceed_action=None,
                 violate_action=None):
        """
        :param test: owning test case; its vapi, registry, logger and
            statistics attributes are used by the other methods
        :param name: policer name, used as the key in every API call
        :param cir: passed to policer_add_del as cir
        :param eir: passed to policer_add_del as eir
        :param commited_burst: passed to policer_add_del as cb
        :param excess_burst: passed to policer_add_del as eb
        :param rate_type: passed through to policer_add_del
        :param round_type: passed through to policer_add_del
        :param type: passed through to policer_add_del
        :param color_aware: passed through to policer_add_del
        :param conform_action: PolicerAction; defaults to
            PolicerAction(1, 0)
        :param exceed_action: PolicerAction; defaults to
            PolicerAction(0, 0)
        :param violate_action: PolicerAction; defaults to
            PolicerAction(0, 0)
        """
        self._test = test
        self.name = name
        self.cir = cir
        self.eir = eir
        self.commited_burst = commited_burst
        self.excess_burst = excess_burst
        self.rate_type = rate_type
        self.round_type = round_type
        self.type = type
        self.color_aware = color_aware
        # Default actions are created per instance: a PolicerAction used
        # directly as a default argument would be built once and shared
        # (and mutable) across every policer relying on the default.
        self.conform_action = (PolicerAction(1, 0)
                               if conform_action is None
                               else conform_action)
        self.exceed_action = (PolicerAction(0, 0)
                              if exceed_action is None
                              else exceed_action)
        self.violate_action = (PolicerAction(0, 0)
                               if violate_action is None
                               else violate_action)
        self._policer_index = INVALID_INDEX

    @property
    def policer_index(self):
        """ Index returned by the add call, or INVALID_INDEX when the
        policer has not been added / has been removed """
        return self._policer_index

    def add_vpp_config(self):
        """ Add the policer, register this object with the test's
        registry and remember the returned policer index

        :returns: self
        """
        r = self._test.vapi.policer_add_del(
            name=self.name, cir=self.cir,
            eir=self.eir, cb=self.commited_burst, eb=self.excess_burst,
            rate_type=self.rate_type, round_type=self.round_type,
            type=self.type, color_aware=self.color_aware,
            conform_action=self.conform_action.encode(),
            exceed_action=self.exceed_action.encode(),
            violate_action=self.violate_action.encode())
        self._test.registry.register(self, self._test.logger)
        self._policer_index = r.policer_index
        return self

    def remove_vpp_config(self):
        """ Delete the policer by name and reset the stored index """
        self._test.vapi.policer_add_del(is_add=False, name=self.name)
        self._policer_index = INVALID_INDEX

    def bind_vpp_config(self, worker, bind):
        """ Call policer_bind for this policer

        :param worker: passed as worker_index
        :param bind: passed as bind_enable
        """
        self._test.vapi.policer_bind(name=self.name, worker_index=worker,
                                     bind_enable=bind)

    def apply_vpp_config(self, if_index, apply):
        """ Call policer_input for this policer

        :param if_index: passed as sw_if_index
        :param apply: passed as apply
        """
        self._test.vapi.policer_input(name=self.name, sw_if_index=if_index,
                                      apply=apply)

    def query_vpp_config(self):
        """ Return True if a policer with this name appears in the dump """
        dump = self._test.vapi.policer_dump(
            match_name_valid=True, match_name=self.name)
        return any(policer.name == self.name for policer in dump)

    def object_id(self):
        """ Return the identifier string "policer-<name>" """
        return ("policer-%s" % (self.name))

    def get_stats(self, worker=None):
        """ Sum this policer's conform / exceed / violate counters

        :param worker: when None, every entry of each counter is summed;
            otherwise only the entry at position worker + 1 is used
            (presumably position 0 is the main thread - confirm against
            the statistics layout)
        :returns: dict with '<name>_packets' and '<name>_bytes' keys for
            each of conform, exceed and violate
        """
        conform = self._test.statistics.get_counter("/net/policer/conform")
        exceed = self._test.statistics.get_counter("/net/policer/exceed")
        violate = self._test.statistics.get_counter("/net/policer/violate")

        counters = {"conform": conform, "exceed": exceed, "violate": violate}

        total = {}
        for name, per_thread in counters.items():
            packets = 0
            octets = 0
            for thread_index, thread_counters in enumerate(per_thread):
                if worker is not None and thread_index != worker + 1:
                    continue
                # each per-thread entry is indexed by policer index
                entry = thread_counters[self._policer_index]
                packets += entry['packets']
                octets += entry['bytes']
            total[f'{name}_packets'] = packets
            total[f'{name}_bytes'] = octets

        return total