From 79f5ba9bf7656972dd988508eff9465562dde42c Mon Sep 17 00:00:00 2001 From: Vratko Polak Date: Mon, 4 May 2020 13:05:26 +0200 Subject: Separate files needing GPL license + Keep apache license for now, until this is completed: https://wiki.fd.io/view/TSC/Relicensing_Procedure + Add utilities for switching license comment blocks. - They do not preserve attributes, so executable flag is lost. + Move the affected files to GPL/. + Update paths so files are executed from the new location. + Change the way scripts are started to do not require executable flag. + Employ OptionString when constructing longer command lines. + Move also PacketVerifier.py and TrafficScriptArg.py as they are linked with traffic scripts. + That means the two files are outside "resources" package tree now. + Added __init__.py files so relative imports work in new package tree. + Start traffic scripts as python modules to allow relative imports. + Once again needed because they are outside the default PYTHONPATH. Change-Id: Ieb135629e890adbaf5b79497570f3be25b746f9f Signed-off-by: Vratko Polak --- GPL/traffic_scripts/PacketVerifier.py | 344 ++++++++++++++++++++++++ GPL/traffic_scripts/TrafficScriptArg.py | 68 +++++ GPL/traffic_scripts/__init__.py | 16 ++ GPL/traffic_scripts/ipsec_interface.py | 271 +++++++++++++++++++ GPL/traffic_scripts/ipsec_policy.py | 231 ++++++++++++++++ GPL/traffic_scripts/lisp/__init__.py | 16 ++ GPL/traffic_scripts/lisp/lisp_check.py | 187 +++++++++++++ GPL/traffic_scripts/lisp/lispgpe_check.py | 210 +++++++++++++++ GPL/traffic_scripts/policer.py | 122 +++++++++ GPL/traffic_scripts/send_icmp_wait_for_reply.py | 136 ++++++++++ GPL/traffic_scripts/send_ip_check_headers.py | 164 +++++++++++ GPL/traffic_scripts/send_vxlan_check_vxlan.py | 119 ++++++++ GPL/traffic_scripts/srv6_encap.py | 306 +++++++++++++++++++++ GPL/traffic_scripts/vxlan.py | 32 +++ 14 files changed, 2222 insertions(+) create mode 100644 GPL/traffic_scripts/PacketVerifier.py create mode 100644 GPL/traffic_scripts/TrafficScriptArg.py create mode 100644 GPL/traffic_scripts/__init__.py create mode 100644 GPL/traffic_scripts/ipsec_interface.py create mode 100644 GPL/traffic_scripts/ipsec_policy.py create mode 100644 GPL/traffic_scripts/lisp/__init__.py create mode 100644 GPL/traffic_scripts/lisp/lisp_check.py create mode 100644 GPL/traffic_scripts/lisp/lispgpe_check.py create mode 100644 GPL/traffic_scripts/policer.py create mode 100644 GPL/traffic_scripts/send_icmp_wait_for_reply.py create mode 100644 GPL/traffic_scripts/send_ip_check_headers.py create mode 100644 GPL/traffic_scripts/send_vxlan_check_vxlan.py create mode 100644 GPL/traffic_scripts/srv6_encap.py create mode 100644 GPL/traffic_scripts/vxlan.py (limited to 'GPL/traffic_scripts') diff --git a/GPL/traffic_scripts/PacketVerifier.py b/GPL/traffic_scripts/PacketVerifier.py new file mode 100644 index 0000000000..20e9af603b --- /dev/null +++ b/GPL/traffic_scripts/PacketVerifier.py @@ -0,0 +1,344 @@ +# Copyright (c) 2020 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""PacketVerifier module. + + Example. :: + + | >>> from scapy.all import * + | >>> from PacketVerifier import * + | >>> rxq = RxQueue('eth1') + | >>> txq = TxQueue('eth1') + | >>> src_mac = "AA:BB:CC:DD:EE:FF" + | >>> dst_mac = "52:54:00:ca:5d:0b" + | >>> src_ip = "11.11.11.10" + | >>> dst_ip = "11.11.11.11" + | >>> sent_packets = [] + | >>> pkt_send = Ether(src=src_mac, dst=dst_mac) / + | ... IP(src=src_ip, dst=dst_ip) / + | ... ICMP() + | >>> sent_packets.append(pkt_send) + | >>> txq.send(pkt_send) + | >>> pkt_send = Ether(src=src_mac, dst=dst_mac) / + | ... ARP(hwsrc=src_mac, psrc=src_ip, hwdst=dst_mac, pdst=dst_ip, op=2) + | >>> sent_packets.append(pkt_send) + | >>> txq.send(pkt_send) + | >>> rxq.recv(100, sent_packets).show() + | ###[ Ethernet ]### + | dst = aa:bb:cc:dd:ee:ff + | src = 52:54:00:ca:5d:0b + | type = 0x800 + | ###[ IP ]### + | version = 4L + | ihl = 5L + | tos = 0x0 + | len = 28 + | id = 43183 + | flags = + | frag = 0L + | ttl = 64 + | proto = icmp + | chksum = 0xa607 + | src = 11.11.11.11 + | dst = 11.11.11.10 + | options + | ###[ ICMP ]### + | type = echo-reply + | code = 0 + | chksum = 0xffff + | id = 0x0 + | seq = 0x0 + | ###[ Padding ]### + | load = 'RT\x00\xca]\x0b\xaa\xbb\xcc\xdd\xee\xff\x08\x06\x00\x01\x08\x00' + + Example end. +""" + +import os +import select + +from scapy.all import ETH_P_IP, ETH_P_IPV6, ETH_P_ALL, ETH_P_ARP +from scapy.config import conf +from scapy.layers.inet6 import IPv6 +from scapy.layers.l2 import Ether, ARP +from scapy.packet import Raw + +# Enable libpcap's L2listen +conf.use_pcap = True + +__all__ = [ + u"RxQueue", u"TxQueue", u"Interface", u"create_gratuitous_arp_request", + u"auto_pad", u"checksum_equal" +] + +# TODO: http://stackoverflow.com/questions/320232/ +# ensuring-subprocesses-are-dead-on-exiting-python-program + + +class PacketVerifier: + """Base class for TX and RX queue objects for packet verifier.""" + def __init__(self, interface_name): + os.system( + f"sudo echo 1 > /proc/sys/net/ipv6/conf/{interface_name}/" + f"disable_ipv6" + ) + os.system(f"sudo ip link set {interface_name} up promisc on") + self._ifname = interface_name + + +def extract_one_packet(buf): + """Extract one packet from the incoming buf buffer. + + Takes string as input and looks for first whole packet in it. + If it finds one, it returns substring from the buf parameter. + + :param buf: String representation of incoming packet buffer. + :type buf: str + :returns: String representation of first packet in buf. + :rtype: str + """ + pkt_len = 0 + + if len(buf) < 60: + return None + + try: + ether_type = Ether(buf[0:14]).type + except AttributeError: + raise RuntimeError(f"No EtherType in packet {buf!r}") + + if ether_type == ETH_P_IP: + # 14 is Ethernet fame header size. + # 4 bytes is just enough to look for length in ip header. + # ip total length contains just the IP packet length so add the Ether + # header. + pkt_len = Ether(buf[0:14+4]).len + 14 + if len(buf) < 60: + return None + elif ether_type == ETH_P_IPV6: + if not Ether(buf[0:14+6]).haslayer(IPv6): + raise RuntimeError(f"Invalid IPv6 packet {buf!r}") + # ... to add to the above, 40 bytes is the length of IPV6 header. + # The ipv6.len only contains length of the payload and not the header + pkt_len = Ether(buf)[u"IPv6"].plen + 14 + 40 + if len(buf) < 60: + return None + elif ether_type == ETH_P_ARP: + pkt = Ether(buf[:20]) + if not pkt.haslayer(ARP): + raise RuntimeError(u"Incomplete ARP packet") + # len(eth) + arp(2 hw addr type + 2 proto addr type + # + 1b len + 1b len + 2b operation) + + pkt_len = 14 + 8 + pkt_len += 2 * pkt.getlayer(ARP).hwlen + pkt_len += 2 * pkt.getlayer(ARP).plen + + del pkt + elif ether_type == 32821: # RARP (Reverse ARP) + pkt = Ether(buf[:20]) + pkt.type = ETH_P_ARP # Change to ARP so it works with scapy + pkt = Ether(pkt) + if not pkt.haslayer(ARP): + pkt.show() + raise RuntimeError(u"Incomplete RARP packet") + + # len(eth) + arp(2 hw addr type + 2 proto addr type + # + 1b len + 1b len + 2b operation) + pkt_len = 14 + 8 + pkt_len += 2 * pkt.getlayer(ARP).hwlen + pkt_len += 2 * pkt.getlayer(ARP).plen + + del pkt + else: + raise RuntimeError(f"Unknown protocol {ether_type}") + + if pkt_len < 60: + pkt_len = 60 + + if len(buf) < pkt_len: + return None + + return buf[0:pkt_len] + + +def packet_reader(interface_name, queue): + """Sub-process routine that reads packets and puts them to queue. + + This function is meant to be run in separate subprocess and is in tight + loop reading raw packets from interface passed as parameter. + + :param interface_name: Name of interface to read packets from. + :param queue: Queue in which this function will push incoming packets. + :type interface_name: str + :type queue: multiprocessing.Queue + """ + sock = conf.L2listen(iface=interface_name, type=ETH_P_ALL) + + while True: + pkt = sock.recv(0x7fff) + queue.put(pkt) + + +class RxQueue(PacketVerifier): + """Receive queue object. + + This object creates raw socket, reads packets from it and provides + function to access them. + + :param interface_name: Which interface to bind to. + :type interface_name: str + """ + def __init__(self, interface_name): + PacketVerifier.__init__(self, interface_name) + self._sock = conf.L2listen(iface=interface_name, type=ETH_P_ALL) + + def recv(self, timeout=3, ignore=None, verbose=True): + """Read next received packet. + + Returns scapy's Ether() object created from next packet in the queue. + Queue is being filled in parallel in subprocess. If no packet + arrives in given timeout queue.Empty exception will be risen. + + :param timeout: How many seconds to wait for next packet. + :param ignore: List of packets that should be ignored. + :param verbose: Used to suppress detailed logging of received packets. + :type timeout: int + :type ignore: list + :type verbose: bool + + :returns: Ether() initialized object from packet data. + :rtype: scapy.Ether + """ + ignore_list = list() + if ignore is not None: + for ig_pkt in ignore: + # Auto pad all packets in ignore list + ignore_list.append(str(auto_pad(ig_pkt))) + while True: + rlist, _, _ = select.select([self._sock], [], [], timeout) + if self._sock not in rlist: + return None + + pkt = self._sock.recv(0x7fff) + pkt_pad = str(auto_pad(pkt)) + print(f"Received packet on {self._ifname} of len {len(pkt)}") + if verbose: + if hasattr(pkt, u"show2"): + pkt.show2() + else: + # Never happens in practice, but Pylint does not know that. + print(f"Unexpected instance: {pkt!r}") + print() + if pkt_pad in ignore_list: + ignore_list.remove(pkt_pad) + print(u"Received packet ignored.") + continue + return pkt + + +class TxQueue(PacketVerifier): + """Transmission queue object. + + This object is used to send packets over RAW socket on a interface. + + :param interface_name: Which interface to send packets from. + :type interface_name: str + """ + def __init__(self, interface_name): + PacketVerifier.__init__(self, interface_name) + self._sock = conf.L2socket(iface=interface_name, type=ETH_P_ALL) + + def send(self, pkt, verbose=True): + """Send packet out of the bound interface. + + :param pkt: Packet to send. + :param verbose: Used to suppress detailed logging of sent packets. + :type pkt: string or scapy Packet derivative. + :type verbose: bool + """ + pkt = auto_pad(pkt) + print(f"Sending packet out of {self._ifname} of len {len(pkt)}") + if verbose: + pkt.show2() + print() + + self._sock.send(pkt) + + +class Interface: + """Class for network interfaces. Contains methods for sending and receiving + packets.""" + def __init__(self, if_name): + """Initialize the interface class. + + :param if_name: Name of the interface. + :type if_name: str + """ + self.if_name = if_name + self.sent_packets = [] + self.rxq = RxQueue(if_name) + self.txq = TxQueue(if_name) + + def send_pkt(self, pkt): + """Send the provided packet out the interface.""" + self.sent_packets.append(pkt) + self.txq.send(pkt) + + def recv_pkt(self, timeout=3): + """Read one packet from the interface's receive queue. + + :param timeout: Timeout value in seconds. + :type timeout: int + :returns: Ether() initialized object from packet data. + :rtype: scapy.Ether + """ + return self.rxq.recv(timeout, self.sent_packets) + + +def create_gratuitous_arp_request(src_mac, src_ip): + """Creates scapy representation of gratuitous ARP request.""" + return (Ether(src=src_mac, dst=u"ff:ff:ff:ff:ff:ff") / + ARP(psrc=src_ip, hwsrc=src_mac, pdst=src_ip) + ) + + +def auto_pad(packet): + """Pads zeroes at the end of the packet if the total len < 60 bytes.""" + # padded = str(packet) + if len(packet) < 60: + packet[Raw].load += (b"\0" * (60 - len(packet))) + return packet + + +def checksum_equal(chksum1, chksum2): + """Compares two checksums in one's complement notation. + + Checksums to be compared are calculated as 16 bit one's complement of the + one's complement sum of 16 bit words of some buffer. + In one's complement notation 0x0000 (positive zero) and 0xFFFF + (negative zero) are equivalent. + + :param chksum1: First checksum. + :param chksum2: Second checksum. + :type chksum1: uint16 + :type chksum2: uint16 + + :returns: True if checksums are equivalent, False otherwise. + :rtype: boolean + """ + if chksum1 == 0xFFFF: + chksum1 = 0 + if chksum2 == 0xFFFF: + chksum2 = 0 + return chksum1 == chksum2 diff --git a/GPL/traffic_scripts/TrafficScriptArg.py b/GPL/traffic_scripts/TrafficScriptArg.py new file mode 100644 index 0000000000..b2f7055412 --- /dev/null +++ b/GPL/traffic_scripts/TrafficScriptArg.py @@ -0,0 +1,68 @@ +# Copyright (c) 2020 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Traffic scripts argument parser library.""" + +import argparse + + +class TrafficScriptArg: + """Traffic scripts argument parser. + + Parse arguments for traffic script. Default has two arguments '--tx_if' + and '--rx_if'. You can provide more arguments. All arguments have string + representation of the value. You can add also optional arguments. Default + value for optional arguments is empty string. + + :param more_args: List of additional arguments (optional). + :param opt_args: List of optional arguments (optional). + :type more_args: list + :type opt_args: list + + :Example: + + >>> from TrafficScriptArg import TrafficScriptArg + >>> args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip']) + """ + + def __init__(self, more_args=None, opt_args=None): + parser = argparse.ArgumentParser() + parser.add_argument(u"--tx_if", help=u"interface that sends traffic") + parser.add_argument(u"--rx_if", help=u"interface that receives traffic") + + if more_args is not None: + for arg in more_args: + arg_name = f"--{arg}" + parser.add_argument(arg_name) + + if opt_args is not None: + for arg in opt_args: + arg_name = f"--{arg}" + parser.add_argument(arg_name, nargs=u"?", default=u"") + + self._parser = parser + self._args = vars(parser.parse_args()) + + def get_arg(self, arg_name): + """Get argument value. + + :param arg_name: Argument name. + :type arg_name: str + :returns: Argument value. + :rtype: str + """ + arg_val = self._args.get(arg_name) + if arg_val is None: + raise Exception(f"Argument '{arg_name}' not found") + + return arg_val diff --git a/GPL/traffic_scripts/__init__.py b/GPL/traffic_scripts/__init__.py new file mode 100644 index 0000000000..a946304019 --- /dev/null +++ b/GPL/traffic_scripts/__init__.py @@ -0,0 +1,16 @@ +# Copyright (c) 2020 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +__init__ file for directory traffic_scripts +""" diff --git a/GPL/traffic_scripts/ipsec_interface.py b/GPL/traffic_scripts/ipsec_interface.py new file mode 100644 index 0000000000..4b7d758be5 --- /dev/null +++ b/GPL/traffic_scripts/ipsec_interface.py @@ -0,0 +1,271 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2020 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Traffic script for IPsec verification.""" + +import sys + +from ipaddress import ip_address +from scapy.layers.inet import IP +from scapy.layers.inet6 import IPv6, ICMPv6ND_NS +from scapy.layers.ipsec import SecurityAssociation, ESP +from scapy.layers.l2 import Ether +from scapy.packet import Raw + +from .PacketVerifier import RxQueue, TxQueue +from .TrafficScriptArg import TrafficScriptArg + + +def check_ipsec( + pkt_recv, ip_layer, src_mac, dst_mac, src_tun, dst_tun, src_ip, dst_ip, + sa_in): + """Check received IPsec packet. + + :param pkt_recv: Received packet to verify. + :param ip_layer: Scapy IP layer. + :param src_mac: Source MAC address. + :param dst_mac: Destination MAC address. + :param src_tun: IPsec tunnel source address. + :param dst_tun: IPsec tunnel destination address. + :param src_ip: Source IP/IPv6 address of original IP/IPv6 packet. + :param dst_ip: Destination IP/IPv6 address of original IP/IPv6 packet. + :param sa_in: IPsec SA for packet decryption. + :type pkt_recv: scapy.Ether + :type ip_layer: scapy.layers.inet.IP or scapy.layers.inet6.IPv6 + :type src_mac: str + :type dst_mac: str + :type src_tun: str + :type dst_tun: str + :type src_ip: str + :type dst_ip: str + :type sa_in: scapy.layers.ipsec.SecurityAssociation + :raises RuntimeError: If received packet is invalid. + """ + if pkt_recv[Ether].src != src_mac: + raise RuntimeError( + f"Received frame has invalid source MAC address: " + f"{pkt_recv[Ether].src} should be: {src_mac}" + ) + + if pkt_recv[Ether].dst != dst_mac: + raise RuntimeError( + f"Received frame has invalid destination MAC address: " + f"{pkt_recv[Ether].dst} should be: {dst_mac}" + ) + + if not pkt_recv.haslayer(ip_layer): + raise RuntimeError( + f"Not an {ip_layer.name} packet received: {pkt_recv!r}" + ) + + if pkt_recv[ip_layer].src != src_tun: + raise RuntimeError( + f"Received packet has invalid source address: " + f"{pkt_recv[ip_layer].src} should be: {src_tun}" + ) + + if pkt_recv[ip_layer].dst != dst_tun: + raise RuntimeError( + f"Received packet has invalid destination address: " + f"{pkt_recv[ip_layer].dst} should be: {dst_tun}" + ) + + if not pkt_recv.haslayer(ESP): + raise RuntimeError(f"Not an ESP packet received: {pkt_recv!r}") + + ip_pkt = pkt_recv[ip_layer] + d_pkt = sa_in.decrypt(ip_pkt) + + if d_pkt[ip_layer].dst != dst_ip: + raise RuntimeError( + f"Decrypted packet has invalid destination address: " + f"{d_pkt[ip_layer].dst} should be: {dst_ip}" + ) + + if d_pkt[ip_layer].src != src_ip: + raise RuntimeError( + f"Decrypted packet has invalid source address: " + f"{d_pkt[ip_layer].src} should be: {src_ip}" + ) + + if ip_layer == IP and d_pkt[ip_layer].proto != 61: + raise RuntimeError( + f"Decrypted packet has invalid IP protocol: " + f"{d_pkt[ip_layer].proto} should be: 61" + ) + + +def check_ip(pkt_recv, ip_layer, src_mac, dst_mac, src_ip, dst_ip): + """Check received IP/IPv6 packet. + + :param pkt_recv: Received packet to verify. + :param ip_layer: Scapy IP layer. + :param src_mac: Source MAC address. + :param dst_mac: Destination MAC address. + :param src_ip: Source IP/IPv6 address. + :param dst_ip: Destination IP/IPv6 address. + :type pkt_recv: scapy.Ether + :type ip_layer: scapy.layers.inet.IP or scapy.layers.inet6.IPv6 + :type src_mac: str + :type dst_mac: str + :type src_ip: str + :type dst_ip: str + :raises RuntimeError: If received packet is invalid. + """ + if pkt_recv[Ether].src != src_mac: + raise RuntimeError( + f"Received frame has invalid source MAC address: " + f"{pkt_recv[Ether].src} should be: {src_mac}" + ) + + if pkt_recv[Ether].dst != dst_mac: + raise RuntimeError( + f"Received frame has invalid destination MAC address: " + f"{pkt_recv[Ether].dst} should be: {dst_mac}" + ) + + if not pkt_recv.haslayer(ip_layer): + raise RuntimeError( + f"Not an {ip_layer.name} packet received: {pkt_recv!r}" + ) + + if pkt_recv[ip_layer].dst != dst_ip: + raise RuntimeError( + f"Received packet has invalid destination address: " + f"{pkt_recv[ip_layer.name].dst} should be: {dst_ip}" + ) + + if pkt_recv[ip_layer].src != src_ip: + raise RuntimeError( + f"Received packet has invalid destination address: " + f"{pkt_recv[ip_layer.name].dst} should be: {src_ip}" + ) + + if ip_layer == IP and pkt_recv[ip_layer].proto != 61: + raise RuntimeError( + f"Received packet has invalid IP protocol: " + f"{pkt_recv[ip_layer].proto} should be: 61" + ) + + +def main(): + """Send and receive IPsec packet.""" + + args = TrafficScriptArg( + [ + u"tx_src_mac", u"tx_dst_mac", u"rx_src_mac", u"rx_dst_mac", + u"src_ip", u"dst_ip", u"src_tun", u"dst_tun", u"crypto_alg", + u"crypto_key", u"integ_alg", u"integ_key", u"l_spi", u"r_spi" + ] + ) + + tx_txq = TxQueue(args.get_arg(u"tx_if")) + tx_rxq = RxQueue(args.get_arg(u"tx_if")) + rx_txq = TxQueue(args.get_arg(u"rx_if")) + rx_rxq = RxQueue(args.get_arg(u"rx_if")) + + tx_src_mac = args.get_arg(u"tx_src_mac") + tx_dst_mac = args.get_arg(u"tx_dst_mac") + rx_src_mac = args.get_arg(u"rx_src_mac") + rx_dst_mac = args.get_arg(u"rx_dst_mac") + src_ip = args.get_arg(u"src_ip") + dst_ip = args.get_arg(u"dst_ip") + src_tun = args.get_arg(u"src_tun") + dst_tun = args.get_arg(u"dst_tun") + crypto_alg = args.get_arg(u"crypto_alg") + crypto_key = args.get_arg(u"crypto_key") + integ_alg = args.get_arg(u"integ_alg") + integ_key = args.get_arg(u"integ_key") + l_spi = int(args.get_arg(u"l_spi")) + r_spi = int(args.get_arg(u"r_spi")) + + ip_layer = IP if ip_address(src_ip).version == 4 else IPv6 + ip_pkt = ip_layer(src=src_ip, dst=dst_ip, proto=61) if ip_layer == IP \ + else ip_layer(src=src_ip, dst=dst_ip) + + tunnel_in = ip_layer(src=src_tun, dst=dst_tun) + tunnel_out = ip_layer(src=dst_tun, dst=src_tun) + + sa_in = SecurityAssociation( + ESP, spi=l_spi, crypt_algo=crypto_alg, + crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg, + auth_key=integ_key.encode(encoding=u"utf-8"), + tunnel_header=tunnel_in + ) + + sa_out = SecurityAssociation( + ESP, spi=r_spi, crypt_algo=crypto_alg, + crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg, + auth_key=integ_key.encode(encoding=u"utf-8"), + tunnel_header=tunnel_out + ) + + sent_packets = list() + tx_pkt_send = (Ether(src=tx_src_mac, dst=tx_dst_mac) / ip_pkt) + tx_pkt_send /= Raw() + size_limit = 78 if ip_layer == IPv6 else 64 + if len(tx_pkt_send) < size_limit: + tx_pkt_send[Raw].load += (b"\0" * (size_limit - len(tx_pkt_send))) + sent_packets.append(tx_pkt_send) + tx_txq.send(tx_pkt_send) + + while True: + rx_pkt_recv = rx_rxq.recv(2) + + if rx_pkt_recv is None: + raise RuntimeError(f"{ip_layer.name} packet Rx timeout") + + if rx_pkt_recv.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break + + check_ipsec( + rx_pkt_recv, ip_layer, rx_src_mac, rx_dst_mac, src_tun, dst_tun, src_ip, + dst_ip, sa_in + ) + + ip_pkt = ip_layer(src=dst_ip, dst=src_ip, proto=61) if ip_layer == IP \ + else ip_layer(src=dst_ip, dst=src_ip) + ip_pkt /= Raw() + if len(ip_pkt) < (size_limit - 14): + ip_pkt[Raw].load += (b"\0" * (size_limit - 14 - len(ip_pkt))) + e_pkt = sa_out.encrypt(ip_pkt) + rx_pkt_send = (Ether(src=rx_dst_mac, dst=rx_src_mac) / + e_pkt) + rx_txq.send(rx_pkt_send) + + while True: + tx_pkt_recv = tx_rxq.recv(2, ignore=sent_packets) + + if tx_pkt_recv is None: + raise RuntimeError(f"{ip_layer.name} packet Rx timeout") + + if tx_pkt_recv.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break + + check_ip(tx_pkt_recv, ip_layer, tx_dst_mac, tx_src_mac, dst_ip, src_ip) + + sys.exit(0) + + +if __name__ == u"__main__": + main() diff --git a/GPL/traffic_scripts/ipsec_policy.py b/GPL/traffic_scripts/ipsec_policy.py new file mode 100644 index 0000000000..9e168ee938 --- /dev/null +++ b/GPL/traffic_scripts/ipsec_policy.py @@ -0,0 +1,231 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2020 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Traffic script for IPsec verification.""" + +import sys + +from ipaddress import ip_address +from scapy.layers.inet import IP +from scapy.layers.inet6 import IPv6, ICMPv6ND_NS +from scapy.layers.ipsec import SecurityAssociation, ESP +from scapy.layers.l2 import Ether +from scapy.packet import Raw + +from .PacketVerifier import RxQueue, TxQueue +from .TrafficScriptArg import TrafficScriptArg + + +def check_ipsec(pkt_recv, ip_layer, dst_tun, src_ip, dst_ip, sa_in): + """Check received IPsec packet. + + :param pkt_recv: Received packet to verify. + :param ip_layer: Scapy IP layer. + :param dst_tun: IPsec tunnel destination address. + :param src_ip: Source IP/IPv6 address of original IP/IPv6 packet. + :param dst_ip: Destination IP/IPv6 address of original IP/IPv6 packet. + :param sa_in: IPsec SA for packet decryption. + :type pkt_recv: scapy.Ether + :type ip_layer: scapy.layers.inet.IP or scapy.layers.inet6.IPv6 + :type dst_tun: str + :type src_ip: str + :type dst_ip: str + :type sa_in: scapy.layers.ipsec.SecurityAssociation + :raises RuntimeError: If received packet is invalid. + """ + if not pkt_recv.haslayer(ip_layer): + raise RuntimeError( + f"Not an {ip_layer.name} packet received: {pkt_recv!r}" + ) + + if pkt_recv[ip_layer].dst != dst_tun: + raise RuntimeError( + f"Received packet has invalid destination address: " + f"{pkt_recv[ip_layer].dst} should be: {dst_tun}" + ) + + if not pkt_recv.haslayer(ESP): + raise RuntimeError(f"Not an ESP packet received: {pkt_recv!r}") + + ip_pkt = pkt_recv[ip_layer] + d_pkt = sa_in.decrypt(ip_pkt) + + if d_pkt[ip_layer].dst != dst_ip: + raise RuntimeError( + f"Decrypted packet has invalid destination address: " + f"{d_pkt[ip_layer].dst} should be: {dst_ip}" + ) + + if d_pkt[ip_layer].src != src_ip: + raise RuntimeError( + f"Decrypted packet has invalid source address: " + f"{d_pkt[ip_layer].src} should be: {src_ip}" + ) + + if ip_layer == IP and d_pkt[ip_layer].proto != 61: + raise RuntimeError( + f"Decrypted packet has invalid IP protocol: " + f"{d_pkt[ip_layer].proto} should be: 61" + ) + + +def check_ip(pkt_recv, ip_layer, src_ip, dst_ip): + """Check received IP/IPv6 packet. + + :param pkt_recv: Received packet to verify. + :param ip_layer: Scapy IP layer. + :param src_ip: Source IP/IPv6 address. + :param dst_ip: Destination IP/IPv6 address. + :type pkt_recv: scapy.Ether + :type ip_layer: scapy.layers.inet.IP or scapy.layers.inet6.IPv6 + :type src_ip: str + :type dst_ip: str + :raises RuntimeError: If received packet is invalid. + """ + if not pkt_recv.haslayer(ip_layer): + raise RuntimeError( + f"Not an {ip_layer.name} packet received: {pkt_recv!r}" + ) + + if pkt_recv[ip_layer].dst != dst_ip: + raise RuntimeError( + f"Received packet has invalid destination address: " + f"{pkt_recv[ip_layer.name].dst} should be: {dst_ip}" + ) + + if pkt_recv[ip_layer].src != src_ip: + raise RuntimeError( + f"Received packet has invalid destination address: " + f"{pkt_recv[ip_layer.name].dst} should be: {src_ip}" + ) + + if ip_layer == IP and pkt_recv[ip_layer].proto != 61: + raise RuntimeError( + f"Received packet has invalid IP protocol: " + f"{pkt_recv[ip_layer].proto} should be: 61" + ) + + +# TODO: Pylint says too-many-locals and too-many-statements. Refactor! +def main(): + """Send and receive IPsec packet.""" + + args = TrafficScriptArg( + [ + u"tx_src_mac", u"tx_dst_mac", u"rx_src_mac", u"rx_dst_mac", + u"src_ip", u"dst_ip", u"crypto_alg", u"crypto_key", u"integ_alg", + u"integ_key", u"l_spi", u"r_spi" + ], + [u"src_tun", u"dst_tun"] + ) + + tx_txq = TxQueue(args.get_arg(u"tx_if")) + tx_rxq = RxQueue(args.get_arg(u"tx_if")) + rx_txq = TxQueue(args.get_arg(u"rx_if")) + rx_rxq = RxQueue(args.get_arg(u"rx_if")) + + tx_src_mac = args.get_arg(u"tx_src_mac") + tx_dst_mac = args.get_arg(u"tx_dst_mac") + rx_src_mac = args.get_arg(u"rx_src_mac") + rx_dst_mac = args.get_arg(u"rx_dst_mac") + src_ip = args.get_arg(u"src_ip") + dst_ip = args.get_arg(u"dst_ip") + crypto_alg = args.get_arg(u"crypto_alg") + crypto_key = args.get_arg(u"crypto_key") + integ_alg = args.get_arg(u"integ_alg") + integ_key = args.get_arg(u"integ_key") + l_spi = int(args.get_arg(u"l_spi")) + r_spi = int(args.get_arg(u"r_spi")) + src_tun = args.get_arg(u"src_tun") + dst_tun = args.get_arg(u"dst_tun") + + ip_layer = IP if ip_address(src_ip).version == 4 else IPv6 + + tunnel_out = ip_layer(src=src_tun, dst=dst_tun) if src_tun and dst_tun \ + else None + tunnel_in = ip_layer(src=dst_tun, dst=src_tun) if src_tun and dst_tun \ + else None + + if not (src_tun and dst_tun): + src_tun = src_ip + + sa_in = SecurityAssociation( + ESP, spi=r_spi, crypt_algo=crypto_alg, + crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg, + auth_key=integ_key.encode(encoding=u"utf-8"), tunnel_header=tunnel_in + ) + + sa_out = SecurityAssociation( + ESP, spi=l_spi, crypt_algo=crypto_alg, + crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg, + auth_key=integ_key.encode(encoding=u"utf-8"), tunnel_header=tunnel_out + ) + + ip_pkt = ip_layer(src=src_ip, dst=dst_ip, proto=61) if ip_layer == IP \ + else ip_layer(src=src_ip, dst=dst_ip) + ip_pkt = ip_layer(ip_pkt) + + e_pkt = sa_out.encrypt(ip_pkt) + tx_pkt_send = (Ether(src=tx_src_mac, dst=tx_dst_mac) / + e_pkt) + + sent_packets = list() + tx_pkt_send /= Raw() + sent_packets.append(tx_pkt_send) + tx_txq.send(tx_pkt_send) + + while True: + rx_pkt_recv = rx_rxq.recv(2) + + if rx_pkt_recv is None: + raise RuntimeError(f"{ip_layer.name} packet Rx timeout") + + if rx_pkt_recv.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break + + check_ip(rx_pkt_recv, ip_layer, src_ip, dst_ip) + + rx_ip_pkt = ip_layer(src=dst_ip, dst=src_ip, proto=61) if ip_layer == IP \ + else ip_layer(src=dst_ip, dst=src_ip) + rx_pkt_send = (Ether(src=rx_dst_mac, dst=rx_src_mac) / + rx_ip_pkt) + + rx_pkt_send /= Raw() + rx_txq.send(rx_pkt_send) + + while True: + tx_pkt_recv = tx_rxq.recv(2, sent_packets) + + if tx_pkt_recv is None: + raise RuntimeError(u"ESP packet Rx timeout") + + if tx_pkt_recv.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break + + check_ipsec(tx_pkt_recv, ip_layer, src_tun, dst_ip, src_ip, sa_in) + + sys.exit(0) + + +if __name__ == u"__main__": + main() diff --git a/GPL/traffic_scripts/lisp/__init__.py b/GPL/traffic_scripts/lisp/__init__.py new file mode 100644 index 0000000000..eef4a31bb0 --- /dev/null +++ b/GPL/traffic_scripts/lisp/__init__.py @@ -0,0 +1,16 @@ +# Copyright (c) 2020 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +__init__ file for directory lisp +""" diff --git a/GPL/traffic_scripts/lisp/lisp_check.py b/GPL/traffic_scripts/lisp/lisp_check.py new file mode 100644 index 0000000000..1bdafb799a --- /dev/null +++ b/GPL/traffic_scripts/lisp/lisp_check.py @@ -0,0 +1,187 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2020 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Traffic script that sends an ICMP/ICMPv6 packet out one interface, receives +a LISP-encapsulated packet on the other interface and verifies received packet. +""" + +import sys +import ipaddress + +from scapy.all import bind_layers, Packet +from scapy.fields import FlagsField, BitField, IntField +from scapy.layers.inet import ICMP, IP, UDP +from scapy.layers.inet6 import ICMPv6EchoRequest +from scapy.layers.inet6 import IPv6 +from scapy.layers.l2 import Ether +from scapy.packet import Raw + +from ..PacketVerifier import RxQueue, TxQueue +from ..TrafficScriptArg import TrafficScriptArg + + +class LispHeader(Packet): + """Scapy header for the LISP Layer.""" + + name = u"Lisp Header" + fields_desc = [ + FlagsField( + u"flags", None, 8, [u"N", u"L", u"E", u"V", u"I", u"", u"", u""] + ), + BitField(u"nonce/map_version", 0, size=24), + IntField(u"instance_id/locator_status_bits", 0)] + + +class LispInnerIP(IP): + """Scapy inner LISP layer for IPv4-in-IPv4.""" + + name = u"Lisp Inner Layer - IPv4" + + +class LispInnerIPv6(IPv6): + """Scapy inner LISP layer for IPv6-in-IPv6.""" + + name = u"Lisp Inner Layer - IPv6" + + +def valid_ipv4(ip): + try: + ipaddress.IPv4Address(ip) + return True + except (AttributeError, ipaddress.AddressValueError): + return False + + +def valid_ipv6(ip): + try: + ipaddress.IPv6Address(ip) + return True + except (AttributeError, ipaddress.AddressValueError): + return False + + +def main(): + """Send IP ICMP packet from one traffic generator interface to the other. + + :raises RuntimeError: If the received packet is not correct.""" + + args = TrafficScriptArg( + [ + u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac", + u"dut_if2_mac", u"src_rloc", u"dst_rloc" + ], + [u"ot_mode"] + ) + + tx_src_mac = args.get_arg(u"tg_src_mac") + tx_dst_mac = args.get_arg(u"dut_if1_mac") + rx_dst_mac = args.get_arg(u"tg_dst_mac") + rx_src_mac = args.get_arg(u"dut_if2_mac") + src_ip = args.get_arg(u"src_ip") + dst_ip = args.get_arg(u"dst_ip") + src_rloc = args.get_arg(u"src_rloc") + dst_rloc = args.get_arg(u"dst_rloc") + tx_if = args.get_arg(u"tx_if") + rx_if = args.get_arg(u"rx_if") + ot_mode = args.get_arg(u"ot_mode") + + rxq = RxQueue(rx_if) + txq = TxQueue(tx_if) + + pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac) + + if valid_ipv4(src_ip) and valid_ipv4(dst_ip): + pkt_raw /= IP(src=src_ip, dst=dst_ip) + pkt_raw /= ICMP() + ip_format = IP + lisp_layer = LispInnerIP + elif valid_ipv6(src_ip) and valid_ipv6(dst_ip): + pkt_raw /= IPv6(src=src_ip, dst=dst_ip) + pkt_raw /= ICMPv6EchoRequest() + ip_format = IPv6 + lisp_layer = LispInnerIPv6 + else: + raise ValueError(u"IP not in correct format") + + bind_layers(UDP, LispHeader, dport=4341) + bind_layers(LispHeader, lisp_layer) + + pkt_raw /= Raw() + sent_packets = list() + sent_packets.append(pkt_raw) + txq.send(pkt_raw) + + if tx_if == rx_if: + ether = rxq.recv(2, ignore=sent_packets) + else: + ether = rxq.recv(2) + + if ether is None: + raise RuntimeError(u"ICMP echo Rx timeout") + + if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src: + print(u"MAC addresses match.") + else: + raise RuntimeError(f"Matching packet unsuccessful: {ether!r}") + + ip = ether.payload + + if ot_mode == u"6to4": + if not isinstance(ip, IP): + raise RuntimeError(f"Not an IP packet received {ip!r}") + elif ot_mode == u"4to6": + if not isinstance(ip, IPv6): + raise RuntimeError(f"Not an IP packet received {ip!r}") + elif not isinstance(ip, ip_format): + raise RuntimeError(f"Not an IP packet received {ip!r}") + + lisp = ether.getlayer(lisp_layer) + if not lisp: + raise RuntimeError("Lisp layer not present or parsing failed.") + + # Compare data from packets + if src_ip == lisp.src: + print(u"Source IP matches source EID.") + else: + raise RuntimeError( + f"Matching Src IP unsuccessful: {src_ip} != {lisp.src}" + ) + + if dst_ip == lisp.dst: + print(u"Destination IP matches destination EID.") + else: + raise RuntimeError( + f"Matching Dst IP unsuccessful: {dst_ip} != {lisp.dst}" + ) + + if src_rloc == ip.src: + print(u"Source RLOC matches configuration.") + else: + raise RuntimeError( + f"Matching Src RLOC unsuccessful: {src_rloc} != {ip.src}" + ) + + if dst_rloc == ip.dst: + print(u"Destination RLOC matches configuration.") + else: + raise RuntimeError( + f"Matching dst RLOC unsuccessful: {dst_rloc} != {ip.dst}" + ) + + sys.exit(0) + + +if __name__ == u"__main__": + main() diff --git a/GPL/traffic_scripts/lisp/lispgpe_check.py b/GPL/traffic_scripts/lisp/lispgpe_check.py new file mode 100644 index 0000000000..27a83f5701 --- /dev/null +++ b/GPL/traffic_scripts/lisp/lispgpe_check.py @@ -0,0 +1,210 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2020 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Traffic script that sends an ICMP/ICMPv6 packet out one interface, receives +a LISPGPE-encapsulated packet on the other interface and verifies received +packet. +""" + +import sys +import ipaddress + +from scapy.all import bind_layers, Packet +from scapy.fields import FlagsField, BitField, XBitField, IntField +from scapy.layers.inet import ICMP, IP, UDP +from scapy.layers.inet6 import ICMPv6EchoRequest +from scapy.layers.inet6 import IPv6 +from scapy.layers.l2 import Ether +from scapy.packet import Raw + +from ..PacketVerifier import RxQueue, TxQueue +from ..TrafficScriptArg import TrafficScriptArg + + +class LispGPEHeader(Packet): + """Scapy header for the Lisp GPE Layer.""" + + name = "Lisp GPE Header" + fields_desc = [ + FlagsField( + u"flags", None, 8, [u"N", u"L", u"E", u"V", u"I", u"P", u"R", u"O"] + ), + BitField(u"version", 0, size=2), + BitField(u"reserved", 0, size=14), + XBitField(u"next_protocol", 0, size=8), + IntField(u"instance_id/locator_status_bits", 0) + ] + + def guess_payload_class(self, payload): + protocol = { + 0x1: LispGPEInnerIP, + 0x2: LispGPEInnerIPv6, + 0x3: LispGPEInnerEther, + 0x4: LispGPEInnerNSH + } + return protocol[self.next_protocol] + + +class LispGPEInnerIP(IP): + """Scapy inner LISP GPE layer for IPv4-in-IPv4.""" + + name = u"Lisp GPE Inner Layer - IPv4" + + +class LispGPEInnerIPv6(IPv6): + """Scapy inner LISP GPE layer for IPv6-in-IPv6.""" + + name = u"Lisp GPE Inner Layer - IPv6" + + +class LispGPEInnerEther(Ether): + """Scapy inner LISP GPE layer for Lisp-L2.""" + + name = u"Lisp GPE Inner Layer - Ethernet" + + +class LispGPEInnerNSH(Packet): + """Scapy inner LISP GPE layer for Lisp-NSH. + + Parsing not implemented. + """ + + +def valid_ipv4(ip): + try: + ipaddress.IPv4Address(ip) + return True + except (AttributeError, ipaddress.AddressValueError): + return False + + +def valid_ipv6(ip): + try: + ipaddress.IPv6Address(ip) + return True + except (AttributeError, ipaddress.AddressValueError): + return False + + +def main(): + """Send IP ICMP packet from one traffic generator interface to the other. + + :raises RuntimeError: If the received packet is not correct.""" + + args = TrafficScriptArg( + [ + u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac", + u"dut_if2_mac", u"src_rloc", u"dst_rloc" + ], + [u"ot_mode"] + ) + + tx_src_mac = args.get_arg(u"tg_src_mac") + tx_dst_mac = args.get_arg(u"dut_if1_mac") + rx_dst_mac = args.get_arg(u"tg_dst_mac") + rx_src_mac = args.get_arg(u"dut_if2_mac") + src_ip = args.get_arg(u"src_ip") + dst_ip = args.get_arg(u"dst_ip") + src_rloc = args.get_arg(u"src_rloc") + dst_rloc = args.get_arg(u"dst_rloc") + tx_if = args.get_arg(u"tx_if") + rx_if = args.get_arg(u"rx_if") + ot_mode = args.get_arg(u"ot_mode") + + rxq = RxQueue(rx_if) + txq = TxQueue(tx_if) + + pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac) + + if valid_ipv4(src_ip) and valid_ipv4(dst_ip): + pkt_raw /= IP(src=src_ip, dst=dst_ip) + pkt_raw /= ICMP() + ip_format = IP + elif valid_ipv6(src_ip) and valid_ipv6(dst_ip): + pkt_raw /= IPv6(src=src_ip, dst=dst_ip) + pkt_raw /= ICMPv6EchoRequest() + ip_format = IPv6 + else: + raise ValueError(u"IP not in correct format") + + bind_layers(UDP, LispGPEHeader, dport=4341) + + pkt_raw /= Raw() + sent_packets = list() + sent_packets.append(pkt_raw) + txq.send(pkt_raw) + + if tx_if == rx_if: + ether = rxq.recv(2, ignore=sent_packets) + else: + ether = rxq.recv(2) + + if ether is None: + raise RuntimeError(u"ICMP echo Rx timeout") + + if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src: + print(u"MAC addresses match.") + else: + raise RuntimeError(f"Matching packet unsuccessful: {ether!r}") + + ip = ether.payload + + if ot_mode == u"6to4": + if not isinstance(ip, IP): + raise RuntimeError(f"Not an IP packet received {ip!r}") + elif ot_mode == u"4to6": + if not isinstance(ip, IPv6): + raise RuntimeError(f"Not an IP packet received {ip!r}") + elif not isinstance(ip, ip_format): + raise RuntimeError(f"Not an IP packet received {ip!r}") + + lisp = ether.getlayer(LispGPEHeader).underlayer + if not lisp: + raise RuntimeError(u"Lisp layer not present or parsing failed.") + + # Compare data from packets + if src_ip == lisp.src: + print(u"Source IP matches source EID.") + else: + raise RuntimeError( + f"Matching Src IP unsuccessful: {src_ip} != {lisp.src}" + ) + + if dst_ip == lisp.dst: + print(u"Destination IP matches destination EID.") + else: + raise RuntimeError( + f"Matching Dst IP unsuccessful: {dst_ip} != {lisp.dst}" + ) + + if src_rloc == ip.src: + print(u"Source RLOC matches configuration.") + else: + raise RuntimeError( + f"Matching Src RLOC unsuccessful: {src_rloc} != {ip.src}" + ) + + if dst_rloc == ip.dst: + print(u"Destination RLOC matches configuration.") + else: + raise RuntimeError( + f"Matching dst RLOC unsuccessful: {dst_rloc} != {ip.dst}" + ) + + sys.exit(0) + + +if __name__ == u"__main__": + main() diff --git a/GPL/traffic_scripts/policer.py b/GPL/traffic_scripts/policer.py new file mode 100644 index 0000000000..db90bb21f3 --- /dev/null +++ b/GPL/traffic_scripts/policer.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2020 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Traffic script for IPsec verification.""" + +import sys +import logging + +from ipaddress import ip_address +from scapy.layers.l2 import Ether +from scapy.layers.inet import IP, TCP +from scapy.layers.inet6 import IPv6, ICMPv6ND_NS +from scapy.packet import Raw + +from .TrafficScriptArg import TrafficScriptArg +from .PacketVerifier import RxQueue, TxQueue + + +def check_ipv4(pkt_recv, dscp): + """Check received IPv4 IPsec packet. + + :param pkt_recv: Received packet to verify. + :param dscp: DSCP value to check. + :type pkt_recv: scapy.Ether + :type dscp: int + :raises RuntimeError: If received packet is invalid. + """ + if not pkt_recv.haslayer(IP): + raise RuntimeError(f"Not an IPv4 packet received: {pkt_recv!r}") + + rx_dscp = pkt_recv[IP].tos >> 2 + if rx_dscp != dscp: + raise RuntimeError(f"Invalid DSCP {rx_dscp} should be {dscp}") + + if not pkt_recv.haslayer(TCP): + raise RuntimeError(f"Not a TCP packet received: {pkt_recv!r}") + + +def check_ipv6(pkt_recv, dscp): + """Check received IPv6 IPsec packet. + + :param pkt_recv: Received packet to verify. + :param dscp: DSCP value to check. + :type pkt_recv: scapy.Ether + :type dscp: int + :raises RuntimeError: If received packet is invalid. + """ + if not pkt_recv.haslayer(IPv6): + raise RuntimeError(f"Not an IPv6 packet received: {pkt_recv!r}") + + rx_dscp = pkt_recv[IPv6].tc >> 2 + if rx_dscp != dscp: + raise RuntimeError(f"Invalid DSCP {rx_dscp} should be {dscp}") + + if not pkt_recv.haslayer(TCP): + raise RuntimeError(f"Not a TCP packet received: {pkt_recv!r}") + + +# TODO: Pylint says too-many-locals and too-many-statements. Refactor! +def main(): + """Send and receive TCP packet.""" + args = TrafficScriptArg( + [u"src_mac", u"dst_mac", u"src_ip", u"dst_ip", u"dscp"] + ) + + rxq = RxQueue(args.get_arg(u"rx_if")) + txq = TxQueue(args.get_arg(u"tx_if")) + + src_mac = args.get_arg(u"src_mac") + dst_mac = args.get_arg(u"dst_mac") + src_ip = args.get_arg(u"src_ip") + dst_ip = args.get_arg(u"dst_ip") + dscp = int(args.get_arg(u"dscp")) + + ip_layer = IPv6 if ip_address(src_ip).version == 6 else IP + + sent_packets = list() + pkt_send = (Ether(src=src_mac, dst=dst_mac) / + ip_layer(src=src_ip, dst=dst_ip) / + TCP()) + + pkt_send /= Raw() + sent_packets.append(pkt_send) + txq.send(pkt_send) + + while True: + pkt_recv = rxq.recv(2, sent_packets) + if pkt_recv is None: + raise RuntimeError(u"ICMPv6 echo reply Rx timeout") + + if pkt_recv.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break + + if pkt_recv is None: + raise RuntimeError(u"Rx timeout") + + if ip_layer == IP: + check_ipv4(pkt_recv, dscp) + else: + check_ipv6(pkt_recv, dscp) + + sys.exit(0) + + +if __name__ == u"__main__": + main() diff --git a/GPL/traffic_scripts/send_icmp_wait_for_reply.py b/GPL/traffic_scripts/send_icmp_wait_for_reply.py new file mode 100644 index 0000000000..78554634af --- /dev/null +++ b/GPL/traffic_scripts/send_icmp_wait_for_reply.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2020 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Traffic script that sends an IP ICMPv4 or ICMPv6.""" + +import sys +import ipaddress + +from scapy.layers.inet import ICMP, IP +from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply,\ + ICMPv6ND_NS +from scapy.layers.l2 import Ether +from scapy.packet import Raw + +from .PacketVerifier import RxQueue, TxQueue +from .TrafficScriptArg import TrafficScriptArg + + +def valid_ipv4(ip): + """Check if IP address has the correct IPv4 address format. + + :param ip: IP address. + :type ip: str + :return: True in case of correct IPv4 address format, + otherwise return False. + :rtype: bool + """ + try: + ipaddress.IPv4Address(ip) + return True + except (AttributeError, ipaddress.AddressValueError): + return False + + +def valid_ipv6(ip): + """Check if IP address has the correct IPv6 address format. + + :param ip: IP address. + :type ip: str + :return: True in case of correct IPv6 address format, + otherwise return False. + :rtype: bool + """ + try: + ipaddress.IPv6Address(ip) + return True + except (AttributeError, ipaddress.AddressValueError): + return False + + +def main(): + """Send ICMP echo request and wait for ICMP echo reply. It ignores all other + packets.""" + args = TrafficScriptArg( + [u"dst_mac", u"src_mac", u"dst_ip", u"src_ip", u"timeout"] + ) + + dst_mac = args.get_arg(u"dst_mac") + src_mac = args.get_arg(u"src_mac") + dst_ip = args.get_arg(u"dst_ip") + src_ip = args.get_arg(u"src_ip") + tx_if = args.get_arg(u"tx_if") + rx_if = args.get_arg(u"rx_if") + timeout = int(args.get_arg(u"timeout")) + wait_step = 1 + + rxq = RxQueue(rx_if) + txq = TxQueue(tx_if) + sent_packets = [] + + # Create empty ip ICMP packet + if valid_ipv4(src_ip) and valid_ipv4(dst_ip): + ip_layer = IP + icmp_req = ICMP + icmp_resp = ICMP + icmp_type = 0 # echo-reply + elif valid_ipv6(src_ip) and valid_ipv6(dst_ip): + ip_layer = IP + icmp_req = ICMPv6EchoRequest + icmp_resp = ICMPv6EchoReply + icmp_type = 0 # Echo Reply + else: + raise ValueError(u"IP not in correct format") + + icmp_request = ( + Ether(src=src_mac, dst=dst_mac) / + ip_layer(src=src_ip, dst=dst_ip) / + icmp_req() + ) + + # Send created packet on the interface + icmp_request /= Raw() + sent_packets.append(icmp_request) + txq.send(icmp_request) + + for _ in range(1000): + while True: + icmp_reply = rxq.recv(wait_step, ignore=sent_packets) + if icmp_reply is None: + timeout -= wait_step + if timeout < 0: + raise RuntimeError(u"ICMP echo Rx timeout") + + elif icmp_reply.haslayer(ICMPv6ND_NS): + # read another packet in the queue in case of ICMPv6ND_NS packet + continue + else: + # otherwise process the current packet + break + + if icmp_reply[ip_layer][icmp_resp].type == icmp_type: + if icmp_reply[ip_layer].src == dst_ip and \ + icmp_reply[ip_layer].dst == src_ip: + break + else: + raise RuntimeError(u"Max packet count limit reached") + + print(u"ICMP echo reply received.") + + sys.exit(0) + + +if __name__ == u"__main__": + main() diff --git a/GPL/traffic_scripts/send_ip_check_headers.py b/GPL/traffic_scripts/send_ip_check_headers.py new file mode 100644 index 0000000000..ff43563025 --- /dev/null +++ b/GPL/traffic_scripts/send_ip_check_headers.py @@ -0,0 +1,164 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2020 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Traffic script that sends an IP IPv4/IPv6 packet from one interface +to the other. Source and destination IP addresses and source and destination +MAC addresses are checked in received packet. +""" + +import sys + +import ipaddress + +from robot.api import logger +from scapy.layers.inet import IP +from scapy.layers.inet6 import IPv6, ICMPv6ND_NS +from scapy.layers.l2 import Ether, Dot1Q +from scapy.packet import Raw + +from .PacketVerifier import RxQueue, TxQueue +from .TrafficScriptArg import TrafficScriptArg + + +def valid_ipv4(ip): + try: + ipaddress.IPv4Address(ip) + return True + except (AttributeError, ipaddress.AddressValueError): + return False + + +def valid_ipv6(ip): + try: + ipaddress.IPv6Address(ip) + return True + except (AttributeError, ipaddress.AddressValueError): + return False + + +def main(): + """Send IP/IPv6 packet from one traffic generator interface to the other.""" + args = TrafficScriptArg( + [ + u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac", + u"dut_if2_mac" + ], + [ + u"encaps_tx", u"vlan_tx", u"vlan_outer_tx", u"encaps_rx", + u"vlan_rx", u"vlan_outer_rx" + ] + ) + + tx_src_mac = args.get_arg(u"tg_src_mac") + tx_dst_mac = args.get_arg(u"dut_if1_mac") + rx_dst_mac = args.get_arg(u"tg_dst_mac") + rx_src_mac = args.get_arg(u"dut_if2_mac") + src_ip = args.get_arg(u"src_ip") + dst_ip = args.get_arg(u"dst_ip") + tx_if = args.get_arg(u"tx_if") + rx_if = args.get_arg(u"rx_if") + + encaps_tx = args.get_arg(u"encaps_tx") + vlan_tx = args.get_arg(u"vlan_tx") + vlan_outer_tx = args.get_arg(u"vlan_outer_tx") + encaps_rx = args.get_arg(u"encaps_rx") + vlan_rx = args.get_arg(u"vlan_rx") + vlan_outer_rx = args.get_arg(u"vlan_outer_rx") + + rxq = RxQueue(rx_if) + txq = TxQueue(tx_if) + + sent_packets =list() + pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac) + + if encaps_tx == u"Dot1q": + pkt_raw /= Dot1Q(vlan=int(vlan_tx)) + elif encaps_tx == u"Dot1ad": + pkt_raw.type = 0x88a8 + pkt_raw /= Dot1Q(vlan=vlan_outer_tx) + pkt_raw /= Dot1Q(vlan=vlan_tx) + + if valid_ipv4(src_ip) and valid_ipv4(dst_ip): + pkt_raw /= IP(src=src_ip, dst=dst_ip, proto=61) + ip_format = IP + elif valid_ipv6(src_ip) and valid_ipv6(dst_ip): + pkt_raw /= IPv6(src=src_ip, dst=dst_ip) + ip_format = IPv6 + else: + raise ValueError(u"IP not in correct format") + + pkt_raw /= Raw() + sent_packets.append(pkt_raw) + txq.send(pkt_raw) + + while True: + if tx_if == rx_if: + ether = rxq.recv(2, ignore=sent_packets) + else: + ether = rxq.recv(2) + + if ether is None: + raise RuntimeError(u"IP packet Rx timeout") + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break + + if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src: + logger.trace(u"MAC matched") + else: + raise RuntimeError(f"Matching packet unsuccessful: {ether!r}") + + if encaps_rx == u"Dot1q": + if ether[Dot1Q].vlan == int(vlan_rx): + logger.trace(u"VLAN matched") + else: + raise RuntimeError( + f"Ethernet frame with wrong VLAN tag " + f"({ether[Dot1Q].vlan}-received, " + f"{vlan_rx}-expected):\n{ether!r}" + ) + ip = ether[Dot1Q].payload + elif encaps_rx == u"Dot1ad": + raise NotImplementedError() + else: + ip = ether.payload + + if not isinstance(ip, ip_format): + raise RuntimeError(f"Not an IP packet received {ip!r}") + + # Compare data from packets + if src_ip == ip.src: + logger.trace(u"Src IP matched") + else: + raise RuntimeError( + f"Matching Src IP unsuccessful: {src_ip} != {ip.src}" + ) + + if dst_ip == ip.dst: + logger.trace(u"Dst IP matched") + else: + raise RuntimeError( + f"Matching Dst IP unsuccessful: {dst_ip} != {ip.dst}" + ) + + sys.exit(0) + + +if __name__ == u"__main__": + main() diff --git a/GPL/traffic_scripts/send_vxlan_check_vxlan.py b/GPL/traffic_scripts/send_vxlan_check_vxlan.py new file mode 100644 index 0000000000..0b1da81f18 --- /dev/null +++ b/GPL/traffic_scripts/send_vxlan_check_vxlan.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2020 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Traffic script that sends an IP ICMPv4/ICMPv6 packet from one interface to +the other one. Dot1q or Dot1ad tagging of the ethernet frame can be set. +""" + +import sys + +from scapy.layers.inet import IP, UDP +from scapy.layers.l2 import Ether +from scapy.packet import Raw + +from .PacketVerifier import RxQueue, TxQueue +from .TrafficScriptArg import TrafficScriptArg +from . import vxlan + + +def main(): + """Send IP ICMPv4/ICMPv6 packet from one traffic generator interface to + the other one. Dot1q or Dot1ad tagging of the ethernet frame can be set. + """ + args = TrafficScriptArg( + [ + u"tx_src_mac", u"tx_dst_mac", u"tx_src_ip", u"tx_dst_ip", u"tx_vni", + u"rx_src_ip", u"rx_dst_ip", u"rx_vni" + ] + ) + + tx_if = args.get_arg(u"tx_if") + rx_if = args.get_arg(u"rx_if") + tx_src_mac = args.get_arg(u"tx_src_mac") + tx_dst_mac = args.get_arg(u"tx_dst_mac") + tx_src_ip = args.get_arg(u"tx_src_ip") + tx_dst_ip = args.get_arg(u"tx_dst_ip") + tx_vni = args.get_arg(u"tx_vni") + rx_src_ip = args.get_arg(u"rx_src_ip") + rx_dst_ip = args.get_arg(u"rx_dst_ip") + rx_vni = args.get_arg(u"rx_vni") + + rxq = RxQueue(rx_if) + txq = TxQueue(tx_if) + + sent_packets = [] + + tx_pkt_p = (Ether(src=u"02:00:00:00:00:01", dst=u"02:00:00:00:00:02") / + IP(src=u"192.168.1.1", dst=u"192.168.1.2") / + UDP(sport=12345, dport=1234) / + Raw(u"raw data")) + + pkt_raw = (Ether(src=tx_src_mac, dst=tx_dst_mac) / + IP(src=tx_src_ip, dst=tx_dst_ip) / + UDP(sport=23456) / + vxlan.VXLAN(vni=int(tx_vni)) / + tx_pkt_p) + + pkt_raw /= Raw() + # Send created packet on one interface and receive on the other + sent_packets.append(pkt_raw) + txq.send(pkt_raw) + + ether = rxq.recv(2, ignore=sent_packets) + + # Check whether received packet contains layers Ether, IP and VXLAN + if ether is None: + raise RuntimeError(u"Packet Rx timeout") + ip = ether.payload + + if ip.src != rx_src_ip: + raise RuntimeError(f"IP src mismatch {ip.src} != {rx_src_ip}") + if ip.dst != rx_dst_ip: + raise RuntimeError(f"IP dst mismatch {ip.dst} != {rx_dst_ip}") + if ip.payload.dport != 4789: + raise RuntimeError( + f"VXLAN UDP port mismatch {ip.payload.dport} != 4789" + ) + vxlan_pkt = ip.payload.payload + + if int(vxlan_pkt.vni) != int(rx_vni): + raise RuntimeError(u"vxlan mismatch") + rx_pkt_p = vxlan_pkt.payload + + if rx_pkt_p.src != tx_pkt_p.src: + raise RuntimeError( + f"RX encapsulated MAC src mismatch {rx_pkt_p.src} != {tx_pkt_p.src}" + ) + if rx_pkt_p.dst != tx_pkt_p.dst: + raise RuntimeError( + f"RX encapsulated MAC dst mismatch {rx_pkt_p.dst} != {tx_pkt_p.dst}" + ) + if rx_pkt_p[IP].src != tx_pkt_p[IP].src: + raise RuntimeError( + f"RX encapsulated IP src mismatch {rx_pkt_p[IP].src} != " + f"{tx_pkt_p[IP].src}" + ) + if rx_pkt_p[IP].dst != tx_pkt_p[IP].dst: + raise RuntimeError( + f"RX encapsulated IP dst mismatch {rx_pkt_p[IP].dst} != " + f"{tx_pkt_p[IP].dst}" + ) + + # TODO: verify inner Ether() + + sys.exit(0) + +if __name__ == u"__main__": + main() diff --git a/GPL/traffic_scripts/srv6_encap.py b/GPL/traffic_scripts/srv6_encap.py new file mode 100644 index 0000000000..9db9538659 --- /dev/null +++ b/GPL/traffic_scripts/srv6_encap.py @@ -0,0 +1,306 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2020 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Traffic script for SRv6 verification.""" + +import sys + +from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, IPv6ExtHdrSegmentRouting,\ + ipv6nh +from scapy.layers.l2 import Ether +from scapy.packet import Raw + +from .PacketVerifier import RxQueue, TxQueue +from .TrafficScriptArg import TrafficScriptArg + + +def check_srv6( + pkt_recv, src_mac, dst_mac, src_ip, dst_ip, dir_srcsid, dir_dstsid1, + dir_dstsid2, dir_dstsid3, segleft_corr, static_proxy=False): + """Check received SRv6 packet. + + :param pkt_recv: Received packet to verify. + :param src_mac: Source MAC address. + :param dst_mac: Destination MAC address. + :param src_ip: Source IP/IPv6 address of original IP/IPv6 packet. + :param dst_ip: Destination IP/IPv6 address of original IP/IPv6 packet. + :param dir_srcsid: Source SID for SR in desired direction. + :param dir_dstsid1: Destination SID1 in desired direction. + :param dir_dstsid2: Destination SID2 in desired direction. + :param dir_dstsid3: Destination SID3 in desired direction. + :param segleft_corr: Correction for expected segleft value of SRH. + :type pkt_recv: scapy.Ether + :type src_mac: str + :type dst_mac: str + :type src_ip: str + :type dst_ip: str + :type dir_srcsid: str + :type dir_dstsid1: str + :type dir_dstsid2: str + :type dir_dstsid3: str + :type segleft_corr: int + :type static_proxy; bool + :raises RuntimeError: If received packet is invalid. + """ + if pkt_recv[Ether].src != src_mac: + raise RuntimeError( + f"Received frame has invalid source MAC address: " + f"{pkt_recv[Ether].src} should be: {src_mac}" + ) + if pkt_recv[Ether].dst != dst_mac: + raise RuntimeError( + f"Received frame has invalid destination MAC address: " + f"{pkt_recv[Ether].dst} should be: {dst_mac}" + ) + if not pkt_recv.haslayer(IPv6): + raise RuntimeError( + f"Not an IPv6 packet received: {pkt_recv!r}" + ) + ip6_pkt = pkt_recv[IPv6] + if ip6_pkt.src != dir_srcsid: + raise RuntimeError( + f"Outer IPv6 packet has invalid source address: " + f"{ip6_pkt.src} should be: {dir_srcsid}" + ) + dir_dstsids = [dir_dstsid2, dir_dstsid1] if dir_dstsid3 == u"None" \ + else [dir_dstsid3, dir_dstsid2, dir_dstsid1] if not static_proxy \ + else [dir_dstsid3, dir_dstsid2] + segleft = len(dir_dstsids) - segleft_corr if not static_proxy \ + else len(dir_dstsids) - segleft_corr + 1 + if ip6_pkt.dst != dir_dstsids[segleft]: + raise RuntimeError( + f"Outer IPv6 packet has invalid destination address: " + f"{ip6_pkt.dst} should be: {dir_dstsids[segleft]}" + ) + if dir_dstsid2 == u"None": + if ip6_pkt.haslayer(IPv6ExtHdrSegmentRouting): + raise RuntimeError( + f"Segment Routing Header in received packet: {pkt_recv!r}" + ) + if ip6_pkt.nh != 41: # ipv6nh[41] == IPv6 + raise RuntimeError( + f"Outer IPv6 packet has invalid next header: " + f"{ip6_pkt.nh} should be: 41 ({ipv6nh[41]})" + ) + ip6_pkt = ip6_pkt[IPv6][1] + else: + if not pkt_recv.haslayer(IPv6ExtHdrSegmentRouting): + raise RuntimeError( + f"No Segment Routing Header in received packet: {pkt_recv!r}" + ) + if ip6_pkt.nh != 43: # ipv6nh[43] == Routing Header + raise RuntimeError( + f"Outer IPv6 packet has invalid next header: " + f"{pkt_recv[IPv6][0].nh} should be: 43 ({ipv6nh[43]})" + ) + ip6_pkt = ip6_pkt[IPv6ExtHdrSegmentRouting] + if ip6_pkt.addresses != dir_dstsids: + raise RuntimeError( + f"Segment Routing Header has invalid addresses: " + f"{ip6_pkt.addresses} should be: {dir_dstsids}" + ) + if ip6_pkt.segleft != segleft: + raise RuntimeError( + f"Segment Routing Header has invalid segleft value: " + f"{ip6_pkt.segleft} should be: {segleft}" + ) + if ip6_pkt.nh != 41: # ipv6nh[41] == IPv6 + raise RuntimeError( + f"Segment Routing Header has invalid next header: " + f"{ip6_pkt.nh} should be: 41 ({ipv6nh[41]})" + ) + ip6_pkt = ip6_pkt[IPv6] + if ip6_pkt.src != src_ip: + raise RuntimeError( + f"Inner IPv6 packet has invalid source address: " + f"{ip6_pkt.src} should be: {src_ip}" + ) + if ip6_pkt.dst != dst_ip: + raise RuntimeError( + f"Inner IPv6 packet has invalid destination address: " + f"{ip6_pkt.dst} should be: {dst_ip}" + ) + if ip6_pkt.nh != 59: # ipv6nh[59] == No Next Header + raise RuntimeError( + f"Inner IPv6 packet has invalid next header: " + f"{ip6_pkt.nh} should be: 59 ({ipv6nh[59]})" + ) + + +def check_ip(pkt_recv, src_mac, dst_mac, src_ip, dst_ip): + """Check received IPv6 packet. + + :param pkt_recv: Received packet to verify. + :param src_mac: Source MAC address. + :param dst_mac: Destination MAC address. + :param src_ip: Source IP/IPv6 address. + :param dst_ip: Destination IP/IPv6 address. + :type pkt_recv: scapy.Ether + :type src_mac: str + :type dst_mac: str + :type src_ip: str + :type dst_ip: str + :raises RuntimeError: If received packet is invalid. + """ + if pkt_recv[Ether].src != src_mac: + raise RuntimeError( + f"Received frame has invalid source MAC address: " + f"{pkt_recv[Ether].src} should be: {src_mac}" + ) + + if pkt_recv[Ether].dst != dst_mac: + raise RuntimeError( + f"Received frame has invalid destination MAC address: " + f"{pkt_recv[Ether].dst} should be: {dst_mac}" + ) + + if not pkt_recv.haslayer(IPv6): + raise RuntimeError( + f"Not an {IPv6.name} packet received: {pkt_recv!r}" + ) + + if pkt_recv[IPv6].dst != dst_ip: + raise RuntimeError( + f"Received packet has invalid destination address: " + f"{pkt_recv[IPv6.name].dst} should be: {dst_ip}" + ) + + if pkt_recv[IPv6].src != src_ip: + raise RuntimeError( + f"Received packet has invalid destination address: " + f"{pkt_recv[IPv6.name].dst} should be: {src_ip}" + ) + + if pkt_recv[IPv6].nh != 59: # ipv6nh[59] == No Next Header + raise RuntimeError( + f"Received IPv6 packet has invalid next header: " + f"{IPv6.nh} should be: 59 ({ipv6nh[59]})" + ) + + +def main(): + """Send, receive and check IPv6 and IPv6ExtHdrSegmentRouting packets.""" + + args = TrafficScriptArg( + [ + u"tx_src_mac", u"tx_dst_mac", u"rx_src_mac", u"rx_dst_mac", + u"src_ip", u"dst_ip", u"dir0_srcsid", u"dir0_dstsid1", + u"dir0_dstsid2", u"dir1_srcsid", u"dir1_dstsid1", u"dir1_dstsid2", + u"decap", u"dir0_dstsid3", u"dir1_dstsid3", u"static_proxy" + ] + ) + + tx_txq = TxQueue(args.get_arg(u"tx_if")) + tx_rxq = RxQueue(args.get_arg(u"tx_if")) + rx_txq = TxQueue(args.get_arg(u"rx_if")) + rx_rxq = RxQueue(args.get_arg(u"rx_if")) + + tx_src_mac = args.get_arg(u"tx_src_mac") + tx_dst_mac = args.get_arg(u"tx_dst_mac") + rx_src_mac = args.get_arg(u"rx_src_mac") + rx_dst_mac = args.get_arg(u"rx_dst_mac") + src_ip = args.get_arg(u"src_ip") + dst_ip = args.get_arg(u"dst_ip") + + dir0_srcsid = args.get_arg(u"dir0_srcsid") + dir0_dstsid1 = args.get_arg(u"dir0_dstsid1") + dir0_dstsid2 = args.get_arg(u"dir0_dstsid2") + dir1_srcsid = args.get_arg(u"dir1_srcsid") + dir1_dstsid1 = args.get_arg(u"dir1_dstsid1") + dir1_dstsid2 = args.get_arg(u"dir1_dstsid2") + decap = args.get_arg(u"decap") + dir0_dstsid3 = args.get_arg(u"dir0_dstsid3") + dir1_dstsid3 = args.get_arg(u"dir1_dstsid3") + static_proxy = args.get_arg(u"static_proxy") + + ip_pkt = IPv6(src=src_ip, dst=dst_ip) + + sent_packets = list() + tx_pkt_send = (Ether(src=tx_src_mac, dst=tx_dst_mac) / ip_pkt) + tx_pkt_send /= Raw() + size_limit = 78 + if len(tx_pkt_send) < size_limit: + tx_pkt_send[Raw].load += (b"\0" * (size_limit - len(tx_pkt_send))) + sent_packets.append(tx_pkt_send) + tx_txq.send(tx_pkt_send) + + while True: + rx_pkt_recv = rx_rxq.recv(2) + + if rx_pkt_recv is None: + raise RuntimeError(f"{IPv6.name} packet Rx timeout") + + if rx_pkt_recv.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break + + check_srv6( + rx_pkt_recv, rx_src_mac, rx_dst_mac, src_ip, dst_ip, dir0_srcsid, + dir0_dstsid1, dir0_dstsid2, dir0_dstsid3, 1 + ) + + ip_pkt = IPv6(src=dst_ip, dst=src_ip) + ip_pkt /= Raw() + if len(ip_pkt) < (size_limit - 14): + ip_pkt[Raw].load += (b"\0" * (size_limit - 14 - len(ip_pkt))) + + rx_pkt_send = ( + Ether(src=rx_dst_mac, dst=rx_src_mac) / + IPv6(src=dir1_srcsid, dst=dir1_dstsid1) / + IPv6ExtHdrSegmentRouting( + segleft=1 if dir1_dstsid3 == u"None" else 2, + lastentry=1 if dir1_dstsid3 == u"None" else 2, + addresses=[dir1_dstsid2, dir1_dstsid1] + if dir1_dstsid3 == u"None" + else [dir1_dstsid3, dir1_dstsid2, dir1_dstsid1] + ) / + ip_pkt + ) if dir1_dstsid2 != u"None" else ( + Ether(src=rx_dst_mac, dst=rx_src_mac) / + IPv6(src=dir1_srcsid, dst=dir1_dstsid1) / + ip_pkt + ) + rx_txq.send(rx_pkt_send) + + while True: + tx_pkt_recv = tx_rxq.recv(2, ignore=sent_packets) + + if tx_pkt_recv is None: + raise RuntimeError(f"{IPv6.name} packet Rx timeout") + + if tx_pkt_recv.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break + + if decap == u"True": + check_ip(tx_pkt_recv, tx_dst_mac, tx_src_mac, dst_ip, src_ip) + else: + check_srv6( + tx_pkt_recv, tx_dst_mac, tx_src_mac, dst_ip, src_ip, dir1_srcsid, + dir1_dstsid1, dir1_dstsid2, dir1_dstsid3, 2, + bool(static_proxy == u"True") + ) + + sys.exit(0) + + +if __name__ == u"__main__": + main() diff --git a/GPL/traffic_scripts/vxlan.py b/GPL/traffic_scripts/vxlan.py new file mode 100644 index 0000000000..b39b419acc --- /dev/null +++ b/GPL/traffic_scripts/vxlan.py @@ -0,0 +1,32 @@ +# Copyright (c) 2020 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from scapy.fields import BitField, XByteField, X3BytesField +from scapy.layers.inet import UDP +from scapy.layers.l2 import Ether +from scapy.packet import Packet, bind_layers + + +class VXLAN(Packet): + name = u"VXLAN" + fields_desc = [ + BitField(u"flags", 0x08000000, 32), + X3BytesField(u"vni", 0), + XByteField(u"reserved", 0x00) + ] + + def mysummary(self): + return self.sprintf(f"VXLAN (vni={VXLAN.vni})") + +bind_layers(UDP, VXLAN, dport=4789) +bind_layers(VXLAN, Ether) -- cgit 1.2.3-korg