aboutsummaryrefslogtreecommitdiffstats
path: root/resources/traffic_scripts/send_ip_check_headers.py
diff options
context:
space:
mode:
Diffstat (limited to 'resources/traffic_scripts/send_ip_check_headers.py')
-rwxr-xr-xresources/traffic_scripts/send_ip_check_headers.py108
1 files changed, 60 insertions, 48 deletions
diff --git a/resources/traffic_scripts/send_ip_check_headers.py b/resources/traffic_scripts/send_ip_check_headers.py
index 7c4f2bd002..f5a55553cf 100755
--- a/resources/traffic_scripts/send_ip_check_headers.py
+++ b/resources/traffic_scripts/send_ip_check_headers.py
@@ -1,4 +1,5 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
+
# Copyright (c) 2019 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -20,10 +21,12 @@ MAC addresses are checked in received packet.
import sys
import ipaddress
+
from robot.api import logger
from scapy.layers.inet import IP
from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
from scapy.layers.l2 import Ether, Dot1Q
+from scapy.packet import Raw
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
@@ -31,7 +34,7 @@ from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
def valid_ipv4(ip):
try:
- ipaddress.IPv4Address(unicode(ip))
+ ipaddress.IPv4Address(ip)
return True
except (AttributeError, ipaddress.AddressValueError):
return False
@@ -39,7 +42,7 @@ def valid_ipv4(ip):
def valid_ipv6(ip):
try:
- ipaddress.IPv6Address(unicode(ip))
+ ipaddress.IPv6Address(ip)
return True
except (AttributeError, ipaddress.AddressValueError):
return False
@@ -48,38 +51,45 @@ def valid_ipv6(ip):
def main():
"""Send IP/IPv6 packet from one traffic generator interface to the other."""
args = TrafficScriptArg(
- ['tg_src_mac', 'tg_dst_mac', 'src_ip', 'dst_ip', 'dut_if1_mac',
- 'dut_if2_mac'],
- ['encaps_tx', 'vlan_tx', 'vlan_outer_tx',
- 'encaps_rx', 'vlan_rx', 'vlan_outer_rx'])
-
- tx_src_mac = args.get_arg('tg_src_mac')
- tx_dst_mac = args.get_arg('dut_if1_mac')
- rx_dst_mac = args.get_arg('tg_dst_mac')
- rx_src_mac = args.get_arg('dut_if2_mac')
- src_ip = args.get_arg('src_ip')
- dst_ip = args.get_arg('dst_ip')
- tx_if = args.get_arg('tx_if')
- rx_if = args.get_arg('rx_if')
-
- encaps_tx = args.get_arg('encaps_tx')
- vlan_tx = args.get_arg('vlan_tx')
- vlan_outer_tx = args.get_arg('vlan_outer_tx')
- encaps_rx = args.get_arg('encaps_rx')
- vlan_rx = args.get_arg('vlan_rx')
- vlan_outer_rx = args.get_arg('vlan_outer_rx')
+ [
+ u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac",
+ u"dut_if2_mac"
+ ],
+ [
+ u"encaps_tx", u"vlan_tx", u"vlan_outer_tx", u"encaps_rx",
+ u"vlan_rx", u"vlan_outer_rx"
+ ]
+ )
+
+ tx_src_mac = args.get_arg(u"tg_src_mac")
+ tx_dst_mac = args.get_arg(u"dut_if1_mac")
+ rx_dst_mac = args.get_arg(u"tg_dst_mac")
+ rx_src_mac = args.get_arg(u"dut_if2_mac")
+ src_ip = args.get_arg(u"src_ip")
+ dst_ip = args.get_arg(u"dst_ip")
+ tx_if = args.get_arg(u"tx_if")
+ rx_if = args.get_arg(u"rx_if")
+
+ encaps_tx = args.get_arg(u"encaps_tx")
+ vlan_tx = args.get_arg(u"vlan_tx")
+ vlan_outer_tx = args.get_arg(u"vlan_outer_tx")
+ encaps_rx = args.get_arg(u"encaps_rx")
+ vlan_rx = args.get_arg(u"vlan_rx")
+ vlan_outer_rx = args.get_arg(u"vlan_outer_rx")
rxq = RxQueue(rx_if)
txq = TxQueue(tx_if)
- sent_packets = []
- ip_format = ''
+
+ sent_packets =list()
pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
- if encaps_tx == 'Dot1q':
+
+ if encaps_tx == u"Dot1q":
pkt_raw /= Dot1Q(vlan=int(vlan_tx))
- elif encaps_tx == 'Dot1ad':
+ elif encaps_tx == u"Dot1ad":
pkt_raw.type = 0x88a8
pkt_raw /= Dot1Q(vlan=vlan_outer_tx)
pkt_raw /= Dot1Q(vlan=vlan_tx)
+
if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
pkt_raw /= IP(src=src_ip, dst=dst_ip, proto=61)
ip_format = IP
@@ -87,8 +97,9 @@ def main():
pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
ip_format = IPv6
else:
- raise ValueError("IP not in correct format")
+ raise ValueError(u"IP not in correct format")
+ pkt_raw /= Raw()
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
@@ -99,7 +110,7 @@ def main():
ether = rxq.recv(2)
if ether is None:
- raise RuntimeError('IP packet Rx timeout')
+ raise RuntimeError(u"IP packet Rx timeout")
if ether.haslayer(ICMPv6ND_NS):
# read another packet in the queue if the current one is ICMPv6ND_NS
@@ -109,44 +120,45 @@ def main():
break
if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
- logger.trace("MAC matched")
+ logger.trace(u"MAC matched")
else:
- raise RuntimeError("Matching packet unsuccessful: {0}".
- format(ether.__repr__()))
+ raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
- if encaps_rx == 'Dot1q':
+ if encaps_rx == u"Dot1q":
if ether[Dot1Q].vlan == int(vlan_rx):
- logger.trace("VLAN matched")
+ logger.trace(u"VLAN matched")
else:
- raise RuntimeError('Ethernet frame with wrong VLAN tag ({}-'
- 'received, {}-expected):\n{}'.
- format(ether[Dot1Q].vlan, vlan_rx,
- ether.__repr__()))
+ raise RuntimeError(
+ f"Ethernet frame with wrong VLAN tag "
+ f"({ether[Dot1Q].vlan}-received, "
+ f"{vlan_rx}-expected):\n{ether!r}"
+ )
ip = ether[Dot1Q].payload
- elif encaps_rx == 'Dot1ad':
+ elif encaps_rx == u"Dot1ad":
raise NotImplementedError()
else:
ip = ether.payload
if not isinstance(ip, ip_format):
- raise RuntimeError("Not an IP packet received {0}".
- format(ip.__repr__()))
+ raise RuntimeError(f"Not an IP packet received {ip!r}")
# Compare data from packets
if src_ip == ip.src:
- logger.trace("Src IP matched")
+ logger.trace(u"Src IP matched")
else:
- raise RuntimeError("Matching Src IP unsuccessful: {} != {}".
- format(src_ip, ip.src))
+ raise RuntimeError(
+ f"Matching Src IP unsuccessful: {src_ip} != {ip.src}"
+ )
if dst_ip == ip.dst:
- logger.trace("Dst IP matched")
+ logger.trace(u"Dst IP matched")
else:
- raise RuntimeError("Matching Dst IP unsuccessful: {} != {}".
- format(dst_ip, ip.dst))
+ raise RuntimeError(
+ f"Matching Dst IP unsuccessful: {dst_ip} != {ip.dst}"
+ )
sys.exit(0)
-if __name__ == "__main__":
+if __name__ == u"__main__":
main()