aboutsummaryrefslogtreecommitdiffstats
path: root/resources
diff options
context:
space:
mode:
Diffstat (limited to 'resources')
-rwxr-xr-xresources/traffic_scripts/check_ra_packet.py46
-rwxr-xr-xresources/traffic_scripts/icmpv6_echo.py64
-rwxr-xr-xresources/traffic_scripts/icmpv6_echo_req_resp.py119
-rwxr-xr-xresources/traffic_scripts/ipfix_check.py15
-rwxr-xr-xresources/traffic_scripts/ipfix_sessions.py7
-rwxr-xr-xresources/traffic_scripts/ipsec.py51
-rwxr-xr-xresources/traffic_scripts/ipv6_nd_proxy_check.py89
-rwxr-xr-xresources/traffic_scripts/ipv6_ns.py67
-rwxr-xr-xresources/traffic_scripts/ipv6_sweep_ping.py57
-rwxr-xr-xresources/traffic_scripts/policer.py58
-rwxr-xr-xresources/traffic_scripts/send_icmp_check_headers.py45
-rwxr-xr-xresources/traffic_scripts/send_icmp_check_multipath.py33
-rwxr-xr-xresources/traffic_scripts/send_icmp_type_code.py35
-rwxr-xr-xresources/traffic_scripts/send_icmp_wait_for_reply.py34
-rwxr-xr-xresources/traffic_scripts/send_ip_icmp.py57
-rwxr-xr-xresources/traffic_scripts/send_rs_check_ra.py52
-rwxr-xr-xresources/traffic_scripts/send_tcp_for_classifier_test.py37
-rwxr-xr-xresources/traffic_scripts/send_tcp_udp.py34
-rwxr-xr-xresources/traffic_scripts/send_vxlan_for_proxy_test.py42
-rwxr-xr-xresources/traffic_scripts/send_vxlangpe_nsh_for_proxy_test.py43
-rwxr-xr-xresources/traffic_scripts/send_vxlangpe_nsh_for_sff_test.py43
-rwxr-xr-xresources/traffic_scripts/span_check.py27
22 files changed, 631 insertions, 424 deletions
diff --git a/resources/traffic_scripts/check_ra_packet.py b/resources/traffic_scripts/check_ra_packet.py
index 231e07da11..9717c7db95 100755
--- a/resources/traffic_scripts/check_ra_packet.py
+++ b/resources/traffic_scripts/check_ra_packet.py
@@ -17,6 +17,8 @@
import sys
import ipaddress
+from scapy.layers.inet6 import IPv6, ICMPv6ND_RA, ICMPv6ND_NS
+
from resources.libraries.python.PacketVerifier import RxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
@@ -56,15 +58,23 @@ def main():
interval = int(args.get_arg('interval'))
rxq = RxQueue(rx_if)
- ether = rxq.recv(max(5, interval))
+ # receive ICMPv6ND_RA packet
+ while True:
+ ether = rxq.recv(max(5, interval))
+ if ether is None:
+ raise RuntimeError('ICMP echo Rx timeout')
- # Check whether received packet contains layer RA and check other values
- if ether is None:
- raise RuntimeError('ICMP echo Rx timeout')
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
- if not ether.haslayer('ICMPv6ND_RA'):
- raise RuntimeError('Not an RA packet received {0}'
- .format(ether.__repr__()))
+ # Check if received packet contains layer RA and check other values
+ if not ether.haslayer(ICMPv6ND_RA):
+ raise RuntimeError('Not an RA packet received {0}'.
+ format(ether.__repr__()))
src_address = ipaddress.IPv6Address(unicode(ether['IPv6'].src))
dst_address = ipaddress.IPv6Address(unicode(ether['IPv6'].dst))
@@ -72,22 +82,22 @@ def main():
all_nodes_multicast = ipaddress.IPv6Address(u'ff02::1')
if src_address != link_local:
- raise RuntimeError(
- 'Source address ({0}) not matching link local address({1})'.format(
- src_address, link_local))
+ raise RuntimeError('Source address ({0}) not matching link local '
+ 'address ({1})'.format(src_address, link_local))
if dst_address != all_nodes_multicast:
- raise RuntimeError('Packet destination address ({0}) is not the all '
- 'nodes multicast address ({1}).'.format(
- dst_address, all_nodes_multicast))
- if ether['IPv6'].hlim != 255:
- raise RuntimeError('Hop limit not correct: {0}!=255'.format(
- ether['IPv6'].hlim))
-
- ra_code = ether['ICMPv6 Neighbor Discovery - Router Advertisement'].code
+ raise RuntimeError('Packet destination address ({0}) is not the all'
+ ' nodes multicast address ({1}).'.
+ format(dst_address, all_nodes_multicast))
+ if ether[IPv6].hlim != 255:
+ raise RuntimeError('Hop limit not correct: {0}!=255'.
+ format(ether[IPv6].hlim))
+
+ ra_code = ether[ICMPv6ND_RA].code
if ra_code != 0:
raise RuntimeError('ICMP code: {0} not correct. '.format(ra_code))
sys.exit(0)
+
if __name__ == "__main__":
main()
diff --git a/resources/traffic_scripts/icmpv6_echo.py b/resources/traffic_scripts/icmpv6_echo.py
index 4bf573a1f1..a18896b07f 100755
--- a/resources/traffic_scripts/icmpv6_echo.py
+++ b/resources/traffic_scripts/icmpv6_echo.py
@@ -17,13 +17,19 @@
import sys
import logging
+
+# pylint: disable=no-name-in-module
+# pylint: disable=import-error
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue,\
- checksum_equal
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr
-from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
+
from scapy.all import Ether
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS
+from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr
+from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
+
+from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
+from resources.libraries.python.PacketVerifier import checksum_equal
+from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
def main():
@@ -43,52 +49,60 @@ def main():
# send ICMPv6 neighbor advertisement message
pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') /
- IPv6(src=src_ip, dst='ff02::1:ff00:2') /
- ICMPv6ND_NA(tgt=src_ip, R=0) /
- ICMPv6NDOptDstLLAddr(lladdr=src_mac))
+ IPv6(src=src_ip, dst='ff02::1:ff00:2') /
+ ICMPv6ND_NA(tgt=src_ip, R=0) /
+ ICMPv6NDOptDstLLAddr(lladdr=src_mac))
sent_packets.append(pkt_send)
txq.send(pkt_send)
# send ICMPv6 echo request
pkt_send = (Ether(src=src_mac, dst=dst_mac) /
- IPv6(src=src_ip, dst=dst_ip) /
- ICMPv6EchoRequest(id=echo_id, seq=echo_seq))
+ IPv6(src=src_ip, dst=dst_ip) /
+ ICMPv6EchoRequest(id=echo_id, seq=echo_seq))
sent_packets.append(pkt_send)
txq.send(pkt_send)
# receive ICMPv6 echo reply
- ether = rxq.recv(2, sent_packets)
- if ether is None:
- raise RuntimeError('ICMPv6 echo reply Rx timeout')
+ while True:
+ ether = rxq.recv(2, sent_packets)
+ if ether is None:
+ raise RuntimeError('ICMPv6 echo reply Rx timeout')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
if not ether.haslayer(IPv6):
- raise RuntimeError('Unexpected packet with no IPv6 received {0}'.format(
- ether.__repr__()))
+ raise RuntimeError('Unexpected packet with no IPv6 received {0}'.
+ format(ether.__repr__()))
- ipv6 = ether['IPv6']
+ ipv6 = ether[IPv6]
if not ipv6.haslayer(ICMPv6EchoReply):
- raise RuntimeError(
- 'Unexpected packet with no IPv6 ICMP received {0}'.format(
- ipv6.__repr__()))
+ raise RuntimeError('Unexpected packet with no ICMPv6 echo reply '
+ 'received {0}'.format(ipv6.__repr__()))
- icmpv6 = ipv6['ICMPv6 Echo Reply']
+ icmpv6 = ipv6[ICMPv6EchoReply]
# check identifier and sequence number
if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
- raise RuntimeError(
- 'Invalid ICMPv6 echo reply received ID {0} seq {1} should be ' \
- 'ID {2} seq {3}'.format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
+ raise RuntimeError('Invalid ICMPv6 echo reply received ID {0} seq {1} '
+ 'should be ID {2} seq {3}'.
+ format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
# verify checksum
cksum = icmpv6.cksum
del icmpv6.cksum
tmp = ICMPv6EchoReply(str(icmpv6))
if not checksum_equal(tmp.cksum, cksum):
- raise RuntimeError(
- 'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum))
+ raise RuntimeError('Invalid checksum {0} should be {1}'.
+ format(cksum, tmp.cksum))
sys.exit(0)
+
if __name__ == "__main__":
main()
diff --git a/resources/traffic_scripts/icmpv6_echo_req_resp.py b/resources/traffic_scripts/icmpv6_echo_req_resp.py
index c2cc4d20a0..ec9cf94a67 100755
--- a/resources/traffic_scripts/icmpv6_echo_req_resp.py
+++ b/resources/traffic_scripts/icmpv6_echo_req_resp.py
@@ -18,14 +18,20 @@
import sys
import logging
+
+# pylint: disable=no-name-in-module
+# pylint: disable=import-error
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue,\
- checksum_equal
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr
+
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS
+from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr
from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
from scapy.all import Ether
+from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
+from resources.libraries.python.PacketVerifier import checksum_equal
+from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
+
def main():
args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_nh_mac', 'dst_nh_mac',
@@ -52,109 +58,122 @@ def main():
# send ICMPv6 neighbor advertisement message
pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') /
- IPv6(src=src_ip, dst='ff02::1:ff00:2') /
- ICMPv6ND_NA(tgt=src_ip, R=0) /
- ICMPv6NDOptDstLLAddr(lladdr=src_mac))
+ IPv6(src=src_ip, dst='ff02::1:ff00:2') /
+ ICMPv6ND_NA(tgt=src_ip, R=0) /
+ ICMPv6NDOptDstLLAddr(lladdr=src_mac))
src_sent_packets.append(pkt_send)
src_txq.send(pkt_send)
pkt_send = (Ether(src=dst_mac, dst='ff:ff:ff:ff:ff:ff') /
- IPv6(src=dst_ip, dst='ff02::1:ff00:2') /
- ICMPv6ND_NA(tgt=dst_ip, R=0) /
- ICMPv6NDOptDstLLAddr(lladdr=dst_mac))
+ IPv6(src=dst_ip, dst='ff02::1:ff00:2') /
+ ICMPv6ND_NA(tgt=dst_ip, R=0) /
+ ICMPv6NDOptDstLLAddr(lladdr=dst_mac))
dst_sent_packets.append(pkt_send)
dst_txq.send(pkt_send)
# send ICMPv6 echo request from first TG interface
pkt_send = (Ether(src=src_mac, dst=src_nh_mac) /
- IPv6(src=src_ip, dst=dst_ip, hlim=hop_limit) /
- ICMPv6EchoRequest(id=echo_id, seq=echo_seq))
+ IPv6(src=src_ip, dst=dst_ip, hlim=hop_limit) /
+ ICMPv6EchoRequest(id=echo_id, seq=echo_seq))
src_sent_packets.append(pkt_send)
src_txq.send(pkt_send)
# receive ICMPv6 echo request on second TG interface
- ether = dst_rxq.recv(2, dst_sent_packets)
- if ether is None:
- raise RuntimeError('ICMPv6 echo reply Rx timeout')
+ while True:
+ ether = dst_rxq.recv(2, dst_sent_packets)
+ if ether is None:
+ raise RuntimeError('ICMPv6 echo reply Rx timeout')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
if not ether.haslayer(IPv6):
- raise RuntimeError('Unexpected packet with no IPv6 received {0}'.format(
- ether.__repr__()))
+ raise RuntimeError('Unexpected packet with no IPv6 received: {0}'.
+ format(ether.__repr__()))
- ipv6 = ether['IPv6']
+ ipv6 = ether[IPv6]
# verify hop limit processing
if ipv6.hlim != (hop_limit - hop_num):
- raise RuntimeError(
- 'Invalid hop limit {0} should be {1}'.format(ipv6.hlim,
- hop_limit - hop_num))
+ raise RuntimeError('Invalid hop limit {0} should be {1}'.
+ format(ipv6.hlim,hop_limit - hop_num))
if not ipv6.haslayer(ICMPv6EchoRequest):
- raise RuntimeError(
- 'Unexpected packet with no IPv6 ICMP received {0}'.format(
- ipv6.__repr__()))
+ raise RuntimeError('Unexpected packet with no IPv6 ICMP received {0}'.
+ format(ipv6.__repr__()))
- icmpv6 = ipv6['ICMPv6 Echo Request']
+ icmpv6 = ipv6[ICMPv6EchoRequest]
# check identifier and sequence number
if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
- raise RuntimeError(
- 'Invalid ICMPv6 echo reply received ID {0} seq {1} should be ' \
- 'ID {2} seq {3}'.format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
+ raise RuntimeError('Invalid ICMPv6 echo reply received ID {0} '
+ 'seq {1} should be ID {2} seq {3}'.
+ format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
# verify checksum
cksum = icmpv6.cksum
del icmpv6.cksum
tmp = ICMPv6EchoRequest(str(icmpv6))
if not checksum_equal(tmp.cksum, cksum):
- raise RuntimeError(
- 'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum))
+ raise RuntimeError('Invalid checksum {0} should be {1}'.
+ format(cksum, tmp.cksum))
# send ICMPv6 echo reply from second TG interface
pkt_send = (Ether(src=dst_mac, dst=dst_nh_mac) /
- IPv6(src=dst_ip, dst=src_ip) /
- ICMPv6EchoReply(id=echo_id, seq=echo_seq))
+ IPv6(src=dst_ip, dst=src_ip) /
+ ICMPv6EchoReply(id=echo_id, seq=echo_seq))
dst_sent_packets.append(pkt_send)
dst_txq.send(pkt_send)
# receive ICMPv6 echo reply on first TG interface
- ether = src_rxq.recv(2, src_sent_packets)
- if ether is None:
- raise RuntimeError('ICMPv6 echo reply Rx timeout')
+ while True:
+ ether = src_rxq.recv(2, src_sent_packets)
+ if ether is None:
+ raise RuntimeError('ICMPv6 echo reply Rx timeout')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
if not ether.haslayer(IPv6):
- raise RuntimeError('Unexpected packet with no IPv6 received {0}'.format(
- ether.__repr__()))
+ raise RuntimeError('Unexpected packet with no IPv6 layer received {0}'.
+ format(ether.__repr__()))
- ipv6 = ether['IPv6']
+ ipv6 = ether[IPv6]
# verify hop limit processing
if ipv6.hlim != (hop_limit - hop_num):
- raise RuntimeError(
- 'Invalid hop limit {0} should be {1}'.format(ipv6.hlim,
- hop_limit - hop_num))
+ raise RuntimeError('Invalid hop limit {0} should be {1}'.
+ format(ipv6.hlim, hop_limit - hop_num))
if not ipv6.haslayer(ICMPv6EchoReply):
- raise RuntimeError(
- 'Unexpected packet with no IPv6 ICMP received {0}'.format(
- ipv6.__repr__()))
+ raise RuntimeError('Unexpected packet with no IPv6 ICMP received {0}'.
+ format(ipv6.__repr__()))
- icmpv6 = ipv6['ICMPv6 Echo Reply']
+ icmpv6 = ipv6[ICMPv6EchoReply]
# check identifier and sequence number
if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
- raise RuntimeError(
- 'Invalid ICMPv6 echo reply received ID {0} seq {1} should be ' \
- 'ID {2} seq {3}'.format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
+ raise RuntimeError('Invalid ICMPv6 echo reply received ID {0} '
+ 'seq {1} should be ID {2} seq {3}'.
+ format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
# verify checksum
cksum = icmpv6.cksum
del icmpv6.cksum
tmp = ICMPv6EchoReply(str(icmpv6))
if not checksum_equal(tmp.cksum, cksum):
- raise RuntimeError(
- 'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum))
+ raise RuntimeError('Invalid checksum {0} should be {1}'.
+ format(cksum, tmp.cksum))
sys.exit(0)
+
if __name__ == "__main__":
main()
diff --git a/resources/traffic_scripts/ipfix_check.py b/resources/traffic_scripts/ipfix_check.py
index 2a08f0ce85..f5693cc7e8 100755
--- a/resources/traffic_scripts/ipfix_check.py
+++ b/resources/traffic_scripts/ipfix_check.py
@@ -24,8 +24,8 @@ from scapy.layers.l2 import Ether
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue, auto_pad
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-from resources.libraries.python.telemetry.IPFIXUtil import IPFIXHandler, \
- IPFIXData
+from resources.libraries.python.telemetry.IPFIXUtil import IPFIXHandler
+from resources.libraries.python.telemetry.IPFIXUtil import IPFIXData
def valid_ipv4(ip):
@@ -123,6 +123,11 @@ def main():
pkt = rxq.recv(10, ignore=ignore, verbose=verbose)
if pkt is None:
raise RuntimeError("RX timeout")
+
+ if pkt.haslayer("ICMPv6ND_NS"):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+
if pkt.haslayer("IPFIXHeader"):
if pkt.haslayer("IPFIXTemplate"):
# create or update template for IPFIX data packets
@@ -138,9 +143,9 @@ def main():
# verify packet count
if data["packetTotalCount"] != count:
- raise RuntimeError(
- "IPFIX reported wrong packet count. Count was {0},"
- " but should be {1}".format(data["packetTotalCount"], count))
+ raise RuntimeError("IPFIX reported wrong packet count. Count was {0},"
+ "but should be {1}".format(data["packetTotalCount"],
+ count))
# verify IP addresses
keys = data.keys()
err = "{0} mismatch. Packets used {1}, but were classified as {2}."
diff --git a/resources/traffic_scripts/ipfix_sessions.py b/resources/traffic_scripts/ipfix_sessions.py
index e7597a894a..11e77fa08c 100755
--- a/resources/traffic_scripts/ipfix_sessions.py
+++ b/resources/traffic_scripts/ipfix_sessions.py
@@ -192,6 +192,11 @@ def main():
pkt = rxq.recv(5)
if pkt is None:
raise RuntimeError("RX timeout")
+
+ if pkt.haslayer("ICMPv6ND_NS"):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+
if pkt.haslayer("IPFIXHeader"):
if pkt.haslayer("IPFIXTemplate"):
# create or update template for IPFIX data packets
@@ -219,6 +224,6 @@ def main():
raise RuntimeError("Received non-IPFIX packet or IPFIX header was"
"not recognized.")
-if __name__ == "__main__":
+if __name__ == "__main__":
main()
diff --git a/resources/traffic_scripts/ipsec.py b/resources/traffic_scripts/ipsec.py
index 13d44b8a51..1561738c60 100755
--- a/resources/traffic_scripts/ipsec.py
+++ b/resources/traffic_scripts/ipsec.py
@@ -18,8 +18,14 @@
import sys
import logging
+# pylint: disable=no-name-in-module
+# pylint: disable=import-error
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
-from scapy.all import Ether, IP, ICMP, IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
+
+from scapy.all import Ether
+from scapy.layers.inet import ICMP, IP
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
+from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
from scapy.layers.ipsec import SecurityAssociation, ESP
from ipaddress import ip_address
@@ -39,7 +45,7 @@ def check_ipv4(pkt_recv, dst_tun, src_ip, dst_ip, sa_in):
:type dst_tun: str
:type src_ip: str
:type dst_ip: str
- :type sa_sa: scapy.layers.ipsec.SecurityAssociation
+ :type sa_in: scapy.layers.ipsec.SecurityAssociation
:raises RuntimeError: If received packet is invalid.
"""
if not pkt_recv.haslayer(IP):
@@ -55,15 +61,15 @@ def check_ipv4(pkt_recv, dst_tun, src_ip, dst_ip, sa_in):
raise RuntimeError(
'Not an ESP packet received: {0}'.format(pkt_recv.__repr__()))
- ip_pkt = pkt_recv['IP']
+ ip_pkt = pkt_recv[IP]
d_pkt = sa_in.decrypt(ip_pkt)
- if d_pkt['IP'].dst != dst_ip:
+ if d_pkt[IP].dst != dst_ip:
raise RuntimeError(
'Decrypted packet has invalid destination address: {0} '
'should be: {1}'.format(d_pkt['IP'].dst, dst_ip))
- if d_pkt['IP'].src != src_ip:
+ if d_pkt[IP].src != src_ip:
raise RuntimeError(
'Decrypted packet has invalid source address: {0} should be: {1}'
.format(d_pkt['IP'].src, src_ip))
@@ -93,7 +99,7 @@ def check_ipv6(pkt_recv, dst_tun, src_ip, dst_ip, sa_in):
raise RuntimeError(
'Not an IPv6 packet received: {0}'.format(pkt_recv.__repr__()))
- if pkt_recv['IPv6'].dst != dst_tun:
+ if pkt_recv[IPv6].dst != dst_tun:
raise RuntimeError(
'Received packet has invalid destination address: {0} '
'should be: {1}'.format(pkt_recv['IPv6'].dst, dst_tun))
@@ -102,15 +108,15 @@ def check_ipv6(pkt_recv, dst_tun, src_ip, dst_ip, sa_in):
raise RuntimeError(
'Not an ESP packet received: {0}'.format(pkt_recv.__repr__()))
- ip_pkt = pkt_recv['IPv6']
+ ip_pkt = pkt_recv[IPv6]
d_pkt = sa_in.decrypt(ip_pkt)
- if d_pkt['IPv6'].dst != dst_ip:
+ if d_pkt[IPv6].dst != dst_ip:
raise RuntimeError(
'Decrypted packet has invalid destination address {0}: '
'should be: {1}'.format(d_pkt['IPv6'].dst, dst_ip))
- if d_pkt['IPv6'].src != src_ip:
+ if d_pkt[IPv6].src != src_ip:
raise RuntimeError(
'Decrypted packet has invalid source address: {0} should be: {1}'
.format(d_pkt['IPv6'].src, src_ip))
@@ -175,25 +181,33 @@ def main():
sent_packets = []
if is_ipv4:
- ip_pkt = IP(src=src_ip, dst=dst_ip) / \
- ICMP()
+ ip_pkt = (IP(src=src_ip, dst=dst_ip) /
+ ICMP())
ip_pkt = IP(str(ip_pkt))
else:
- ip_pkt = IPv6(src=src_ip, dst=dst_ip) / \
- ICMPv6EchoRequest()
+ ip_pkt = (IPv6(src=src_ip, dst=dst_ip) /
+ ICMPv6EchoRequest())
ip_pkt = IPv6(str(ip_pkt))
e_pkt = sa_out.encrypt(ip_pkt)
- pkt_send = Ether(src=src_mac, dst=dst_mac) / \
- e_pkt
+ pkt_send = (Ether(src=src_mac, dst=dst_mac) /
+ e_pkt)
sent_packets.append(pkt_send)
txq.send(pkt_send)
- pkt_recv = rxq.recv(2, sent_packets)
+ while True:
+ pkt_recv = rxq.recv(2, sent_packets)
- if pkt_recv is None:
- raise RuntimeError('ESP packet Rx timeout')
+ if pkt_recv is None:
+ raise RuntimeError('ESP packet Rx timeout')
+
+ if pkt_recv.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
if is_ipv4:
check_ipv4(pkt_recv, src_tun, dst_ip, src_ip, sa_in)
@@ -202,5 +216,6 @@ def main():
sys.exit(0)
+
if __name__ == "__main__":
main()
diff --git a/resources/traffic_scripts/ipv6_nd_proxy_check.py b/resources/traffic_scripts/ipv6_nd_proxy_check.py
index b1028028b0..c9213999ec 100755
--- a/resources/traffic_scripts/ipv6_nd_proxy_check.py
+++ b/resources/traffic_scripts/ipv6_nd_proxy_check.py
@@ -15,8 +15,9 @@
"""Traffic script that sends DHCPv6 proxy packets."""
from scapy.layers.inet import Ether
-from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply,\
- ICMPv6ND_NS
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS
+from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
+
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
@@ -44,17 +45,23 @@ def imcpv6nd_solicit(tx_if, src_mac, dst_mac, src_ip, dst_ip):
sent_packets = []
- icmpv6nd_solicit_pkt =\
- Ether(src=src_mac, dst=dst_mac) / \
- IPv6(src=src_ip) / \
- ICMPv6ND_NS(tgt=dst_ip)
+ icmpv6nd_solicit_pkt = (Ether(src=src_mac, dst=dst_mac) /
+ IPv6(src=src_ip) /
+ ICMPv6ND_NS(tgt=dst_ip))
sent_packets.append(icmpv6nd_solicit_pkt)
txq.send(icmpv6nd_solicit_pkt)
ether = None
for _ in range(5):
- pkt = rxq.recv(3, ignore=sent_packets)
+ while True:
+ pkt = rxq.recv(3, ignore=sent_packets)
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue in case of ICMPv6ND_NS packet
+ continue
+ else:
+ # otherwise process the current packet
+ break
if pkt is not None:
ether = pkt
break
@@ -63,34 +70,33 @@ def imcpv6nd_solicit(tx_if, src_mac, dst_mac, src_ip, dst_ip):
raise RuntimeError('ICMPv6ND Proxy response timeout.')
if ether.src != dst_mac:
- raise RuntimeError("Source MAC address error: {} != {}".format(
- ether.src, dst_mac))
+ raise RuntimeError("Source MAC address error: {} != {}".
+ format(ether.src, dst_mac))
print "Source MAC address: OK."
if ether.dst != src_mac:
- raise RuntimeError("Destination MAC address error: {} != {}".format(
- ether.dst, src_mac))
+ raise RuntimeError("Destination MAC address error: {} != {}".
+ format(ether.dst, src_mac))
print "Destination MAC address: OK."
- if ether['IPv6'].src != dst_ip:
- raise RuntimeError("Source IP address error: {} != {}".format(
- ether['IPv6'].src, dst_ip))
+ if ether[IPv6].src != dst_ip:
+ raise RuntimeError("Source IP address error: {} != {}".
+ format(ether[IPv6].src, dst_ip))
print "Source IP address: OK."
- if ether['IPv6'].dst != src_ip:
- raise RuntimeError("Destination IP address error: {} != {}".format(
- ether['IPv6'].dst, src_ip))
+ if ether[IPv6].dst != src_ip:
+ raise RuntimeError("Destination IP address error: {} != {}".
+ format(ether[IPv6].dst, src_ip))
print "Destination IP address: OK."
try:
- target_addr = ether['IPv6']\
- ['ICMPv6 Neighbor Discovery - Neighbor Advertisement'].tgt
+ target_addr = ether[IPv6][ICMPv6ND_NA].tgt
except (KeyError, AttributeError):
raise RuntimeError("Not an ICMPv6ND Neighbor Advertisement packet.")
if target_addr != dst_ip:
- raise RuntimeError("ICMPv6 field 'Target address' error:"
- " {} != {}".format(target_addr, dst_ip))
+ raise RuntimeError("ICMPv6 field 'Target address' error: {} != {}".
+ format(target_addr, dst_ip))
print "Target address field: OK."
@@ -119,16 +125,22 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac,
rxq = RxQueue(dst_if)
txq = TxQueue(src_if)
- icmpv6_ping_pkt = \
- Ether(src=src_mac, dst=proxy_to_src_mac) / \
- IPv6(src=src_ip, dst=dst_ip) / \
- ICMPv6EchoRequest()
+ icmpv6_ping_pkt = (Ether(src=src_mac, dst=proxy_to_src_mac) /
+ IPv6(src=src_ip, dst=dst_ip) /
+ ICMPv6EchoRequest())
txq.send(icmpv6_ping_pkt)
ether = None
for _ in range(5):
- pkt = rxq.recv(3)
+ while True:
+ pkt = rxq.recv(3)
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue in case of ICMPv6ND_NS packet
+ continue
+ else:
+ # otherwise process the current packet
+ break
if pkt is not None:
ether = pkt
break
@@ -136,7 +148,7 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac,
if ether is None:
raise RuntimeError('ICMPv6 Echo Request timeout.')
try:
- ether["IPv6"]["ICMPv6 Echo Request"]
+ ether[IPv6]["ICMPv6 Echo Request"]
except KeyError:
raise RuntimeError("Received packet is not an ICMPv6 Echo Request.")
print "ICMP Echo: OK."
@@ -144,16 +156,22 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac,
rxq = RxQueue(src_if)
txq = TxQueue(dst_if)
- icmpv6_ping_pkt = \
- Ether(src=dst_mac, dst=proxy_to_dst_mac) / \
- IPv6(src=dst_ip, dst=src_ip) / \
- ICMPv6EchoReply()
+ icmpv6_ping_pkt = (Ether(src=dst_mac, dst=proxy_to_dst_mac) /
+ IPv6(src=dst_ip, dst=src_ip) /
+ ICMPv6EchoReply())
txq.send(icmpv6_ping_pkt)
ether = None
for _ in range(5):
- pkt = rxq.recv(3)
+ while True:
+ pkt = rxq.recv(3)
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue in case of ICMPv6ND_NS packet
+ continue
+ else:
+ # otherwise process the current packet
+ break
if pkt is not None:
ether = pkt
break
@@ -161,7 +179,7 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac,
if ether is None:
raise RuntimeError('DHCPv6 SOLICIT timeout')
try:
- ether["IPv6"]["ICMPv6 Echo Reply"]
+ ether[IPv6]["ICMPv6 Echo Reply"]
except KeyError:
raise RuntimeError("Received packet is not an ICMPv6 Echo Reply.")
@@ -187,8 +205,9 @@ def main():
imcpv6nd_solicit(src_if, src_mac, proxy_to_src_mac, src_ip, dst_ip)
# Verify route (ICMP echo/reply)
- ipv6_ping(src_if, dst_if, src_mac, dst_mac,
- proxy_to_src_mac, proxy_to_dst_mac, src_ip, dst_ip)
+ ipv6_ping(src_if, dst_if, src_mac, dst_mac, proxy_to_src_mac,
+ proxy_to_dst_mac, src_ip, dst_ip)
+
if __name__ == "__main__":
main()
diff --git a/resources/traffic_scripts/ipv6_ns.py b/resources/traffic_scripts/ipv6_ns.py
index 70c6ab445a..5852e6317f 100755
--- a/resources/traffic_scripts/ipv6_ns.py
+++ b/resources/traffic_scripts/ipv6_ns.py
@@ -17,13 +17,18 @@
import sys
import logging
+
+# pylint: disable=no-name-in-module
+# pylint: disable=import-error
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue,\
- checksum_equal
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
+
+from scapy.all import Ether
from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS
from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr, ICMPv6NDOptSrcLLAddr
-from scapy.all import Ether
+
+from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
+from resources.libraries.python.PacketVerifier import checksum_equal
+from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
def main():
@@ -41,57 +46,67 @@ def main():
# send ICMPv6 neighbor solicitation message
pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') /
- IPv6(src=src_ip, dst='ff02::1:ff00:2') /
- ICMPv6ND_NS(tgt=dst_ip) /
- ICMPv6NDOptSrcLLAddr(lladdr=src_mac))
+ IPv6(src=src_ip, dst='ff02::1:ff00:2') /
+ ICMPv6ND_NS(tgt=dst_ip) /
+ ICMPv6NDOptSrcLLAddr(lladdr=src_mac))
sent_packets.append(pkt_send)
txq.send(pkt_send)
# receive ICMPv6 neighbor advertisement message
- ether = rxq.recv(2, sent_packets)
+ while True:
+ ether = rxq.recv(2, sent_packets)
+ if ether is None:
+ raise RuntimeError('ICMPv6 echo reply Rx timeout')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
+
if ether is None:
raise RuntimeError('ICMPv6 echo reply Rx timeout')
if not ether.haslayer(IPv6):
- raise RuntimeError('Unexpected packet with no IPv6 received {0}'.format(
- ether.__repr__()))
+ raise RuntimeError('Unexpected packet with no IPv6 received {0}'.
+ format(ether.__repr__()))
- ipv6 = ether['IPv6']
+ ipv6 = ether[IPv6]
if not ipv6.haslayer(ICMPv6ND_NA):
- raise RuntimeError(
- 'Unexpected packet with no ICMPv6 ND-NA received {0}'.format(
- ipv6.__repr__()))
+ raise RuntimeError('Unexpected packet with no ICMPv6 ND-NA received '
+ '{0}'.format(ipv6.__repr__()))
- icmpv6_na = ipv6['ICMPv6 Neighbor Discovery - Neighbor Advertisement']
+ icmpv6_na = ipv6[ICMPv6ND_NA]
# verify target address
if icmpv6_na.tgt != dst_ip:
- raise RuntimeError('Invalid target address {0} should be {1}'.format(
- icmpv6_na.tgt, dst_ip))
+ raise RuntimeError('Invalid target address {0} should be {1}'.
+ format(icmpv6_na.tgt, dst_ip))
if not icmpv6_na.haslayer(ICMPv6NDOptDstLLAddr):
- raise RuntimeError(
- 'Missing Destination Link-Layer Address option in ICMPv6 ' +
- 'Neighbor Advertisement {0}'.format(icmpv6_na.__repr__()))
+ raise RuntimeError('Missing Destination Link-Layer Address option in '
+ 'ICMPv6 Neighbor Advertisement {0}'.
+ format(icmpv6_na.__repr__()))
- option = 'ICMPv6 Neighbor Discovery Option - Destination Link-Layer Address'
- dst_ll_addr = icmpv6_na[option]
+ dst_ll_addr = icmpv6_na[ICMPv6NDOptDstLLAddr]
# verify destination link-layer address field
if dst_ll_addr.lladdr != dst_mac:
- raise RuntimeError('Invalid lladdr {0} should be {1}'.format(
- dst_ll_addr.lladdr, dst_mac))
+ raise RuntimeError('Invalid lladdr {0} should be {1}'.
+ format(dst_ll_addr.lladdr, dst_mac))
# verify checksum
cksum = icmpv6_na.cksum
del icmpv6_na.cksum
tmp = ICMPv6ND_NA(str(icmpv6_na))
if not checksum_equal(tmp.cksum, cksum):
- raise RuntimeError(
- 'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum))
+ raise RuntimeError('Invalid checksum {0} should be {1}'.
+ format(cksum, tmp.cksum))
sys.exit(0)
+
if __name__ == "__main__":
main()
diff --git a/resources/traffic_scripts/ipv6_sweep_ping.py b/resources/traffic_scripts/ipv6_sweep_ping.py
index 80fdda532a..28d23a16ef 100755
--- a/resources/traffic_scripts/ipv6_sweep_ping.py
+++ b/resources/traffic_scripts/ipv6_sweep_ping.py
@@ -19,13 +19,18 @@ import logging
import os
import sys
+# pylint: disable=no-name-in-module
+# pylint: disable=import-error
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue, \
- checksum_equal
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr
-from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
+
from scapy.all import Ether
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS
+from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr
+from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
+
+from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
+from resources.libraries.python.PacketVerifier import checksum_equal
+from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
def main():
@@ -68,37 +73,43 @@ def main():
sent_packets.append(pkt_send)
txq.send(pkt_send)
- ether = rxq.recv(ignore=sent_packets)
- if ether is None:
- raise RuntimeError(
- 'ICMPv6 echo reply seq {0} Rx timeout'.format(echo_seq))
+ while True:
+ ether = rxq.recv(ignore=sent_packets)
+ if ether is None:
+ raise RuntimeError('ICMPv6 echo reply seq {0} '
+ 'Rx timeout'.format(echo_seq))
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue in case of ICMPv6ND_NS packet
+ continue
+ else:
+ # otherwise process the current packet
+ break
if not ether.haslayer(IPv6):
- raise RuntimeError(
- 'Unexpected packet with no IPv6 received {0}'.format(
- ether.__repr__()))
+ raise RuntimeError('Unexpected packet with no IPv6 layer '
+ 'received: {0}'.format(ether.__repr__()))
- ipv6 = ether['IPv6']
+ ipv6 = ether[IPv6]
if not ipv6.haslayer(ICMPv6EchoReply):
- raise RuntimeError(
- 'Unexpected packet with no IPv6 ICMP received {0}'.format(
- ipv6.__repr__()))
+ raise RuntimeError('Unexpected packet with no ICMPv6 echo reply '
+ 'layer received: {0}'.format(ipv6.__repr__()))
- icmpv6 = ipv6['ICMPv6 Echo Reply']
+ icmpv6 = ipv6[ICMPv6EchoReply]
if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
- raise RuntimeError(
- 'Invalid ICMPv6 echo reply received ID {0} seq {1} should be '
- 'ID {2} seq {3}, {0}'.format(icmpv6.id, icmpv6.seq, echo_id,
- echo_seq))
+ raise RuntimeError('ICMPv6 echo reply with invalid data received: '
+ 'ID {0} seq {1} should be ID {2} seq {3}, {0}'.
+ format(icmpv6.id, icmpv6.seq, echo_id,
+ echo_seq))
cksum = icmpv6.cksum
del icmpv6.cksum
tmp = ICMPv6EchoReply(str(icmpv6))
if not checksum_equal(tmp.cksum, cksum):
- raise RuntimeError(
- 'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum))
+ raise RuntimeError('Invalid checksum: {0} should be {1}'.
+ format(cksum, tmp.cksum))
sent_packets.remove(pkt_send)
diff --git a/resources/traffic_scripts/policer.py b/resources/traffic_scripts/policer.py
index ee9fa29c4a..62c397d496 100755
--- a/resources/traffic_scripts/policer.py
+++ b/resources/traffic_scripts/policer.py
@@ -21,7 +21,10 @@ import logging
# pylint: disable=no-name-in-module
# pylint: disable=import-error
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
-from scapy.all import Ether, IP, IPv6, TCP
+
+from scapy.all import Ether
+from scapy.layers.inet import IP, TCP
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
from ipaddress import ip_address
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
@@ -38,17 +41,17 @@ def check_ipv4(pkt_recv, dscp):
:raises RuntimeError: If received packet is invalid.
"""
if not pkt_recv.haslayer(IP):
- raise RuntimeError(
- 'Not an IPv4 packet received: {0}'.format(pkt_recv.__repr__()))
+ raise RuntimeError('Not an IPv4 packet received: {0}'.
+ format(pkt_recv.__repr__()))
- rx_dscp = pkt_recv['IP'].tos >> 2
+ rx_dscp = pkt_recv[IP].tos >> 2
if rx_dscp != dscp:
- raise RuntimeError(
- 'Invalid DSCP {0} should be {1}'.format(rx_dscp, dscp))
+ raise RuntimeError('Invalid DSCP {0} should be {1}'.
+ format(rx_dscp, dscp))
if not pkt_recv.haslayer(TCP):
- raise RuntimeError(
- 'Not a TCP packet received: {0}'.format(pkt_recv.__repr__()))
+ raise RuntimeError('Not a TCP packet received: {0}'.
+ format(pkt_recv.__repr__()))
def check_ipv6(pkt_recv, dscp):
@@ -61,17 +64,17 @@ def check_ipv6(pkt_recv, dscp):
:raises RuntimeError: If received packet is invalid.
"""
if not pkt_recv.haslayer(IPv6):
- raise RuntimeError(
- 'Not an IPv6 packet received: {0}'.format(pkt_recv.__repr__()))
+ raise RuntimeError('Not an IPv6 packet received: {0}'.
+ format(pkt_recv.__repr__()))
- rx_dscp = pkt_recv['IPv6'].tc >> 2
+ rx_dscp = pkt_recv[IPv6].tc >> 2
if rx_dscp != dscp:
- raise RuntimeError(
- 'Invalid DSCP {0} should be {1}'.format(rx_dscp, dscp))
+ raise RuntimeError('Invalid DSCP {0} should be {1}'.
+ format(rx_dscp, dscp))
if not pkt_recv.haslayer(TCP):
- raise RuntimeError(
- 'Not a TCP packet received: {0}'.format(pkt_recv.__repr__()))
+ raise RuntimeError('Not a TCP packet received: {0}'.
+ format(pkt_recv.__repr__()))
# pylint: disable=too-many-locals
@@ -97,19 +100,29 @@ def main():
sent_packets = []
if is_ipv4:
- ip_pkt = IP(src=src_ip, dst=dst_ip) / \
- TCP()
+ ip_pkt = (IP(src=src_ip, dst=dst_ip) /
+ TCP())
else:
- ip_pkt = IPv6(src=src_ip, dst=dst_ip) / \
- TCP()
+ ip_pkt = (IPv6(src=src_ip, dst=dst_ip) /
+ TCP())
- pkt_send = Ether(src=src_mac, dst=dst_mac) / \
- ip_pkt
+ pkt_send = (Ether(src=src_mac, dst=dst_mac) /
+ ip_pkt)
sent_packets.append(pkt_send)
txq.send(pkt_send)
- pkt_recv = rxq.recv(2, sent_packets)
+ while True:
+ pkt_recv = rxq.recv(2, sent_packets)
+ if pkt_recv is None:
+ raise RuntimeError('ICMPv6 echo reply Rx timeout')
+
+ if pkt_recv.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
if pkt_recv is None:
raise RuntimeError('Rx timeout')
@@ -121,5 +134,6 @@ def main():
sys.exit(0)
+
if __name__ == "__main__":
main()
diff --git a/resources/traffic_scripts/send_icmp_check_headers.py b/resources/traffic_scripts/send_icmp_check_headers.py
index c9e3a35df4..2193164b61 100755
--- a/resources/traffic_scripts/send_icmp_check_headers.py
+++ b/resources/traffic_scripts/send_icmp_check_headers.py
@@ -22,8 +22,7 @@ import sys
import ipaddress
from robot.api import logger
from scapy.layers.inet import ICMP, IP
-from scapy.layers.inet6 import ICMPv6EchoRequest
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
from scapy.layers.l2 import Ether, Dot1Q
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
@@ -94,28 +93,36 @@ def main():
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
- if tx_if == rx_if:
- ether = rxq.recv(2, ignore=sent_packets)
- else:
- ether = rxq.recv(2)
- if ether is None:
- raise RuntimeError("ICMP echo Rx timeout")
+ while True:
+ if tx_if == rx_if:
+ ether = rxq.recv(2, ignore=sent_packets)
+ else:
+ ether = rxq.recv(2)
+ if ether is None:
+ raise RuntimeError('ICMP echo Rx timeout')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
logger.trace("MAC matched")
else:
- raise RuntimeError(
- "Matching packet unsuccessful: {0}".format(ether.__repr__()))
+ raise RuntimeError("Matching packet unsuccessful: {0}".
+ format(ether.__repr__()))
if encaps_rx == 'Dot1q':
if ether[Dot1Q].vlan == int(vlan_rx):
logger.trace("VLAN matched")
else:
raise RuntimeError('Ethernet frame with wrong VLAN tag ({}-'
- 'received, {}-expected):\n{}'
- .format(ether[Dot1Q].vlan, vlan_rx,
- ether.__repr__()))
+ 'received, {}-expected):\n{}'.
+ format(ether[Dot1Q].vlan, vlan_rx,
+ ether.__repr__()))
ip = ether[Dot1Q].payload
elif encaps_rx == 'Dot1ad':
raise NotImplementedError()
@@ -123,21 +130,21 @@ def main():
ip = ether.payload
if not isinstance(ip, ip_format):
- raise RuntimeError(
- "Not an IP packet received {0}".format(ip.__repr__()))
+ raise RuntimeError("Not an IP packet received {0}".
+ format(ip.__repr__()))
# Compare data from packets
if src_ip == ip.src:
logger.trace("Src IP matched")
else:
- raise RuntimeError("Matching Src IP unsuccessful: {} != {}"
- .format(src_ip, ip.src))
+ raise RuntimeError("Matching Src IP unsuccessful: {} != {}".
+ format(src_ip, ip.src))
if dst_ip == ip.dst:
logger.trace("Dst IP matched")
else:
- raise RuntimeError("Matching Dst IP unsuccessful: {} != {}"
- .format(dst_ip, ip.dst))
+ raise RuntimeError("Matching Dst IP unsuccessful: {} != {}".
+ format(dst_ip, ip.dst))
sys.exit(0)
diff --git a/resources/traffic_scripts/send_icmp_check_multipath.py b/resources/traffic_scripts/send_icmp_check_multipath.py
index 4e39efdfcd..5da3b7b096 100755
--- a/resources/traffic_scripts/send_icmp_check_multipath.py
+++ b/resources/traffic_scripts/send_icmp_check_multipath.py
@@ -18,10 +18,9 @@ and check if it is divided into two paths."""
import sys
import ipaddress
-from scapy.layers.inet import ICMP, IP
-from scapy.layers.inet6 import IPv6
from scapy.all import Ether
-from scapy.layers.inet6 import ICMPv6EchoRequest
+from scapy.layers.inet import ICMP, IP
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
@@ -94,27 +93,38 @@ def main():
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
- ether = rxq.recv(2)
+
+ while True:
+ ether = rxq.recv(2)
+ if ether is None:
+ raise RuntimeError('ICMPv6 echo reply Rx timeout')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue in case of ICMPv6ND_NS packet
+ continue
+ else:
+ # otherwise process the current packet
+ break
if ether is None:
raise RuntimeError("ICMP echo Rx timeout")
if not ether.haslayer(ip_format):
- raise RuntimeError("Not an IP packet received {0}"
- .format(ether.__repr__()))
+ raise RuntimeError("Not an IP packet received {0}".
+ format(ether.__repr__()))
- if ether['Ethernet'].src != dut_if2_mac:
+ if ether[Ether].src != dut_if2_mac:
raise RuntimeError("Source MAC address error")
- if ether['Ethernet'].dst == path_1_mac:
+ if ether[Ether].dst == path_1_mac:
path_1_counter += 1
- elif ether['Ethernet'].dst == path_2_mac:
+ elif ether[Ether].dst == path_2_mac:
path_2_counter += 1
else:
raise RuntimeError("Destination MAC address error")
if (path_1_counter + path_2_counter) != 100:
- raise RuntimeError("Packet loss: recevied only {} packets of 100 "
- .format(path_1_counter + path_2_counter))
+ raise RuntimeError("Packet loss: recevied only {} packets of 100 ".
+ format(path_1_counter + path_2_counter))
if path_1_counter == 0:
raise RuntimeError("Path 1 error!")
@@ -127,5 +137,6 @@ def main():
sys.exit(0)
+
if __name__ == "__main__":
main()
diff --git a/resources/traffic_scripts/send_icmp_type_code.py b/resources/traffic_scripts/send_icmp_type_code.py
index dffaafff38..2997f91264 100755
--- a/resources/traffic_scripts/send_icmp_type_code.py
+++ b/resources/traffic_scripts/send_icmp_type_code.py
@@ -19,10 +19,9 @@ the other one. Dot1q or Dot1ad tagging of the ethernet frame can be set.
import sys
import ipaddress
-from scapy.layers.inet import ICMP, IP
from scapy.layers.l2 import Ether
-from scapy.layers.inet6 import ICMPv6EchoRequest
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet import ICMP, IP
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
@@ -88,13 +87,13 @@ def main():
pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
IP(src=src_ip, dst=dst_ip) /
ICMP(code=icmp_code, type=icmp_type))
- ip_format = 'IP'
- icmp_format = 'ICMP'
+ ip_format = IP
+ icmp_format = ICMP
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
IPv6(src=src_ip, dst=dst_ip) /
ICMPv6EchoRequest(code=icmp_code, type=icmp_type))
- ip_format = 'IPv6'
+ ip_format = IPv6
icmp_format = 'ICMPv6'
else:
raise ValueError("IP(s) not in correct format")
@@ -103,23 +102,31 @@ def main():
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
- ether = rxq.recv(2)
+ while True:
+ ether = rxq.recv(2)
+ if ether is None:
+ raise RuntimeError('ICMP echo Rx timeout')
- # Check whether received packet contains layers Ether, IP and ICMP
- if ether is None:
- raise RuntimeError('ICMP echo Rx timeout')
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
+ # Check whether received packet contains layers IP and ICMP
if not ether.haslayer(ip_format):
- raise RuntimeError('Not an IP packet received {0}'
- .format(ether.__repr__()))
+ raise RuntimeError('Not an IP packet received {0}'.
+ format(ether.__repr__()))
# Cannot use haslayer for ICMPv6, every type of ICMPv6 is a separate layer
# Next header value of 58 means the next header is ICMPv6
if not ether.haslayer(icmp_format) and ether[ip_format].nh != 58:
- raise RuntimeError('Not an ICMP packet received {0}'
- .format(ether.__repr__()))
+ raise RuntimeError('Not an ICMP packet received {0}'.
+ format(ether.__repr__()))
sys.exit(0)
+
if __name__ == "__main__":
main()
diff --git a/resources/traffic_scripts/send_icmp_wait_for_reply.py b/resources/traffic_scripts/send_icmp_wait_for_reply.py
index a77f15efc2..7677152801 100755
--- a/resources/traffic_scripts/send_icmp_wait_for_reply.py
+++ b/resources/traffic_scripts/send_icmp_wait_for_reply.py
@@ -17,9 +17,9 @@
import sys
import ipaddress
-from scapy.layers.inet import ICMP, IP
-from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
from scapy.all import Ether
+from scapy.layers.inet import ICMP, IP
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
@@ -35,14 +35,14 @@ def is_icmp_reply(pkt, ipformat):
:type ipformat: dict
:rtype: bool
"""
- #pylint: disable=bare-except
+ # pylint: disable=bare-except
try:
if pkt[ipformat['IPType']][ipformat['ICMP_rep']].type == \
ipformat['Type']:
return True
else:
return False
- except:
+ except: # pylint: disable=bare-except
return False
@@ -58,12 +58,12 @@ def address_check(request, reply, ipformat):
:type ipformat: dict
:rtype: bool
"""
- #pylint: disable=bare-except
+ # pylint: disable=bare-except
try:
r_src = reply[ipformat['IPType']].src == request[ipformat['IPType']].dst
r_dst = reply[ipformat['IPType']].dst == request[ipformat['IPType']].src
return r_src and r_dst
- except:
+ except: # pylint: disable=bare-except
return False
@@ -139,12 +139,21 @@ def main():
txq.send(icmp_request)
for _ in range(1000):
- icmp_reply = rxq.recv(wait_step, ignore=sent_packets)
- if icmp_reply is None:
- timeout -= wait_step
- if timeout < 0:
- raise RuntimeError("ICMP echo Rx timeout")
- elif is_icmp_reply(icmp_reply, ip_format):
+ while True:
+ icmp_reply = rxq.recv(wait_step, ignore=sent_packets)
+ if icmp_reply is None:
+ timeout -= wait_step
+ if timeout < 0:
+ raise RuntimeError("ICMP echo Rx timeout")
+
+ elif icmp_reply.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue in case of ICMPv6ND_NS packet
+ continue
+ else:
+ # otherwise process the current packet
+ break
+
+ if is_icmp_reply(icmp_reply, ip_format):
if address_check(icmp_request, icmp_reply, ip_format):
break
else:
@@ -154,5 +163,6 @@ def main():
sys.exit(0)
+
if __name__ == "__main__":
main()
diff --git a/resources/traffic_scripts/send_ip_icmp.py b/resources/traffic_scripts/send_ip_icmp.py
index b22b5d39a8..58f2e1e4d8 100755
--- a/resources/traffic_scripts/send_ip_icmp.py
+++ b/resources/traffic_scripts/send_ip_icmp.py
@@ -19,11 +19,9 @@ the other one. Dot1q or Dot1ad tagging of the ethernet frame can be set.
import sys
import ipaddress
+from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet import ICMP, IP
-from scapy.layers.l2 import Ether
-from scapy.layers.l2 import Dot1Q
-from scapy.layers.inet6 import ICMPv6EchoRequest
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
@@ -107,8 +105,8 @@ def main():
pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
IP(src=src_ip, dst=dst_ip) /
ICMP())
- ip_format = 'IP'
- icmp_format = 'ICMP'
+ ip_format = IP
+ icmp_format = ICMP
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
if encaps == 'Dot1q':
pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
@@ -125,8 +123,8 @@ def main():
pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
IPv6(src=src_ip, dst=dst_ip) /
ICMPv6EchoRequest())
- ip_format = 'IPv6'
- icmp_format = 'ICMPv6EchoRequest'
+ ip_format = IPv6
+ icmp_format = ICMPv6EchoRequest
else:
raise ValueError("IP(s) not in correct format")
@@ -134,44 +132,55 @@ def main():
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
- ether = rxq.recv(2)
+ # Receive ICMP / ICMPv6 echo reply
+ while True:
+ ether = rxq.recv(2,)
+ if ether is None:
+ raise RuntimeError('ICMP echo Rx timeout')
- # Check whether received packet contains layers Ether, IP and ICMP
- if ether is None:
- raise RuntimeError('ICMP echo Rx timeout')
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
+ # Check whether received packet contains layers IP/IPv6 and
+ # ICMP/ICMPv6EchoRequest
if encaps_rx:
if encaps_rx == 'Dot1q':
if not vlan1_rx:
vlan1_rx = vlan1
if not ether.haslayer(Dot1Q):
- raise RuntimeError('Not VLAN tagged Eth frame received:\n{0}'
- .format(ether.__repr__()))
+ raise RuntimeError('Not VLAN tagged Eth frame received:\n{0}'.
+ format(ether.__repr__()))
elif ether[Dot1Q].vlan != int(vlan1_rx):
raise RuntimeError('Ethernet frame with wrong VLAN tag ({}) '
- 'received ({} expected):\n{}'.format(
- ether[Dot1Q].vlan, vlan1_rx, ether.__repr__()))
+ 'received ({} expected):\n{}'.
+ format(ether[Dot1Q].vlan, vlan1_rx,
+ ether.__repr__()))
elif encaps_rx == 'Dot1ad':
if not vlan1_rx:
vlan1_rx = vlan1
if not vlan2_rx:
vlan2_rx = vlan2
# TODO
- raise RuntimeError('Encapsulation {0} not implemented yet.'
- .format(encaps_rx))
+ raise RuntimeError('Encapsulation {0} not implemented yet.'.
+ format(encaps_rx))
else:
- raise RuntimeError('Unsupported/unknown encapsulation expected: {0}'
- .format(encaps_rx))
+ raise RuntimeError('Unsupported encapsulation expected: {0}'.
+ format(encaps_rx))
if not ether.haslayer(ip_format):
- raise RuntimeError('Not an IP packet received:\n{0}'
- .format(ether.__repr__()))
+ raise RuntimeError('Not an IP/IPv6 packet received:\n{0}'.
+ format(ether.__repr__()))
if not ether.haslayer(icmp_format):
- raise RuntimeError('Not an ICMP packet received:\n{0}'
- .format(ether.__repr__()))
+ raise RuntimeError('Not an ICMP/ICMPv6EchoRequest packet received:\n'
+ '{0}'.format(ether.__repr__()))
sys.exit(0)
+
if __name__ == "__main__":
main()
diff --git a/resources/traffic_scripts/send_rs_check_ra.py b/resources/traffic_scripts/send_rs_check_ra.py
index 9ba1f55b52..c9ff46528a 100755
--- a/resources/traffic_scripts/send_rs_check_ra.py
+++ b/resources/traffic_scripts/send_rs_check_ra.py
@@ -18,7 +18,7 @@ import sys
import ipaddress
from scapy.layers.l2 import Ether
-from scapy.layers.inet6 import IPv6, ICMPv6ND_RS
+from scapy.layers.inet6 import IPv6, ICMPv6ND_RA, ICMPv6ND_RS, ICMPv6ND_NS
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
@@ -72,24 +72,29 @@ def main():
sent_packets = [pkt_raw]
txq.send(pkt_raw)
- ether = rxq.recv(8, ignore=sent_packets)
+ while True:
+ ether = rxq.recv(2, sent_packets)
+ if ether is None:
+ raise RuntimeError('ICMP echo Rx timeout')
- # Check whether received packet contains layer RA and check other values
- if ether is None:
- raise RuntimeError('ICMP echo Rx timeout')
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
+ # Check whether received packet contains layer RA and check other values
if ether.src != router_mac:
- raise RuntimeError(
- 'Packet source MAC ({0}) does not match '
- 'router MAC ({1}).'.format(ether.src, router_mac))
+ raise RuntimeError('Packet source MAC ({0}) does not match router MAC '
+ '({1}).'.format(ether.src, router_mac))
if ether.dst != src_mac:
- raise RuntimeError(
- 'Packet destination MAC ({0}) does not match '
- 'RS source MAC ({1}).'.format(ether.dst, src_mac))
+ raise RuntimeError('Packet destination MAC ({0}) does not match RS '
+ 'source MAC ({1}).'.format(ether.dst, src_mac))
- if not ether.haslayer('ICMPv6ND_RA'):
- raise RuntimeError('Not an RA packet received {0}'
- .format(ether.__repr__()))
+ if not ether.haslayer(ICMPv6ND_RA):
+ raise RuntimeError('Not an RA packet received {0}'.
+ format(ether.__repr__()))
src_address = ipaddress.IPv6Address(unicode(ether['IPv6'].src))
dst_address = ipaddress.IPv6Address(unicode(ether['IPv6'].dst))
@@ -98,24 +103,25 @@ def main():
rs_src_address = ipaddress.IPv6Address(unicode(src_ip))
if src_address != router_link_local:
- raise RuntimeError(
- 'Packet source address ({0}) does not match '
- 'link local address({1})'.format(src_address, router_link_local))
+ raise RuntimeError('Packet source address ({0}) does not match link '
+ 'local address({1})'.
+ format(src_address, router_link_local))
if dst_address != rs_src_address:
- raise RuntimeError(
- 'Packet destination address ({0}) does not match '
- 'RS source address ({1}).'.format(dst_address, rs_src_address))
+ raise RuntimeError('Packet destination address ({0}) does not match '
+ 'RS source address ({1}).'.
+ format(dst_address, rs_src_address))
if ether['IPv6'].hlim != 255:
- raise RuntimeError('Hop limit not correct: {0}!=255'.format(
- ether['IPv6'].hlim))
+ raise RuntimeError('Hop limit not correct: {0}!=255'.
+ format(ether['IPv6'].hlim))
- ra_code = ether['ICMPv6 Neighbor Discovery - Router Advertisement'].code
+ ra_code = ether[ICMPv6ND_RA].code
if ra_code != 0:
raise RuntimeError('ICMP code: {0} not correct. '.format(ra_code))
sys.exit(0)
+
if __name__ == "__main__":
main()
diff --git a/resources/traffic_scripts/send_tcp_for_classifier_test.py b/resources/traffic_scripts/send_tcp_for_classifier_test.py
index 5d8d387767..5a6873ab4e 100755
--- a/resources/traffic_scripts/send_tcp_for_classifier_test.py
+++ b/resources/traffic_scripts/send_tcp_for_classifier_test.py
@@ -17,27 +17,24 @@ Traffic script that sends an TCP packet
from TG to DUT.
"""
import sys
-import time
-from scapy.layers.inet import IP, UDP, TCP
-from scapy.layers.inet6 import IPv6
from scapy.all import Ether, Packet, Raw
+from scapy.layers.inet import IP, TCP
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
from resources.libraries.python.SFC.VerifyPacket import *
-from resources.libraries.python.SFC.SFCConstants import SFCConstants as sfccon
+from resources.libraries.python.SFC.SFCConstants import SFCConstants as SfcCon
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from robot.api import logger
def main():
"""Send TCP packet from one traffic generator interface to DUT.
:raises: If the IP address is invalid.
"""
- args = TrafficScriptArg(
- ['src_mac', 'dst_mac', 'src_ip', 'dst_ip',
- 'timeout', 'framesize', 'testtype'])
+ args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip',
+ 'timeout', 'framesize', 'testtype'])
src_mac = args.get_arg('src_mac')
dst_mac = args.get_arg('dst_mac')
@@ -54,10 +51,9 @@ def main():
sent_packets = []
protocol = TCP
- source_port = sfccon.DEF_SRC_PORT
- destination_port = sfccon.DEF_DST_PORT
+ source_port = SfcCon.DEF_SRC_PORT
+ destination_port = SfcCon.DEF_DST_PORT
- ip_version = None
if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
ip_version = IP
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
@@ -66,8 +62,8 @@ def main():
raise ValueError("Invalid IP version!")
pkt_header = (Ether(src=src_mac, dst=dst_mac) /
- ip_version(src=src_ip, dst=dst_ip) /
- protocol(sport=int(source_port), dport=int(destination_port)))
+ ip_version(src=src_ip, dst=dst_ip) /
+ protocol(sport=int(source_port), dport=int(destination_port)))
fsize_no_fcs = frame_size - 4
pad_len = max(0, fsize_no_fcs - len(pkt_header))
@@ -79,10 +75,17 @@ def main():
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
- ether = rxq.recv(timeout)
-
- if ether is None:
- raise RuntimeError("No packet is received!")
+ while True:
+ ether = rxq.recv(timeout)
+ if ether is None:
+ raise RuntimeError('No packet is received!')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
# let us begin to check the NSH SFC loopback packet
VerifyPacket.check_the_nsh_sfc_packet(ether, frame_size, test_type)
diff --git a/resources/traffic_scripts/send_tcp_udp.py b/resources/traffic_scripts/send_tcp_udp.py
index 77f918213f..4cba73286a 100755
--- a/resources/traffic_scripts/send_tcp_udp.py
+++ b/resources/traffic_scripts/send_tcp_udp.py
@@ -19,9 +19,9 @@ from one interface to the other.
import sys
import ipaddress
-from scapy.layers.inet import IP, UDP, TCP
-from scapy.layers.inet6 import IPv6
from scapy.all import Ether
+from scapy.layers.inet import IP, UDP, TCP
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
@@ -62,9 +62,8 @@ def valid_ipv6(ip):
def main():
"""Send TCP or UDP packet from one traffic generator interface to the other.
"""
- args = TrafficScriptArg(
- ['tx_mac', 'rx_mac', 'src_ip', 'dst_ip', 'protocol',
- 'source_port', 'destination_port'])
+ args = TrafficScriptArg(['tx_mac', 'rx_mac', 'src_ip', 'dst_ip', 'protocol',
+ 'source_port', 'destination_port'])
src_mac = args.get_arg('tx_mac')
dst_mac = args.get_arg('rx_mac')
@@ -77,7 +76,6 @@ def main():
source_port = args.get_arg('source_port')
destination_port = args.get_arg('destination_port')
- ip_version = None
if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
ip_version = IP
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
@@ -90,7 +88,7 @@ def main():
elif protocol.upper() == 'UDP':
protocol = UDP
else:
- raise ValueError("Invalid type of protocol!")
+ raise ValueError("Invalid protocol type!")
rxq = RxQueue(rx_if)
txq = TxQueue(tx_if)
@@ -100,19 +98,27 @@ def main():
protocol(sport=int(source_port), dport=int(destination_port)))
txq.send(pkt_raw)
- ether = rxq.recv(2)
- if ether is None:
- raise RuntimeError("TCP/UDP Rx timeout")
+ while True:
+ ether = rxq.recv(2)
+ if ether is None:
+ raise RuntimeError('TCP/UDP Rx timeout')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
- if 'TCP' in ether:
+ if TCP in ether:
print ("TCP packet received.")
- elif 'UDP' in ether:
+ elif UDP in ether:
print ("UDP packet received.")
else:
- raise RuntimeError("Not an TCP or UDP packet received {0}"
- .format(ether.__repr__()))
+ raise RuntimeError("Not an TCP or UDP packet received {0}".
+ format(ether.__repr__()))
sys.exit(0)
diff --git a/resources/traffic_scripts/send_vxlan_for_proxy_test.py b/resources/traffic_scripts/send_vxlan_for_proxy_test.py
index c356e1977e..d33ed413c8 100755
--- a/resources/traffic_scripts/send_vxlan_for_proxy_test.py
+++ b/resources/traffic_scripts/send_vxlan_for_proxy_test.py
@@ -19,24 +19,22 @@ import sys
import time
from scapy.layers.inet import IP, UDP, TCP
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
from scapy.all import Ether, Packet, Raw
from resources.libraries.python.SFC.VerifyPacket import *
-from resources.libraries.python.SFC.SFCConstants import SFCConstants as sfccon
+from resources.libraries.python.SFC.SFCConstants import SFCConstants as SfcCon
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from robot.api import logger
def main():
"""Send VxLAN packet from TG to DUT.
:raises: If the IP address is invalid.
"""
- args = TrafficScriptArg(
- ['src_mac', 'dst_mac', 'src_ip', 'dst_ip',
- 'timeout', 'framesize', 'testtype'])
+ args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip',
+ 'timeout', 'framesize', 'testtype'])
src_mac = args.get_arg('src_mac')
dst_mac = args.get_arg('dst_mac')
@@ -44,7 +42,7 @@ def main():
dst_ip = args.get_arg('dst_ip')
tx_if = args.get_arg('tx_if')
rx_if = args.get_arg('rx_if')
- timeout = int(args.get_arg('timeout'))
+ timeout = max(2, int(args.get_arg('timeout')))
frame_size = int(args.get_arg('framesize'))
test_type = args.get_arg('testtype')
@@ -53,10 +51,9 @@ def main():
sent_packets = []
protocol = TCP
- source_port = sfccon.DEF_SRC_PORT
- destination_port = sfccon.DEF_DST_PORT
+ source_port = SfcCon.DEF_SRC_PORT
+ destination_port = SfcCon.DEF_DST_PORT
- ip_version = None
if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
ip_version = IP
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
@@ -65,17 +62,17 @@ def main():
raise ValueError("Invalid IP version!")
innerpkt = (Ether(src=src_mac, dst=dst_mac) /
- ip_version(src=src_ip, dst=dst_ip) /
- protocol(sport=int(source_port), dport=int(destination_port)))
+ ip_version(src=src_ip, dst=dst_ip) /
+ protocol(sport=int(source_port), dport=int(destination_port)))
vxlan = '\x08\x00\x00\x00\x00\x00\x01\x00'
raw_data = vxlan + str(innerpkt)
pkt_header = (Ether(src=src_mac, dst=dst_mac) /
- ip_version(src=src_ip, dst=dst_ip) /
- UDP(sport=int(source_port), dport=4789) /
- Raw(load=raw_data))
+ ip_version(src=src_ip, dst=dst_ip) /
+ UDP(sport=int(source_port), dport=4789) /
+ Raw(load=raw_data))
fsize_no_fcs = frame_size - 4
pad_len = max(0, fsize_no_fcs - len(pkt_header))
@@ -87,10 +84,17 @@ def main():
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
- ether = rxq.recv(2)
-
- if ether is None:
- raise RuntimeError("No packet is received!")
+ while True:
+ ether = rxq.recv(timeout)
+ if ether is None:
+ raise RuntimeError('No packet is received!')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
# let us begin to check the proxy outbound packet
VerifyPacket.check_the_nsh_sfc_packet(ether, frame_size, test_type)
diff --git a/resources/traffic_scripts/send_vxlangpe_nsh_for_proxy_test.py b/resources/traffic_scripts/send_vxlangpe_nsh_for_proxy_test.py
index d998d7c6a0..3ea1f0bc62 100755
--- a/resources/traffic_scripts/send_vxlangpe_nsh_for_proxy_test.py
+++ b/resources/traffic_scripts/send_vxlangpe_nsh_for_proxy_test.py
@@ -16,27 +16,24 @@
from TG to DUT.
"""
import sys
-import time
from scapy.layers.inet import IP, UDP, TCP
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
from scapy.all import Ether, Packet, Raw
from resources.libraries.python.SFC.VerifyPacket import *
-from resources.libraries.python.SFC.SFCConstants import SFCConstants as sfccon
+from resources.libraries.python.SFC.SFCConstants import SFCConstants as SfcCon
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from robot.api import logger
def main():
"""Send VxLAN-GPE+NSH packet from TG to DUT.
:raises: If the IP address is invalid.
"""
- args = TrafficScriptArg(
- ['src_mac', 'dst_mac', 'src_ip', 'dst_ip',
- 'timeout', 'framesize', 'testtype'])
+ args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip',
+ 'timeout', 'framesize', 'testtype'])
src_mac = args.get_arg('src_mac')
dst_mac = args.get_arg('dst_mac')
@@ -44,7 +41,7 @@ def main():
dst_ip = args.get_arg('dst_ip')
tx_if = args.get_arg('tx_if')
rx_if = args.get_arg('rx_if')
- timeout = int(args.get_arg('timeout'))
+ timeout = max(2, int(args.get_arg('timeout')))
frame_size = int(args.get_arg('framesize'))
test_type = args.get_arg('testtype')
@@ -53,10 +50,9 @@ def main():
sent_packets = []
protocol = TCP
- source_port = sfccon.DEF_SRC_PORT
- destination_port = sfccon.DEF_DST_PORT
+ source_port = SfcCon.DEF_SRC_PORT
+ destination_port = SfcCon.DEF_DST_PORT
- ip_version = None
if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
ip_version = IP
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
@@ -65,8 +61,8 @@ def main():
raise ValueError("Invalid IP version!")
innerpkt = (Ether(src=src_mac, dst=dst_mac) /
- ip_version(src=src_ip, dst=dst_ip) /
- protocol(sport=int(source_port), dport=int(destination_port)))
+ ip_version(src=src_ip, dst=dst_ip) /
+ protocol(sport=int(source_port), dport=int(destination_port)))
vxlangpe_nsh = '\x0c\x00\x00\x04\x00\x00\x09\x00\x00\x06' \
'\x01\x03\x00\x00\xb9\xff\xC0\xA8\x32\x4B' \
@@ -75,9 +71,9 @@ def main():
raw_data = vxlangpe_nsh + str(innerpkt)
pkt_header = (Ether(src=src_mac, dst=dst_mac) /
- ip_version(src=src_ip, dst=dst_ip) /
- UDP(sport=int(source_port), dport=4790) /
- Raw(load=raw_data))
+ ip_version(src=src_ip, dst=dst_ip) /
+ UDP(sport=int(source_port), dport=4790) /
+ Raw(load=raw_data))
fsize_no_fcs = frame_size - 4
pad_len = max(0, fsize_no_fcs - len(pkt_header))
@@ -89,10 +85,17 @@ def main():
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
- ether = rxq.recv(2)
-
- if ether is None:
- raise RuntimeError("No packet is received!")
+ while True:
+ ether = rxq.recv(timeout)
+ if ether is None:
+ raise RuntimeError('No packet is received!')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
# let us begin to check the proxy inbound packet
VerifyPacket.check_the_nsh_sfc_packet(ether, frame_size, test_type)
diff --git a/resources/traffic_scripts/send_vxlangpe_nsh_for_sff_test.py b/resources/traffic_scripts/send_vxlangpe_nsh_for_sff_test.py
index 55c06f0691..6879d20f2c 100755
--- a/resources/traffic_scripts/send_vxlangpe_nsh_for_sff_test.py
+++ b/resources/traffic_scripts/send_vxlangpe_nsh_for_sff_test.py
@@ -16,27 +16,24 @@
from TG to DUT.
"""
import sys
-import time
from scapy.layers.inet import IP, UDP, TCP
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
from scapy.all import Ether, Packet, Raw
from resources.libraries.python.SFC.VerifyPacket import *
-from resources.libraries.python.SFC.SFCConstants import SFCConstants as sfccon
+from resources.libraries.python.SFC.SFCConstants import SFCConstants as SfcCon
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from robot.api import logger
def main():
"""Send VxLAN-GPE+NSH packet from TG to DUT.
:raises: If the IP address is invalid.
"""
- args = TrafficScriptArg(
- ['src_mac', 'dst_mac', 'src_ip', 'dst_ip',
- 'timeout', 'framesize', 'testtype'])
+ args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip',
+ 'timeout', 'framesize', 'testtype'])
src_mac = args.get_arg('src_mac')
dst_mac = args.get_arg('dst_mac')
@@ -44,7 +41,7 @@ def main():
dst_ip = args.get_arg('dst_ip')
tx_if = args.get_arg('tx_if')
rx_if = args.get_arg('rx_if')
- timeout = int(args.get_arg('timeout'))
+ timeout = max(2, int(args.get_arg('timeout')))
frame_size = int(args.get_arg('framesize'))
test_type = args.get_arg('testtype')
@@ -53,10 +50,9 @@ def main():
sent_packets = []
protocol = TCP
- source_port = sfccon.DEF_SRC_PORT
- destination_port = sfccon.DEF_DST_PORT
+ source_port = SfcCon.DEF_SRC_PORT
+ destination_port = SfcCon.DEF_DST_PORT
- ip_version = None
if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
ip_version = IP
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
@@ -65,8 +61,8 @@ def main():
raise ValueError("Invalid IP version!")
innerpkt = (Ether(src=src_mac, dst=dst_mac) /
- ip_version(src=src_ip, dst=dst_ip) /
- protocol(sport=int(source_port), dport=int(destination_port)))
+ ip_version(src=src_ip, dst=dst_ip) /
+ protocol(sport=int(source_port), dport=int(destination_port)))
vxlangpe_nsh = '\x0c\x00\x00\x04\x00\x00\x0a\x00\x00\x06' \
'\x01\x03\x00\x00\xb9\xff\xC0\xA8\x32\x4B' \
@@ -75,9 +71,9 @@ def main():
raw_data = vxlangpe_nsh + str(innerpkt)
pkt_header = (Ether(src=src_mac, dst=dst_mac) /
- ip_version(src=src_ip, dst=dst_ip) /
- UDP(sport=int(source_port), dport=4790) /
- Raw(load=raw_data))
+ ip_version(src=src_ip, dst=dst_ip) /
+ UDP(sport=int(source_port), dport=4790) /
+ Raw(load=raw_data))
fsize_no_fcs = frame_size - 4
pad_len = max(0, fsize_no_fcs - len(pkt_header))
@@ -89,10 +85,17 @@ def main():
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
- ether = rxq.recv(2)
-
- if ether is None:
- raise RuntimeError("No packet is received!")
+ while True:
+ ether = rxq.recv(timeout)
+ if ether is None:
+ raise RuntimeError('No packet is received!')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
# let us begin to check the sfc sff packet
VerifyPacket.check_the_nsh_sfc_packet(ether, frame_size, test_type)
diff --git a/resources/traffic_scripts/span_check.py b/resources/traffic_scripts/span_check.py
index c7ca8a7374..cbf65d3aaf 100755
--- a/resources/traffic_scripts/span_check.py
+++ b/resources/traffic_scripts/span_check.py
@@ -21,7 +21,7 @@ import sys
import ipaddress
from scapy.layers.inet import IP, ICMP, ARP
-from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
from scapy.layers.l2 import Ether
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue, auto_pad
@@ -64,8 +64,8 @@ def main():
"""Send a simple L2 or ICMP packet from one TG interface to DUT, then
receive a copy of the packet on the second TG interface, and a copy of
the ICMP reply."""
- args = TrafficScriptArg(
- ['tg_src_mac', 'src_ip', 'dst_ip', 'dut_if1_mac', 'ptype'])
+ args = TrafficScriptArg(['tg_src_mac', 'src_ip', 'dst_ip', 'dut_if1_mac',
+ 'ptype'])
src_mac = args.get_arg('tg_src_mac')
dst_mac = args.get_arg('dut_if1_mac')
@@ -104,11 +104,20 @@ def main():
txq.send(pkt_raw)
sent.append(auto_pad(pkt_raw))
- ether = rxq_mirrored.recv(2)
# Receive copy of Rx packet.
- if ether is None:
- raise RuntimeError("Rx timeout of mirrored Rx packet")
+ while True:
+ ether = rxq_mirrored.recv(2)
+ if ether is None:
+ raise RuntimeError("Rx timeout of mirrored Rx packet")
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
+
pkt = auto_pad(pkt_raw)
if str(ether) != str(pkt):
print("Mirrored Rx packet doesn't match the original Rx packet.")
@@ -148,15 +157,17 @@ def main():
# Receive reply on TG Tx port.
ether_repl = rxq_tx.recv(2, sent)
+
if ether_repl is None:
raise RuntimeError("Reply not received on TG Tx port.")
- else:
- print("Reply received on TG Tx port.\n")
+
+ print("Reply received on TG Tx port.\n")
# Receive copy of Tx packet.
ether = rxq_mirrored.recv(2)
if ether is None:
raise RuntimeError("Rx timeout of mirrored Tx packet")
+
if str(ether) != str(ether_repl):
print("Mirrored Tx packet doesn't match the received Tx packet.")
if ether.src != ether_repl.src or ether.dst != ether_repl.dst: