""" test framework utilities """ import abc import socket from socket import AF_INET6 import six import sys import os.path import scapy.compat from scapy.layers.l2 import Ether from scapy.layers.inet import IP from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting,\ IPv6ExtHdrHopByHop from scapy.packet import Raw from scapy.utils import hexdump from scapy.utils6 import in6_mactoifaceid from io import BytesIO from vpp_papi import mac_pton def ppp(headline, packet): """ Return string containing the output of scapy packet.show() call. """ return '%s\n%s\n\n%s\n' % (headline, hexdump(packet, dump=True), packet.show(dump=True)) def ppc(headline, capture, limit=10): """ Return string containing ppp() printout for a capture. :param headline: printed as first line of output :param capture: packets to print :param limit: limit the print to # of packets """ if not capture: return headline tail = "" if limit < len(capture): tail = "\nPrint limit reached, %s out of %s packets printed" % ( limit, len(capture)) body = "".join([ppp("Packet #%s:" % count, p) for count, p in zip(range(0, limit), capture)]) return "%s\n%s%s" % (headline, body, tail) def ip4_range(ip4, s, e): tmp = ip4.rsplit('.', 1)[0] return ("%s.%d" % (tmp, i) for i in range(s, e)) def ip4n_range(ip4n, s, e): ip4 = socket.inet_ntop(socket.AF_INET, ip4n) return (socket.inet_pton(socket.AF_INET, ip) for ip in ip4_range(ip4, s, e)) # wrapper around scapy library function. def mk_ll_addr(mac): euid = in6_mactoifaceid(mac) addr = "fe80::" + euid return addr def ip6_normalize(ip6): return socket.inet_ntop(socket.AF_INET6, socket.inet_pton(socket.AF_INET6, ip6)) def get_core_path(tempdir): return "%s/%s" % (tempdir, get_core_pattern()) def is_core_present(tempdir): return os.path.isfile(get_core_path(tempdir)) def get_core_pattern(): with open("/proc/sys/kernel/core_pattern", "r") as f: corefmt = f.read().strip() return corefmt def check_core_path(logger, core_path): corefmt = get_core_pattern() if corefmt.startswith("|"): logger.error( "WARNING: redirecting the core dump through a" " filter may result in truncated dumps.") logger.error( " You may want to check the filter settings" " or uninstall it and edit the" " /proc/sys/kernel/core_pattern accordingly.") logger.error( " current core pattern is: %s" % corefmt) class NumericConstant(object): desc_dict = {} def __init__(self, value): self._value = value def __int__(self): return self._value def __long__(self): return self._value def __str__(self): if self._value in self.desc_dict: return self.desc_dict[self._value] return "" class Host(object): """ Generic test host "connected" to VPPs interface. """ @property def mac(self): """ MAC address """ return self._mac @property def bin_mac(self): """ MAC address """ return mac_pton(self._mac) @property def ip4(self): """ IPv4 address - string """ return self._ip4 @property def ip4n(self): """ IPv4 address of remote host - raw, suitable as API parameter.""" return socket.inet_pton(socket.AF_INET, self._ip4) @property def ip6(self): """ IPv6 address - string """ return self._ip6 @property def ip6n(self): """ IPv6 address of remote host - raw, suitable as API parameter.""" return socket.inet_pton(socket.AF_INET6, self._ip6) @property def ip6_ll(self): """ IPv6 link-local address - string """ return self._ip6_ll @property def ip6n_ll(self): """ IPv6 link-local address of remote host - raw, suitable as API parameter.""" return socket.inet_pton(socket.AF_INET6, self._ip6_ll) def __eq__(self, other): if isinstance(other, Host): return (self.mac == other.mac and self.ip4 == other.ip4 and self.ip6 == other.ip6 and self.ip6_ll == other.ip6_ll) else: return False def __ne__(self, other): return not self.__eq__(other) def __repr__(self): return "Host { mac:%s ip4:%s ip6:%s ip6_ll:%s }" % (self.mac, self.ip4, self.ip6, self.ip6_ll) def __hash__(self): return hash(self.__repr__()) def __init__(self, mac=None, ip4=None, ip6=None, ip6_ll=None): self._mac = mac self._ip4 = ip4 self._ip6 = ip6 self._ip6_ll = ip6_ll class ForeignAddressFactory(object): count = 0 prefix_len = 24 net_template = '10.10.10.{}' net = net_template.format(0) + '/' + str(prefix_len) def get_ip4(self): if self