aboutsummaryrefslogtreecommitdiffstats
path: root/resources/traffic_scripts/lisp/lispgpe_check.py
diff options
context:
space:
mode:
Diffstat (limited to 'resources/traffic_scripts/lisp/lispgpe_check.py')
-rwxr-xr-xresources/traffic_scripts/lisp/lispgpe_check.py128
1 files changed, 71 insertions, 57 deletions
diff --git a/resources/traffic_scripts/lisp/lispgpe_check.py b/resources/traffic_scripts/lisp/lispgpe_check.py
index d4de8635d7..50857c4fe8 100755
--- a/resources/traffic_scripts/lisp/lispgpe_check.py
+++ b/resources/traffic_scripts/lisp/lispgpe_check.py
@@ -1,5 +1,6 @@
-#!/usr/bin/env python
-# Copyright (c) 2017 Cisco and/or its affiliates.
+#!/usr/bin/env python3
+
+# Copyright (c) 2019 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
@@ -20,12 +21,13 @@ packet.
import sys
import ipaddress
+from scapy.all import bind_layers, Packet
+from scapy.fields import FlagsField, BitField, XBitField, IntField
from scapy.layers.inet import ICMP, IP, UDP
from scapy.layers.inet6 import ICMPv6EchoRequest
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether
-from scapy.all import bind_layers, Packet
-from scapy.fields import FlagsField, BitField, XBitField, IntField
+from scapy.packet import Raw
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
@@ -33,13 +35,17 @@ from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
class LispGPEHeader(Packet):
"""Scapy header for the Lisp GPE Layer."""
+
name = "Lisp GPE Header"
fields_desc = [
- FlagsField("flags", None, 8, ["N", "L", "E", "V", "I", "P", "R", "O"]),
- BitField("version", 0, size=2),
- BitField("reserved", 0, size=14),
- XBitField("next_protocol", 0, size=8),
- IntField("instance_id/locator_status_bits", 0)]
+ FlagsField(
+ u"flags", None, 8, [u"N", u"L", u"E", u"V", u"I", u"P", u"R", u"O"]
+ ),
+ BitField(u"version", 0, size=2),
+ BitField(u"reserved", 0, size=14),
+ XBitField(u"next_protocol", 0, size=8),
+ IntField(u"instance_id/locator_status_bits", 0)
+ ]
def guess_payload_class(self, payload):
protocol = {
@@ -53,17 +59,20 @@ class LispGPEHeader(Packet):
class LispGPEInnerIP(IP):
"""Scapy inner LISP GPE layer for IPv4-in-IPv4."""
- name = "Lisp GPE Inner Layer - IPv4"
+
+ name = u"Lisp GPE Inner Layer - IPv4"
class LispGPEInnerIPv6(IPv6):
"""Scapy inner LISP GPE layer for IPv6-in-IPv6."""
- name = "Lisp GPE Inner Layer - IPv6"
+
+ name = u"Lisp GPE Inner Layer - IPv6"
class LispGPEInnerEther(Ether):
"""Scapy inner LISP GPE layer for Lisp-L2."""
- name = "Lisp GPE Inner Layer - Ethernet"
+
+ name = u"Lisp GPE Inner Layer - Ethernet"
class LispGPEInnerNSH(Packet):
@@ -75,7 +84,7 @@ class LispGPEInnerNSH(Packet):
def valid_ipv4(ip):
try:
- ipaddress.IPv4Address(unicode(ip))
+ ipaddress.IPv4Address(ip)
return True
except (AttributeError, ipaddress.AddressValueError):
return False
@@ -83,7 +92,7 @@ def valid_ipv4(ip):
def valid_ipv6(ip):
try:
- ipaddress.IPv6Address(unicode(ip))
+ ipaddress.IPv6Address(ip)
return True
except (AttributeError, ipaddress.AddressValueError):
return False
@@ -95,25 +104,28 @@ def main():
:raises RuntimeError: If the received packet is not correct."""
args = TrafficScriptArg(
- ['tg_src_mac', 'tg_dst_mac', 'src_ip', 'dst_ip', 'dut_if1_mac',
- 'dut_if2_mac', 'src_rloc', 'dst_rloc'],
- ['ot_mode'])
-
- tx_src_mac = args.get_arg('tg_src_mac')
- tx_dst_mac = args.get_arg('dut_if1_mac')
- rx_dst_mac = args.get_arg('tg_dst_mac')
- rx_src_mac = args.get_arg('dut_if2_mac')
- src_ip = args.get_arg('src_ip')
- dst_ip = args.get_arg('dst_ip')
- src_rloc = args.get_arg("src_rloc")
- dst_rloc = args.get_arg("dst_rloc")
- tx_if = args.get_arg('tx_if')
- rx_if = args.get_arg('rx_if')
- ot_mode = args.get_arg('ot_mode')
+ [
+ u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac",
+ u"dut_if2_mac", u"src_rloc", u"dst_rloc"
+ ],
+ [u"ot_mode"]
+ )
+
+ tx_src_mac = args.get_arg(u"tg_src_mac")
+ tx_dst_mac = args.get_arg(u"dut_if1_mac")
+ rx_dst_mac = args.get_arg(u"tg_dst_mac")
+ rx_src_mac = args.get_arg(u"dut_if2_mac")
+ src_ip = args.get_arg(u"src_ip")
+ dst_ip = args.get_arg(u"dst_ip")
+ src_rloc = args.get_arg(u"src_rloc")
+ dst_rloc = args.get_arg(u"dst_rloc")
+ tx_if = args.get_arg(u"tx_if")
+ rx_if = args.get_arg(u"rx_if")
+ ot_mode = args.get_arg(u"ot_mode")
rxq = RxQueue(rx_if)
txq = TxQueue(tx_if)
- sent_packets = []
+
pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
@@ -125,10 +137,12 @@ def main():
pkt_raw /= ICMPv6EchoRequest()
ip_format = IPv6
else:
- raise ValueError("IP not in correct format")
+ raise ValueError(u"IP not in correct format")
bind_layers(UDP, LispGPEHeader, dport=4341)
+ pkt_raw /= Raw()
+ sent_packets = list()
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
@@ -138,59 +152,59 @@ def main():
ether = rxq.recv(2)
if ether is None:
- raise RuntimeError("ICMP echo Rx timeout")
+ raise RuntimeError(u"ICMP echo Rx timeout")
if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
- print("MAC addresses match.")
+ print(u"MAC addresses match.")
else:
- raise RuntimeError(
- "Matching packet unsuccessful: {0}".format(ether.__repr__()))
+ raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
ip = ether.payload
- if ot_mode == '6to4':
+ if ot_mode == u"6to4":
if not isinstance(ip, IP):
- raise RuntimeError(
- "Not an IP packet received {0}".format(ip.__repr__()))
- elif ot_mode == '4to6':
+ raise RuntimeError(f"Not an IP packet received {ip!r}")
+ elif ot_mode == u"4to6":
if not isinstance(ip, IPv6):
- raise RuntimeError(
- "Not an IP packet received {0}".format(ip.__repr__()))
+ raise RuntimeError(f"Not an IP packet received {ip!r}")
elif not isinstance(ip, ip_format):
- raise RuntimeError(
- "Not an IP packet received {0}".format(ip.__repr__()))
+ raise RuntimeError(f"Not an IP packet received {ip!r}")
lisp = ether.getlayer(LispGPEHeader).underlayer
if not lisp:
- raise RuntimeError("Lisp layer not present or parsing failed.")
+ raise RuntimeError(u"Lisp layer not present or parsing failed.")
# Compare data from packets
if src_ip == lisp.src:
- print("Source IP matches source EID.")
+ print(u"Source IP matches source EID.")
else:
- raise RuntimeError("Matching Src IP unsuccessful: {} != {}"
- .format(src_ip, lisp.src))
+ raise RuntimeError(
+ f"Matching Src IP unsuccessful: {src_ip} != {lisp.src}"
+ )
if dst_ip == lisp.dst:
- print("Destination IP matches destination EID.")
+ print(u"Destination IP matches destination EID.")
else:
- raise RuntimeError("Matching Dst IP unsuccessful: {} != {}"
- .format(dst_ip, lisp.dst))
+ raise RuntimeError(
+ f"Matching Dst IP unsuccessful: {dst_ip} != {lisp.dst}"
+ )
if src_rloc == ip.src:
- print("Source RLOC matches configuration.")
+ print(u"Source RLOC matches configuration.")
else:
- raise RuntimeError("Matching Src RLOC unsuccessful: {} != {}"
- .format(src_rloc, ip.src))
+ raise RuntimeError(
+ f"Matching Src RLOC unsuccessful: {src_rloc} != {ip.src}"
+ )
if dst_rloc == ip.dst:
- print("Destination RLOC matches configuration.")
+ print(u"Destination RLOC matches configuration.")
else:
- raise RuntimeError("Matching dst RLOC unsuccessful: {} != {}"
- .format(dst_rloc, ip.dst))
+ raise RuntimeError(
+ f"Matching dst RLOC unsuccessful: {dst_rloc} != {ip.dst}"
+ )
sys.exit(0)
-if __name__ == "__main__":
+if __name__ == u"__main__":
main()