#!/usr/bin/env python from framework import VppTestCase, VppTestRunner from vpp_udp_encap import * from vpp_ip import DpoProto from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable from scapy.packet import Raw from scapy.layers.l2 import Ether, ARP from scapy.layers.inet import IP, UDP from scapy.layers.inet6 import IPv6 from scapy.contrib.mpls import MPLS from vpp_object import * from socket import inet_pton, inet_ntop, AF_INET, AF_INET6 def find_abf_policy(test, id): policies = test.vapi.abf_policy_dump() for p in policies: if id == p.policy.policy_id: return True return False def find_abf_itf_attach(test, id, sw_if_index): attachs = test.vapi.abf_itf_attach_dump() for a in attachs: if id == a.attach.policy_id and \ sw_if_index == a.attach.sw_if_index: return True return False class VppAbfPolicy(VppObject): def __init__(self, test, policy_id, acl, paths): self._test = test self.policy_id = policy_id self.acl = acl self.paths = paths def encode_paths(self): br_paths = [] for p in self.paths: lstack = [] for l in p.nh_labels: if type(l) == VppMplsLabel: lstack.append(l.encode()) else: lstack.append({'label': l, 'ttl': 255}) n_labels = len(lstack) while (len(lstack) < 16): lstack.append({}) br_paths.append({'next_hop': p.nh_addr, 'weight': 1, 'afi': p.proto, 'sw_if_index': 0xffffffff, 'preference': 0, 'table_id': p.nh_table_id, 'next_hop_id': p.next_hop_id, 'is_udp_encap': p.is_udp_encap, 'n_labels': n_labels, 'label_stack': lstack}) return br_paths def add_vpp_config(self): self._test.vapi.abf_policy_add_del( 1, {'policy_id': self.policy_id, 'acl_index': self.acl.acl_index, 'n_paths': len(self.paths), 'paths': self.encode_paths()}) self._test.registry.register(self, self._test.logger) def remove_vpp_config(self): self._test.vapi.abf_policy_add_del( 0, {'policy_id': self.policy_id, 'acl_index': self.acl.acl_index, 'n_paths': len(self.paths), 'paths': self.encode_paths()}) def query_vpp_config(self): return find_abf_policy(self._test, self.policy_id) def __str__(self): return self.object_id() def object_id(self): return ("abf-policy-%d" % self.policy_id) class VppAbfAttach(VppObject): def __init__(self, test, policy_id, sw_if_index, priority, is_ipv6=0): self._test = test self.policy_id = policy_id self.sw_if_index = sw_if_index self.priority = priority self.is_ipv6 = is_ipv6 def add_vpp_config(self): self._test.vapi.abf_itf_attach_add_del( 1, {'policy_id': self.policy_id, 'sw_if_index': self.sw_if_index, 'priority': self.priority, 'is_ipv6': self.is_ipv6}) self._test.registry.register(self, self._test.logger) def remove_vpp_config(self): self._test.vapi.abf_itf_attach_add_del( 0, {'policy_id': self.policy_id, 'sw_if_index': self.sw_if_index, 'priority': self.priority, 'is_ipv6': self.is_ipv6}) def query_vpp_config(self): return find_abf_itf_attach(self._test, self.policy_id, self.sw_if_index) def __str__(self): return self.object_id() def object_id(self): return ("abf-attach-%d-%d" % (self.policy_id, self.sw_if_index)) class TestAbf(VppTestCase): """ ABF Test Case """ def setUp(self): super(TestAbf, self).setUp() self.create_pg_interfaces(range(4)) for i in self.pg_interfaces: i.admin_up() i.config_ip4()