diff options
Diffstat (limited to 'resources/traffic_scripts/lisp')
-rwxr-xr-x | resources/traffic_scripts/lisp/lisp_check.py | 123 | ||||
-rwxr-xr-x | resources/traffic_scripts/lisp/lispgpe_check.py | 128 |
2 files changed, 139 insertions, 112 deletions
diff --git a/resources/traffic_scripts/lisp/lisp_check.py b/resources/traffic_scripts/lisp/lisp_check.py index 9937de8077..ebd769fa9d 100755 --- a/resources/traffic_scripts/lisp/lisp_check.py +++ b/resources/traffic_scripts/lisp/lisp_check.py @@ -1,5 +1,6 @@ -#!/usr/bin/env python -# Copyright (c) 2017 Cisco and/or its affiliates. +#!/usr/bin/env python3 + +# Copyright (c) 2019 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -19,12 +20,13 @@ a LISP-encapsulated packet on the other interface and verifies received packet. import sys import ipaddress +from scapy.all import bind_layers, Packet +from scapy.fields import FlagsField, BitField, IntField from scapy.layers.inet import ICMP, IP, UDP from scapy.layers.inet6 import ICMPv6EchoRequest from scapy.layers.inet6 import IPv6 from scapy.layers.l2 import Ether -from scapy.all import bind_layers, Packet -from scapy.fields import FlagsField, BitField, IntField +from scapy.packet import Raw from resources.libraries.python.PacketVerifier import RxQueue, TxQueue from resources.libraries.python.TrafficScriptArg import TrafficScriptArg @@ -32,26 +34,31 @@ from resources.libraries.python.TrafficScriptArg import TrafficScriptArg class LispHeader(Packet): """Scapy header for the LISP Layer.""" - name = "Lisp Header" + + name = u"Lisp Header" fields_desc = [ - FlagsField("flags", None, 8, ["N", "L", "E", "V", "I", "", "", ""]), - BitField("nonce/map_version", 0, size=24), - IntField("instance_id/locator_status_bits", 0)] + FlagsField( + u"flags", None, 8, [u"N", u"L", u"E", u"V", u"I", u"", u"", u""] + ), + BitField(u"nonce/map_version", 0, size=24), + IntField(u"instance_id/locator_status_bits", 0)] class LispInnerIP(IP): """Scapy inner LISP layer for IPv4-in-IPv4.""" - name = "Lisp Inner Layer - IPv4" + + name = u"Lisp Inner Layer - IPv4" class LispInnerIPv6(IPv6): """Scapy inner LISP layer for IPv6-in-IPv6.""" - name = "Lisp Inner Layer - IPv6" + + name = u"Lisp Inner Layer - IPv6" def valid_ipv4(ip): try: - ipaddress.IPv4Address(unicode(ip)) + ipaddress.IPv4Address(ip) return True except (AttributeError, ipaddress.AddressValueError): return False @@ -59,7 +66,7 @@ def valid_ipv4(ip): def valid_ipv6(ip): try: - ipaddress.IPv6Address(unicode(ip)) + ipaddress.IPv6Address(ip) return True except (AttributeError, ipaddress.AddressValueError): return False @@ -71,25 +78,28 @@ def main(): :raises RuntimeError: If the received packet is not correct.""" args = TrafficScriptArg( - ['tg_src_mac', 'tg_dst_mac', 'src_ip', 'dst_ip', 'dut_if1_mac', - 'dut_if2_mac', 'src_rloc', 'dst_rloc'], - ['ot_mode']) - - tx_src_mac = args.get_arg('tg_src_mac') - tx_dst_mac = args.get_arg('dut_if1_mac') - rx_dst_mac = args.get_arg('tg_dst_mac') - rx_src_mac = args.get_arg('dut_if2_mac') - src_ip = args.get_arg('src_ip') - dst_ip = args.get_arg('dst_ip') - src_rloc = args.get_arg("src_rloc") - dst_rloc = args.get_arg("dst_rloc") - tx_if = args.get_arg('tx_if') - rx_if = args.get_arg('rx_if') - ot_mode = args.get_arg('ot_mode') + [ + u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac", + u"dut_if2_mac", u"src_rloc", u"dst_rloc" + ], + [u"ot_mode"] + ) + + tx_src_mac = args.get_arg(u"tg_src_mac") + tx_dst_mac = args.get_arg(u"dut_if1_mac") + rx_dst_mac = args.get_arg(u"tg_dst_mac") + rx_src_mac = args.get_arg(u"dut_if2_mac") + src_ip = args.get_arg(u"src_ip") + dst_ip = args.get_arg(u"dst_ip") + src_rloc = args.get_arg(u"src_rloc") + dst_rloc = args.get_arg(u"dst_rloc") + tx_if = args.get_arg(u"tx_if") + rx_if = args.get_arg(u"rx_if") + ot_mode = args.get_arg(u"ot_mode") rxq = RxQueue(rx_if) txq = TxQueue(tx_if) - sent_packets = [] + pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac) if valid_ipv4(src_ip) and valid_ipv4(dst_ip): @@ -103,10 +113,13 @@ def main(): ip_format = IPv6 lisp_layer = LispInnerIPv6 else: - raise ValueError("IP not in correct format") + raise ValueError(u"IP not in correct format") bind_layers(UDP, LispHeader, dport=4341) bind_layers(LispHeader, lisp_layer) + + pkt_raw /= Raw() + sent_packets = list() sent_packets.append(pkt_raw) txq.send(pkt_raw) @@ -116,27 +129,23 @@ def main(): ether = rxq.recv(2) if ether is None: - raise RuntimeError("ICMP echo Rx timeout") + raise RuntimeError(u"ICMP echo Rx timeout") if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src: - print("MAC addresses match.") + print(u"MAC addresses match.") else: - raise RuntimeError( - "Matching packet unsuccessful: {0}".format(ether.__repr__())) + raise RuntimeError(f"Matching packet unsuccessful: {ether!r}") ip = ether.payload - if ot_mode == '6to4': + if ot_mode == u"6to4": if not isinstance(ip, IP): - raise RuntimeError( - "Not an IP packet received {0}".format(ip.__repr__())) - elif ot_mode == '4to6': - if not isinstance(ip, IP6): - raise RuntimeError( - "Not an IP packet received {0}".format(ip.__repr__())) + raise RuntimeError(f"Not an IP packet received {ip!r}") + elif ot_mode == u"4to6": + if not isinstance(ip, IPv6): + raise RuntimeError(f"Not an IP packet received {ip!r}") elif not isinstance(ip, ip_format): - raise RuntimeError( - "Not an IP packet received {0}".format(ip.__repr__())) + raise RuntimeError(f"Not an IP packet received {ip!r}") lisp = ether.getlayer(lisp_layer) if not lisp: @@ -144,31 +153,35 @@ def main(): # Compare data from packets if src_ip == lisp.src: - print("Source IP matches source EID.") + print(u"Source IP matches source EID.") else: - raise RuntimeError("Matching Src IP unsuccessful: {} != {}" - .format(src_ip, lisp.src)) + raise RuntimeError( + f"Matching Src IP unsuccessful: {src_ip} != {lisp.src}" + ) if dst_ip == lisp.dst: - print("Destination IP matches destination EID.") + print(u"Destination IP matches destination EID.") else: - raise RuntimeError("Matching Dst IP unsuccessful: {} != {}" - .format(dst_ip, lisp.dst)) + raise RuntimeError( + f"Matching Dst IP unsuccessful: {dst_ip} != {lisp.dst}" + ) if src_rloc == ip.src: - print("Source RLOC matches configuration.") + print(u"Source RLOC matches configuration.") else: - raise RuntimeError("Matching Src RLOC unsuccessful: {} != {}" - .format(src_rloc, ip.src)) + raise RuntimeError( + f"Matching Src RLOC unsuccessful: {src_rloc} != {ip.src}" + ) if dst_rloc == ip.dst: - print("Destination RLOC matches configuration.") + print(u"Destination RLOC matches configuration.") else: - raise RuntimeError("Matching dst RLOC unsuccessful: {} != {}" - .format(dst_rloc, ip.dst)) + raise RuntimeError( + f"Matching dst RLOC unsuccessful: {dst_rloc} != {ip.dst}" + ) sys.exit(0) -if __name__ == "__main__": +if __name__ == u"__main__": main() diff --git a/resources/traffic_scripts/lisp/lispgpe_check.py b/resources/traffic_scripts/lisp/lispgpe_check.py index d4de8635d7..50857c4fe8 100755 --- a/resources/traffic_scripts/lisp/lispgpe_check.py +++ b/resources/traffic_scripts/lisp/lispgpe_check.py @@ -1,5 +1,6 @@ -#!/usr/bin/env python -# Copyright (c) 2017 Cisco and/or its affiliates. +#!/usr/bin/env python3 + +# Copyright (c) 2019 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -20,12 +21,13 @@ packet. import sys import ipaddress +from scapy.all import bind_layers, Packet +from scapy.fields import FlagsField, BitField, XBitField, IntField from scapy.layers.inet import ICMP, IP, UDP from scapy.layers.inet6 import ICMPv6EchoRequest from scapy.layers.inet6 import IPv6 from scapy.layers.l2 import Ether -from scapy.all import bind_layers, Packet -from scapy.fields import FlagsField, BitField, XBitField, IntField +from scapy.packet import Raw from resources.libraries.python.PacketVerifier import RxQueue, TxQueue from resources.libraries.python.TrafficScriptArg import TrafficScriptArg @@ -33,13 +35,17 @@ from resources.libraries.python.TrafficScriptArg import TrafficScriptArg class LispGPEHeader(Packet): """Scapy header for the Lisp GPE Layer.""" + name = "Lisp GPE Header" fields_desc = [ - FlagsField("flags", None, 8, ["N", "L", "E", "V", "I", "P", "R", "O"]), - BitField("version", 0, size=2), - BitField("reserved", 0, size=14), - XBitField("next_protocol", 0, size=8), - IntField("instance_id/locator_status_bits", 0)] + FlagsField( + u"flags", None, 8, [u"N", u"L", u"E", u"V", u"I", u"P", u"R", u"O"] + ), + BitField(u"version", 0, size=2), + BitField(u"reserved", 0, size=14), + XBitField(u"next_protocol", 0, size=8), + IntField(u"instance_id/locator_status_bits", 0) + ] def guess_payload_class(self, payload): protocol = { @@ -53,17 +59,20 @@ class LispGPEHeader(Packet): class LispGPEInnerIP(IP): """Scapy inner LISP GPE layer for IPv4-in-IPv4.""" - name = "Lisp GPE Inner Layer - IPv4" + + name = u"Lisp GPE Inner Layer - IPv4" class LispGPEInnerIPv6(IPv6): """Scapy inner LISP GPE layer for IPv6-in-IPv6.""" - name = "Lisp GPE Inner Layer - IPv6" + + name = u"Lisp GPE Inner Layer - IPv6" class LispGPEInnerEther(Ether): """Scapy inner LISP GPE layer for Lisp-L2.""" - name = "Lisp GPE Inner Layer - Ethernet" + + name = u"Lisp GPE Inner Layer - Ethernet" class LispGPEInnerNSH(Packet): @@ -75,7 +84,7 @@ class LispGPEInnerNSH(Packet): def valid_ipv4(ip): try: - ipaddress.IPv4Address(unicode(ip)) + ipaddress.IPv4Address(ip) return True except (AttributeError, ipaddress.AddressValueError): return False @@ -83,7 +92,7 @@ def valid_ipv4(ip): def valid_ipv6(ip): try: - ipaddress.IPv6Address(unicode(ip)) + ipaddress.IPv6Address(ip) return True except (AttributeError, ipaddress.AddressValueError): return False @@ -95,25 +104,28 @@ def main(): :raises RuntimeError: If the received packet is not correct.""" args = TrafficScriptArg( - ['tg_src_mac', 'tg_dst_mac', 'src_ip', 'dst_ip', 'dut_if1_mac', - 'dut_if2_mac', 'src_rloc', 'dst_rloc'], - ['ot_mode']) - - tx_src_mac = args.get_arg('tg_src_mac') - tx_dst_mac = args.get_arg('dut_if1_mac') - rx_dst_mac = args.get_arg('tg_dst_mac') - rx_src_mac = args.get_arg('dut_if2_mac') - src_ip = args.get_arg('src_ip') - dst_ip = args.get_arg('dst_ip') - src_rloc = args.get_arg("src_rloc") - dst_rloc = args.get_arg("dst_rloc") - tx_if = args.get_arg('tx_if') - rx_if = args.get_arg('rx_if') - ot_mode = args.get_arg('ot_mode') + [ + u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac", + u"dut_if2_mac", u"src_rloc", u"dst_rloc" + ], + [u"ot_mode"] + ) + + tx_src_mac = args.get_arg(u"tg_src_mac") + tx_dst_mac = args.get_arg(u"dut_if1_mac") + rx_dst_mac = args.get_arg(u"tg_dst_mac") + rx_src_mac = args.get_arg(u"dut_if2_mac") + src_ip = args.get_arg(u"src_ip") + dst_ip = args.get_arg(u"dst_ip") + src_rloc = args.get_arg(u"src_rloc") + dst_rloc = args.get_arg(u"dst_rloc") + tx_if = args.get_arg(u"tx_if") + rx_if = args.get_arg(u"rx_if") + ot_mode = args.get_arg(u"ot_mode") rxq = RxQueue(rx_if) txq = TxQueue(tx_if) - sent_packets = [] + pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac) if valid_ipv4(src_ip) and valid_ipv4(dst_ip): @@ -125,10 +137,12 @@ def main(): pkt_raw /= ICMPv6EchoRequest() ip_format = IPv6 else: - raise ValueError("IP not in correct format") + raise ValueError(u"IP not in correct format") bind_layers(UDP, LispGPEHeader, dport=4341) + pkt_raw /= Raw() + sent_packets = list() sent_packets.append(pkt_raw) txq.send(pkt_raw) @@ -138,59 +152,59 @@ def main(): ether = rxq.recv(2) if ether is None: - raise RuntimeError("ICMP echo Rx timeout") + raise RuntimeError(u"ICMP echo Rx timeout") if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src: - print("MAC addresses match.") + print(u"MAC addresses match.") else: - raise RuntimeError( - "Matching packet unsuccessful: {0}".format(ether.__repr__())) + raise RuntimeError(f"Matching packet unsuccessful: {ether!r}") ip = ether.payload - if ot_mode == '6to4': + if ot_mode == u"6to4": if not isinstance(ip, IP): - raise RuntimeError( - "Not an IP packet received {0}".format(ip.__repr__())) - elif ot_mode == '4to6': + raise RuntimeError(f"Not an IP packet received {ip!r}") + elif ot_mode == u"4to6": if not isinstance(ip, IPv6): - raise RuntimeError( - "Not an IP packet received {0}".format(ip.__repr__())) + raise RuntimeError(f"Not an IP packet received {ip!r}") elif not isinstance(ip, ip_format): - raise RuntimeError( - "Not an IP packet received {0}".format(ip.__repr__())) + raise RuntimeError(f"Not an IP packet received {ip!r}") lisp = ether.getlayer(LispGPEHeader).underlayer if not lisp: - raise RuntimeError("Lisp layer not present or parsing failed.") + raise RuntimeError(u"Lisp layer not present or parsing failed.") # Compare data from packets if src_ip == lisp.src: - print("Source IP matches source EID.") + print(u"Source IP matches source EID.") else: - raise RuntimeError("Matching Src IP unsuccessful: {} != {}" - .format(src_ip, lisp.src)) + raise RuntimeError( + f"Matching Src IP unsuccessful: {src_ip} != {lisp.src}" + ) if dst_ip == lisp.dst: - print("Destination IP matches destination EID.") + print(u"Destination IP matches destination EID.") else: - raise RuntimeError("Matching Dst IP unsuccessful: {} != {}" - .format(dst_ip, lisp.dst)) + raise RuntimeError( + f"Matching Dst IP unsuccessful: {dst_ip} != {lisp.dst}" + ) if src_rloc == ip.src: - print("Source RLOC matches configuration.") + print(u"Source RLOC matches configuration.") else: - raise RuntimeError("Matching Src RLOC unsuccessful: {} != {}" - .format(src_rloc, ip.src)) + raise RuntimeError( + f"Matching Src RLOC unsuccessful: {src_rloc} != {ip.src}" + ) if dst_rloc == ip.dst: - print("Destination RLOC matches configuration.") + print(u"Destination RLOC matches configuration.") else: - raise RuntimeError("Matching dst RLOC unsuccessful: {} != {}" - .format(dst_rloc, ip.dst)) + raise RuntimeError( + f"Matching dst RLOC unsuccessful: {dst_rloc} != {ip.dst}" + ) sys.exit(0) -if __name__ == "__main__": +if __name__ == u"__main__": main() |