aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJan Gelety <jgelety@cisco.com>2017-09-08 11:38:38 +0200
committerTibor Frank <tifrank@cisco.com>2017-09-18 12:05:49 +0000
commit2a848f49308868dfe6fa3a9cb78bd085f8c16f40 (patch)
tree180c45ea5db2cc095c65d3b698a3e05a6ee819fe
parent6928a6be42016a6c5edade6369041670fe544f39 (diff)
Ignore unexpected ICMPv6 Neighbor Discovery - Neighbor Solicitation packets
We need to adapt all functional traffic scripts related to functional IPv6 tests to ingore receiving of unexpected ICMPv6ND_NS (ICMPv6 Neighbor Discovery - Neighbor Solicitation) packets that are sent automatically and we cannot avoid to receive them. The reason is to prevent false negative test results in case of csit functional tests that could block creation of new operational branch (csit weekly jobs), usage of new vpp builds (csit semiweekly jobs) and merging patches - csit as well as vpp. Change-Id: I43c90e7c766762fa769a81661338759a11b401a1 Signed-off-by: Jan Gelety <jgelety@cisco.com>
-rwxr-xr-xresources/traffic_scripts/check_ra_packet.py46
-rwxr-xr-xresources/traffic_scripts/icmpv6_echo.py64
-rwxr-xr-xresources/traffic_scripts/icmpv6_echo_req_resp.py119
-rwxr-xr-xresources/traffic_scripts/ipfix_check.py15
-rwxr-xr-xresources/traffic_scripts/ipfix_sessions.py7
-rwxr-xr-xresources/traffic_scripts/ipsec.py51
-rwxr-xr-xresources/traffic_scripts/ipv6_nd_proxy_check.py89
-rwxr-xr-xresources/traffic_scripts/ipv6_ns.py67
-rwxr-xr-xresources/traffic_scripts/ipv6_sweep_ping.py57
-rwxr-xr-xresources/traffic_scripts/policer.py58
-rwxr-xr-xresources/traffic_scripts/send_icmp_check_headers.py45
-rwxr-xr-xresources/traffic_scripts/send_icmp_check_multipath.py33
-rwxr-xr-xresources/traffic_scripts/send_icmp_type_code.py35
-rwxr-xr-xresources/traffic_scripts/send_icmp_wait_for_reply.py34
-rwxr-xr-xresources/traffic_scripts/send_ip_icmp.py57
-rwxr-xr-xresources/traffic_scripts/send_rs_check_ra.py52
-rwxr-xr-xresources/traffic_scripts/send_tcp_for_classifier_test.py37
-rwxr-xr-xresources/traffic_scripts/send_tcp_udp.py34
-rwxr-xr-xresources/traffic_scripts/send_vxlan_for_proxy_test.py42
-rwxr-xr-xresources/traffic_scripts/send_vxlangpe_nsh_for_proxy_test.py43
-rwxr-xr-xresources/traffic_scripts/send_vxlangpe_nsh_for_sff_test.py43
-rwxr-xr-xresources/traffic_scripts/span_check.py27
22 files changed, 631 insertions, 424 deletions
diff --git a/resources/traffic_scripts/check_ra_packet.py b/resources/traffic_scripts/check_ra_packet.py
index 231e07da11..9717c7db95 100755
--- a/resources/traffic_scripts/check_ra_packet.py
+++ b/resources/traffic_scripts/check_ra_packet.py
@@ -17,6 +17,8 @@
import sys
import ipaddress
+from scapy.layers.inet6 import IPv6, ICMPv6ND_RA, ICMPv6ND_NS
+
from resources.libraries.python.PacketVerifier import RxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
@@ -56,15 +58,23 @@ def main():
interval = int(args.get_arg('interval'))
rxq = RxQueue(rx_if)
- ether = rxq.recv(max(5, interval))
+ # receive ICMPv6ND_RA packet
+ while True:
+ ether = rxq.recv(max(5, interval))
+ if ether is None:
+ raise RuntimeError('ICMP echo Rx timeout')
- # Check whether received packet contains layer RA and check other values
- if ether is None:
- raise RuntimeError('ICMP echo Rx timeout')
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
- if not ether.haslayer('ICMPv6ND_RA'):
- raise RuntimeError('Not an RA packet received {0}'
- .format(ether.__repr__()))
+ # Check if received packet contains layer RA and check other values
+ if not ether.haslayer(ICMPv6ND_RA):
+ raise RuntimeError('Not an RA packet received {0}'.
+ format(ether.__repr__()))
src_address = ipaddress.IPv6Address(unicode(ether['IPv6'].src))
dst_address = ipaddress.IPv6Address(unicode(ether['IPv6'].dst))
@@ -72,22 +82,22 @@ def main():
all_nodes_multicast = ipaddress.IPv6Address(u'ff02::1')
if src_address != link_local:
- raise RuntimeError(
- 'Source address ({0}) not matching link local address({1})'.format(
- src_address, link_local))
+ raise RuntimeError('Source address ({0}) not matching link local '
+ 'address ({1})'.format(src_address, link_local))
if dst_address != all_nodes_multicast:
- raise RuntimeError('Packet destination address ({0}) is not the all '
- 'nodes multicast address ({1}).'.format(
- dst_address, all_nodes_multicast))
- if ether['IPv6'].hlim != 255:
- raise RuntimeError('Hop limit not correct: {0}!=255'.format(
- ether['IPv6'].hlim))
-
- ra_code = ether['ICMPv6 Neighbor Discovery - Router Advertisement'].code
+ raise RuntimeError('Packet destination address ({0}) is not the all'
+ ' nodes multicast address ({1}).'.
+ format(dst_address, all_nodes_multicast))
+ if ether[IPv6].hlim != 255:
+ raise RuntimeError('Hop limit not correct: {0}!=255'.
+ format(ether[IPv6].hlim))
+
+ ra_code = ether[ICMPv6ND_RA].code
if ra_code != 0:
raise RuntimeError('ICMP code: {0} not correct. '.format(ra_code))
sys.exit(0)
+
if __name__ == "__main__":
main()
diff --git a/resources/traffic_scripts/icmpv6_echo.py b/resources/traffic_scripts/icmpv6_echo.py
index 4bf573a1f1..a18896b07f 100755
--- a/resources/traffic_scripts/icmpv6_echo.py
+++ b/resources/traffic_scripts/icmpv6_echo.py
@@ -17,13 +17,19 @@
import sys
import logging
+
+# pylint: disable=no-name-in-module
+# pylint: disable=import-error
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue,\
- checksum_equal
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr
-from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
+
from scapy.all import Ether
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS
+from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr
+from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
+
+from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
+from resources.libraries.python.PacketVerifier import checksum_equal
+from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
def main():
@@ -43,52 +49,60 @@ def main():
# send ICMPv6 neighbor advertisement message
pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') /
- IPv6(src=src_ip, dst='ff02::1:ff00:2') /
- ICMPv6ND_NA(tgt=src_ip, R=0) /
- ICMPv6NDOptDstLLAddr(lladdr=src_mac))
+ IPv6(src=src_ip, dst='ff02::1:ff00:2') /
+ ICMPv6ND_NA(tgt=src_ip, R=0) /
+ ICMPv6NDOptDstLLAddr(lladdr=src_mac))
sent_packets.append(pkt_send)
txq.send(pkt_send)
# send ICMPv6 echo request
pkt_send = (Ether(src=src_mac, dst=dst_mac) /
- IPv6(src=src_ip, dst=dst_ip) /
- ICMPv6EchoRequest(id=echo_id, seq=echo_seq))
+ IPv6(src=src_ip, dst=dst_ip) /
+ ICMPv6EchoRequest(id=echo_id, seq=echo_seq))
sent_packets.append(pkt_send)
txq.send(pkt_send)
# receive ICMPv6 echo reply
- ether = rxq.recv(2, sent_packets)
- if ether is None:
- raise RuntimeError('ICMPv6 echo reply Rx timeout')
+ while True:
+ ether = rxq.recv(2, sent_packets)
+ if ether is None:
+ raise RuntimeError('ICMPv6 echo reply Rx timeout')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
if not ether.haslayer(IPv6):
- raise RuntimeError('Unexpected packet with no IPv6 received {0}'.format(
- ether.__repr__()))
+ raise RuntimeError('Unexpected packet with no IPv6 received {0}'.
+ format(ether.__repr__()))
- ipv6 = ether['IPv6']
+ ipv6 = ether[IPv6]
if not ipv6.haslayer(ICMPv6EchoReply):
- raise RuntimeError(
- 'Unexpected packet with no IPv6 ICMP received {0}'.format(
- ipv6.__repr__()))
+ raise RuntimeError('Unexpected packet with no ICMPv6 echo reply '
+ 'received {0}'.format(ipv6.__repr__()))
- icmpv6 = ipv6['ICMPv6 Echo Reply']
+ icmpv6 = ipv6[ICMPv6EchoReply]
# check identifier and sequence number
if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
- raise RuntimeError(
- 'Invalid ICMPv6 echo reply received ID {0} seq {1} should be ' \
- 'ID {2} seq {3}'.format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
+ raise RuntimeError('Invalid ICMPv6 echo reply received ID {0} seq {1} '
+ 'should be ID {2} seq {3}'.
+ format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
# verify checksum
cksum = icmpv6.cksum
del icmpv6.cksum
tmp = ICMPv6EchoReply(str(icmpv6))
if not checksum_equal(tmp.cksum, cksum):
- raise RuntimeError(
- 'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum))
+ raise RuntimeError('Invalid checksum {0} should be {1}'.
+ format(cksum, tmp.cksum))
sys.exit(0)
+
if __name__ == "__main__":
main()
diff --git a/resources/traffic_scripts/icmpv6_echo_req_resp.py b/resources/traffic_scripts/icmpv6_echo_req_resp.py
index c2cc4d20a0..ec9cf94a67 100755
--- a/resources/traffic_scripts/icmpv6_echo_req_resp.py
+++ b/resources/traffic_scripts/icmpv6_echo_req_resp.py
@@ -18,14 +18,20 @@
import sys
import logging
+
+# pylint: disable=no-name-in-module
+# pylint: disable=import-error
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue,\
- checksum_equal
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr
+
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS
+from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr
from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
from scapy.all import Ether
+from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
+from resources.libraries.python.PacketVerifier import checksum_equal
+from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
+
def main():
args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_nh_mac', 'dst_nh_mac',
@@ -52,109 +58,122 @@ def main():
# send ICMPv6 neighbor advertisement message
pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') /
- IPv6(src=src_ip, dst='ff02::1:ff00:2') /
- ICMPv6ND_NA(tgt=src_ip, R=0) /
- ICMPv6NDOptDstLLAddr(lladdr=src_mac))
+ IPv6(src=src_ip, dst='ff02::1:ff00:2') /
+ ICMPv6ND_NA(tgt=src_ip, R=0) /
+ ICMPv6NDOptDstLLAddr(lladdr=src_mac))
src_sent_packets.append(pkt_send)
src_txq.send(pkt_send)
pkt_send = (Ether(src=dst_mac, dst='ff:ff:ff:ff:ff:ff') /
- IPv6(src=dst_ip, dst='ff02::1:ff00:2') /
- ICMPv6ND_NA(tgt=dst_ip, R=0) /
- ICMPv6NDOptDstLLAddr(lladdr=dst_mac))
+ IPv6(src=dst_ip, dst='ff02::1:ff00:2') /
+ ICMPv6ND_NA(tgt=dst_ip, R=0) /
+ ICMPv6NDOptDstLLAddr(lladdr=dst_mac))
dst_sent_packets.append(pkt_send)
dst_txq.send(pkt_send)
# send ICMPv6 echo request from first TG interface
pkt_send = (Ether(src=src_mac, dst=src_nh_mac) /
- IPv6(src=src_ip, dst=dst_ip, hlim=hop_limit) /
- ICMPv6EchoRequest(id=echo_id, seq=echo_seq))
+ IPv6(src=src_ip, dst=dst_ip, hlim=hop_limit) /
+ ICMPv6EchoRequest(id=echo_id, seq=echo_seq))
src_sent_packets.append(pkt_send)
src_txq.send(pkt_send)
# receive ICMPv6 echo request on second TG interface
- ether = dst_rxq.recv(2, dst_sent_packets)
- if ether is None:
- raise RuntimeError('ICMPv6 echo reply Rx timeout')
+ while True:
+ ether = dst_rxq.recv(2, dst_sent_packets)
+ if ether is None:
+ raise RuntimeError('ICMPv6 echo reply Rx timeout')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
if not ether.haslayer(IPv6):
- raise RuntimeError('Unexpected packet with no IPv6 received {0}'.format(
- ether.__repr__()))
+ raise RuntimeError('Unexpected packet with no IPv6 received: {0}'.
+ format(ether.__repr__()))
- ipv6 = ether['IPv6']
+ ipv6 = ether[IPv6]
# verify hop limit processing
if ipv6.hlim != (hop_limit - hop_num):
- raise RuntimeError(
- 'Invalid hop limit {0} should be {1}'.format(ipv6.hlim,
- hop_limit - hop_num))
+ raise RuntimeError('Invalid hop limit {0} should be {1}'.
+ format(ipv6.hlim,hop_limit - hop_num))
if not ipv6.haslayer(ICMPv6EchoRequest):
- raise RuntimeError(
- 'Unexpected packet with no IPv6 ICMP received {0}'.format(
- ipv6.__repr__()))
+ raise RuntimeError('Unexpected packet with no IPv6 ICMP received {0}'.
+ format(ipv6.__repr__()))
- icmpv6 = ipv6['ICMPv6 Echo Request']
+ icmpv6 = ipv6[ICMPv6EchoRequest]
# check identifier and sequence number
if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
- raise RuntimeError(
- 'Invalid ICMPv6 echo reply received ID {0} seq {1} should be ' \
- 'ID {2} seq {3}'.format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
+ raise RuntimeError('Invalid ICMPv6 echo reply received ID {0} '
+ 'seq {1} should be ID {2} seq {3}'.
+ format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
# verify checksum
cksum = icmpv6.cksum
del icmpv6.cksum
tmp = ICMPv6EchoRequest(str(icmpv6))
if not checksum_equal(tmp.cksum, cksum):
- raise RuntimeError(
- 'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum))
+ raise RuntimeError('Invalid checksum {0} should be {1}'.
+ format(cksum, tmp.cksum))
# send ICMPv6 echo reply from second TG interface
pkt_send = (Ether(src=dst_mac, dst=dst_nh_mac) /
- IPv6(src=dst_ip, dst=src_ip) /
- ICMPv6EchoReply(id=echo_id, seq=echo_seq))
+ IPv6(src=dst_ip, dst=src_ip) /
+ ICMPv6EchoReply(id=echo_id, seq=echo_seq))
dst_sent_packets.append(pkt_send)
dst_txq.send(pkt_send)
# receive ICMPv6 echo reply on first TG interface
- ether = src_rxq.recv(2, src_sent_packets)
- if ether is None:
- raise RuntimeError('ICMPv6 echo reply Rx timeout')
+ while True:
+ ether = src_rxq.recv(2, src_sent_packets)
+ if ether is None:
+ raise RuntimeError('ICMPv6 echo reply Rx timeout')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
if not ether.haslayer(IPv6):
- raise RuntimeError('Unexpected packet with no IPv6 received {0}'.format(
- ether.__repr__()))
+ raise RuntimeError('Unexpected packet with no IPv6 layer received {0}'.
+ format(ether.__repr__()))
- ipv6 = ether['IPv6']
+ ipv6 = ether[IPv6]
# verify hop limit processing
if ipv6.hlim != (hop_limit - hop_num):
- raise RuntimeError(
- 'Invalid hop limit {0} should be {1}'.format(ipv6.hlim,
- hop_limit - hop_num))
+ raise RuntimeError('Invalid hop limit {0} should be {1}'.
+ format(ipv6.hlim, hop_limit - hop_num))
if not ipv6.haslayer(ICMPv6EchoReply):
- raise RuntimeError(
- 'Unexpected packet with no IPv6 ICMP received {0}'.format(
- ipv6.__repr__()))
+ raise RuntimeError('Unexpected packet with no IPv6 ICMP received {0}'.
+ format(ipv6.__repr__()))
- icmpv6 = ipv6['ICMPv6 Echo Reply']
+ icmpv6 = ipv6[ICMPv6EchoReply]
# check identifier and sequence number
if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
- raise RuntimeError(
- 'Invalid ICMPv6 echo reply received ID {0} seq {1} should be ' \
- 'ID {2} seq {3}'.format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
+ raise RuntimeError('Invalid ICMPv6 echo reply received ID {0} '
+ 'seq {1} should be ID {2} seq {3}'.
+ format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
# verify checksum
cksum = icmpv6.cksum
del icmpv6.cksum
tmp = ICMPv6EchoReply(str(icmpv6))
if not checksum_equal(tmp.cksum, cksum):
- raise RuntimeError(
- 'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum))
+ raise RuntimeError('Invalid checksum {0} should be {1}'.
+ format(cksum, tmp.cksum))
sys.exit(0)
+
if __name__ == "__main__":
main()
diff --git a/resources/traffic_scripts/ipfix_check.py b/resources/traffic_scripts/ipfix_check.py
index 2a08f0ce85..f5693cc7e8 100755
--- a/resources/traffic_scripts/ipfix_check.py
+++ b/resources/traffic_scripts/ipfix_check.py
@@ -24,8 +24,8 @@ from scapy.layers.l2 import Ether
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue, auto_pad
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-from resources.libraries.python.telemetry.IPFIXUtil import IPFIXHandler, \
- IPFIXData
+from resources.libraries.python.telemetry.IPFIXUtil import IPFIXHandler
+from resources.libraries.python.telemetry.IPFIXUtil import IPFIXData
def valid_ipv4(ip):
@@ -123,6 +123,11 @@ def main():
pkt = rxq.recv(10, ignore=ignore, verbose=verbose)
if pkt is None:
raise RuntimeError("RX timeout")
+
+ if pkt.haslayer("ICMPv6ND_NS"):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+
if pkt.haslayer("IPFIXHeader"):
if pkt.haslayer("IPFIXTemplate"):
# create or update template for IPFIX data packets
@@ -138,9 +143,9 @@ def main():
# verify packet count
if data["packetTotalCount"] != count:
- raise RuntimeError(
- "IPFIX reported wrong packet count. Count was {0},"
- " but should be {1}".format(data["packetTotalCount"], count))
+ raise RuntimeError("IPFIX reported wrong packet count. Count was {0},"
+ "but should be {1}".format(data["packetTotalCount"],
+ count))
# verify IP addresses
keys = data.keys()
err = "{0} mismatch. Packets used {1}, but were classified as {2}."
diff --git a/resources/traffic_scripts/ipfix_sessions.py b/resources/traffic_scripts/ipfix_sessions.py
index e7597a894a..11e77fa08c 100755
--- a/resources/traffic_scripts/ipfix_sessions.py
+++ b/resources/traffic_scripts/ipfix_sessions.py
@@ -192,6 +192,11 @@ def main():
pkt = rxq.recv(5)
if pkt is None:
raise RuntimeError("RX timeout")
+
+ if pkt.haslayer("ICMPv6ND_NS"):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+
if pkt.haslayer("IPFIXHeader"):
if pkt.haslayer("IPFIXTemplate"):
# create or update template for IPFIX data packets
@@ -219,6 +224,6 @@ def main():
raise RuntimeError("Received non-IPFIX packet or IPFIX header was"
"not recognized.")
-if __name__ == "__main__":
+if __name__ == "__main__":
main()
diff --git a/resources/traffic_scripts/ipsec.py b/resources/traffic_scripts/ipsec.py
index 13d44b8a51..1561738c60 100755
--- a/resources/traffic_scripts/ipsec.py
+++ b/resources/traffic_scripts/ipsec.py
@@ -18,8 +18,14 @@
import sys
import logging
+# pylint: disable=no-name-in-module
+# pylint: disable=import-error
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
-from scapy.all import Ether, IP, ICMP, IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
+
+from scapy.all import Ether
+from scapy.layers.inet import ICMP, IP
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
+from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
from scapy.layers.ipsec import SecurityAssociation, ESP
from ipaddress import ip_address
@@ -39,7 +45,7 @@ def check_ipv4(pkt_recv, dst_tun, src_ip, dst_ip, sa_in):
:type dst_tun: str
:type src_ip: str
:type dst_ip: str
- :type sa_sa: scapy.layers.ipsec.SecurityAssociation
+ :type sa_in: scapy.layers.ipsec.SecurityAssociation
:raises RuntimeError: If received packet is invalid.
"""
if not pkt_recv.haslayer(IP):
@@ -55,15 +61,15 @@ def check_ipv4(pkt_recv, dst_tun, src_ip, dst_ip, sa_in):
raise RuntimeError(
'Not an ESP packet received: {0}'.format(pkt_recv.__repr__()))
- ip_pkt = pkt_recv['IP']
+ ip_pkt = pkt_recv[IP]
d_pkt = sa_in.decrypt(ip_pkt)
- if d_pkt['IP'].dst != dst_ip:
+ if d_pkt[IP].dst != dst_ip:
raise RuntimeError(
'Decrypted packet has invalid destination address: {0} '
'should be: {1}'.format(d_pkt['IP'].dst, dst_ip))
- if d_pkt['IP'].src != src_ip:
+ if d_pkt[IP].src != src_ip:
raise RuntimeError(
'Decrypted packet has invalid source address: {0} should be: {1}'
.format(d_pkt['IP'].src, src_ip))
@@ -93,7 +99,7 @@ def check_ipv6(pkt_recv, dst_tun, src_ip, dst_ip, sa_in):
raise RuntimeError(
'Not an IPv6 packet received: {0}'.format(pkt_recv.__repr__()))
- if pkt_recv['IPv6'].dst != dst_tun:
+ if pkt_recv[IPv6].dst != dst_tun:
raise RuntimeError(
'Received packet has invalid destination address: {0} '
'should be: {1}'.format(pkt_recv['IPv6'].dst, dst_tun))
@@ -102,15 +108,15 @@ def check_ipv6(pkt_recv, dst_tun, src_ip, dst_ip, sa_in):
raise RuntimeError(
'Not an ESP packet received: {0}'.format(pkt_recv.__repr__()))
- ip_pkt = pkt_recv['IPv6']
+ ip_pkt = pkt_recv[IPv6]
d_pkt = sa_in.decrypt(ip_pkt)
- if d_pkt['IPv6'].dst != dst_ip:
+ if d_pkt[IPv6].dst != dst_ip:
raise RuntimeError(
'Decrypted packet has invalid destination address {0}: '
'should be: {1}'.format(d_pkt['IPv6'].dst, dst_ip))
- if d_pkt['IPv6'].src != src_ip:
+ if d_pkt[IPv6].src != src_ip:
raise RuntimeError(
'Decrypted packet has invalid source address: {0} should be: {1}'
.format(d_pkt['IPv6'].src, src_ip))
@@ -175,25 +181,33 @@ def main():
sent_packets = []
if is_ipv4:
- ip_pkt = IP(src=src_ip, dst=dst_ip) / \
- ICMP()
+ ip_pkt = (IP(src=src_ip, dst=dst_ip) /
+ ICMP())
ip_pkt = IP(str(ip_pkt))
else:
- ip_pkt = IPv6(src=src_ip, dst=dst_ip) / \
- ICMPv6EchoRequest()
+ ip_pkt = (IPv6(src=src_ip, dst=dst_ip) /
+ ICMPv6EchoRequest())
ip_pkt = IPv6(str(ip_pkt))
e_pkt = sa_out.encrypt(ip_pkt)
- pkt_send = Ether(src=src_mac, dst=dst_mac) / \
- e_pkt
+ pkt_send = (Ether(src=src_mac, dst=dst_mac) /
+ e_pkt)
sent_packets.append(pkt_send)
txq.send(pkt_send)
- pkt_recv = rxq.recv(2, sent_packets)
+ while True:
+ pkt_recv = rxq.recv(2, sent_packets)
- if pkt_recv is None:
- raise RuntimeError('ESP packet Rx timeout')
+ if pkt_recv is None:
+ raise RuntimeError('ESP packet Rx timeout')
+
+ if pkt_recv.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
if is_ipv4:
check_ipv4(pkt_recv, src_tun, dst_ip, src_ip, sa_in)
@@ -202,5 +216,6 @@ def main():
sys.exit(0)
+
if __name__ == "__main__":
main()
diff --git a/resources/traffic_scripts/ipv6_nd_proxy_check.py b/resources/traffic_scripts/ipv6_nd_proxy_check.py
index b1028028b0..c9213999ec 100755
--- a/resources/traffic_scripts/ipv6_nd_proxy_check.py
+++ b/resources/traffic_scripts/ipv6_nd_proxy_check.py
@@ -15,8 +15,9 @@
"""Traffic script that sends DHCPv6 proxy packets."""
from scapy.layers.inet import Ether
-from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply,\
- ICMPv6ND_NS
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS
+from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
+
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
@@ -44,17 +45,23 @@ def imcpv6nd_solicit(tx_if, src_mac, dst_mac, src_ip, dst_ip):
sent_packets = []
- icmpv6nd_solicit_pkt =\
- Ether(src=src_mac, dst=dst_mac) / \
- IPv6(src=src_ip) / \
- ICMPv6ND_NS(tgt=dst_ip)
+ icmpv6nd_solicit_pkt = (Ether(src=src_mac, dst=dst_mac) /
+ IPv6(src=src_ip) /
+ ICMPv6ND_NS(tgt=dst_ip))
sent_packets.append(icmpv6nd_solicit_pkt)
txq.send(icmpv6nd_solicit_pkt)
ether = None
for _ in range(5):
- pkt = rxq.recv(3, ignore=sent_packets)
+ while True:
+ pkt = rxq.recv(3, ignore=sent_packets)
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue in case of ICMPv6ND_NS packet
+ continue
+ else:
+ # otherwise process the current packet
+ break
if pkt is not None:
ether = pkt
break
@@ -63,34 +70,33 @@ def imcpv6nd_solicit(tx_if, src_mac, dst_mac, src_ip, dst_ip):
raise RuntimeError('ICMPv6ND Proxy response timeout.')
if ether.src != dst_mac:
- raise RuntimeError("Source MAC address error: {} != {}".format(
- ether.src, dst_mac))
+ raise RuntimeError("Source MAC address error: {} != {}".
+ format(ether.src, dst_mac))
print "Source MAC address: OK."
if ether.dst != src_mac:
- raise RuntimeError("Destination MAC address error: {} != {}".format(
- ether.dst, src_mac))
+ raise RuntimeError("Destination MAC address error: {} != {}".
+ format(ether.dst, src_mac))
print "Destination MAC address: OK."
- if ether['IPv6'].src != dst_ip:
- raise RuntimeError("Source IP address error: {} != {}".format(
- ether['IPv6'].src, dst_ip))
+ if ether[IPv6].src != dst_ip:
+ raise RuntimeError("Source IP address error: {} != {}".
+ format(ether[IPv6].src, dst_ip))
print "Source IP address: OK."
- if ether['IPv6'].dst != src_ip:
- raise RuntimeError("Destination IP address error: {} != {}".format(
- ether['IPv6'].dst, src_ip))
+ if ether[IPv6].dst != src_ip:
+ raise RuntimeError("Destination IP address error: {} != {}".
+ format(ether[IPv6].dst, src_ip))
print "Destination IP address: OK."
try:
- target_addr = ether['IPv6']\
- ['ICMPv6 Neighbor Discovery - Neighbor Advertisement'].tgt
+ target_addr = ether[IPv6][ICMPv6ND_NA].tgt
except (KeyError, AttributeError):
raise RuntimeError("Not an ICMPv6ND Neighbor Advertisement packet.")
if target_addr != dst_ip:
- raise RuntimeError("ICMPv6 field 'Target address' error:"
- " {} != {}".format(target_addr, dst_ip))
+ raise RuntimeError("ICMPv6 field 'Target address' error: {} != {}".
+ format(target_addr, dst_ip))
print "Target address field: OK."
@@ -119,16 +125,22 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac,
rxq = RxQueue(dst_if)
txq = TxQueue(src_if)
- icmpv6_ping_pkt = \
- Ether(src=src_mac, dst=proxy_to_src_mac) / \
- IPv6(src=src_ip, dst=dst_ip) / \
- ICMPv6EchoRequest()
+ icmpv6_ping_pkt = (Ether(src=src_mac, dst=proxy_to_src_mac) /
+ IPv6(src=src_ip, dst=dst_ip) /
+ ICMPv6EchoRequest())
txq.send(icmpv6_ping_pkt)
ether = None
for _ in range(5):
- pkt = rxq.recv(3)
+ while True:
+ pkt = rxq.recv(3)
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue in case of ICMPv6ND_NS packet
+ continue
+ else:
+ # otherwise process the current packet
+ break
if pkt is not None:
ether = pkt
break
@@ -136,7 +148,7 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac,
if ether is None:
raise RuntimeError('ICMPv6 Echo Request timeout.')
try:
- ether["IPv6"]["ICMPv6 Echo Request"]
+ ether[IPv6]["ICMPv6 Echo Request"]
except KeyError:
raise RuntimeError("Received packet is not an ICMPv6 Echo Request.")
print "ICMP Echo: OK."
@@ -144,16 +156,22 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac,
rxq = RxQueue(src_if)
txq = TxQueue(dst_if)
- icmpv6_ping_pkt = \
- Ether(src=dst_mac, dst=proxy_to_dst_mac) / \
- IPv6(src=dst_ip, dst=src_ip) / \
- ICMPv6EchoReply()
+ icmpv6_ping_pkt = (Ether(src=dst_mac, dst=proxy_to_dst_mac) /
+ IPv6(src=dst_ip, dst=src_ip) /
+ ICMPv6EchoReply())
txq.send(icmpv6_ping_pkt)
ether = None
for _ in range(5):
- pkt = rxq.recv(3)
+ while True:
+ pkt = rxq.recv(3)
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue in case of ICMPv6ND_NS packet
+ continue
+ else:
+ # otherwise process the current packet
+ break
if pkt is not None:
ether = pkt
break
@@ -161,7 +179,7 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac,
if ether is None:
raise RuntimeError('DHCPv6 SOLICIT timeout')
try:
- ether["IPv6"]["ICMPv6 Echo Reply"]
+ ether[IPv6]["ICMPv6 Echo Reply"]
except KeyError:
raise RuntimeError("Received packet is not an ICMPv6 Echo Reply.")
@@ -187,8 +205,9 @@ def main():
imcpv6nd_solicit(src_if, src_mac, proxy_to_src_mac, src_ip, dst_ip)
# Verify route (ICMP echo/reply)
- ipv6_ping(src_if, dst_if, src_mac, dst_mac,
- proxy_to_src_mac, proxy_to_dst_mac, src_ip, dst_ip)
+ ipv6_ping(src_if, dst_if, src_mac, dst_mac, proxy_to_src_mac,
+ proxy_to_dst_mac, src_ip, dst_ip)
+
if __name__ == "__main__":
main()
diff --git a/resources/traffic_scripts/ipv6_ns.py b/resources/traffic_scripts/ipv6_ns.py
index 70c6ab445a..5852e6317f 100755
--- a/resources/traffic_scripts/ipv6_ns.py
+++ b/resources/traffic_scripts/ipv6_ns.py
@@ -17,13 +17,18 @@
import sys
import logging
+
+# pylint: disable=no-name-in-module
+# pylint: disable=import-error
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue,\
- checksum_equal
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
+
+from scapy.all import Ether
from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS
from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr, ICMPv6NDOptSrcLLAddr
-from scapy.all import Ether
+
+from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
+from resources.libraries.python.PacketVerifier import checksum_equal
+from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
def main():
@@ -41,57 +46,67 @@ def main():
# send ICMPv6 neighbor solicitation message
pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') /
- IPv6(src=src_ip, dst='ff02::1:ff00:2') /
- ICMPv6ND_NS(tgt=dst_ip) /
- ICMPv6NDOptSrcLLAddr(lladdr=src_mac))
+ IPv6(src=src_ip, dst='ff02::1:ff00:2') /
+ ICMPv6ND_NS(tgt=dst_ip) /
+ ICMPv6NDOptSrcLLAddr(lladdr=src_mac))
sent_packets.append(pkt_send)
txq.send(pkt_send)
# receive ICMPv6 neighbor advertisement message
- ether = rxq.recv(2, sent_packets)
+ while True:
+ ether = rxq.recv(2, sent_packets)
+ if ether is None:
+ raise RuntimeError('ICMPv6 echo reply Rx timeout')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
+
if ether is None:
raise RuntimeError('ICMPv6 echo reply Rx timeout')
if not ether.haslayer(IPv6):
- raise RuntimeError('Unexpected packet with no IPv6 received {0}'.format(
- ether.__repr__()))
+ raise RuntimeError('Unexpected packet with no IPv6 received {0}'.
+ format(ether.__repr__()))
- ipv6 = ether['IPv6']
+ ipv6 = ether[IPv6]
if not ipv6.haslayer(ICMPv6ND_NA):
- raise RuntimeError(
- 'Unexpected packet with no ICMPv6 ND-NA received {0}'.format(
- ipv6.__repr__()))
+ raise RuntimeError('Unexpected packet with no ICMPv6 ND-NA received '
+ '{0}'.format(ipv6.__repr__()))
- icmpv6_na = ipv6['ICMPv6 Neighbor Discovery - Neighbor Advertisement']
+ icmpv6_na = ipv6[ICMPv6ND_NA]
# verify target address
if icmpv6_na.tgt != dst_ip:
- raise RuntimeError('Invalid target address {0} should be {1}'.format(
- icmpv6_na.tgt, dst_ip))
+ raise RuntimeError('Invalid target address {0} should be {1}'.
+ format(icmpv6_na.tgt, dst_ip))
if not icmpv6_na.haslayer(ICMPv6NDOptDstLLAddr):
- raise RuntimeError(
- 'Missing Destination Link-Layer Address option in ICMPv6 ' +
- 'Neighbor Advertisement {0}'.format(icmpv6_na.__repr__()))
+ raise RuntimeError('Missing Destination Link-Layer Address option in '
+ 'ICMPv6 Neighbor Advertisement {0}'.
+ format(icmpv6_na.__repr__()))
- option = 'ICMPv6 Neighbor Discovery Option - Destination Link-Layer Address'
- dst_ll_addr = icmpv6_na[option]
+ dst_ll_addr = icmpv6_na[ICMPv6NDOptDstLLAddr]
# verify destination link-layer address field
if dst_ll_addr.lladdr != dst_mac:
- raise RuntimeError('Invalid lladdr {0} should be {1}'.format(
- dst_ll_addr.lladdr, dst_mac))
+ raise RuntimeError('Invalid lladdr {0} should be {1}'.
+ format(dst_ll_addr.lladdr, dst_mac))
# verify checksum
cksum = icmpv6_na.cksum
del icmpv6_na.cksum
tmp = ICMPv6ND_NA(str(icmpv6_na))
if not checksum_equal(tmp.cksum, cksum):
- raise RuntimeError(
- 'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum))
+ raise RuntimeError('Invalid checksum {0} should be {1}'.
+ format(cksum, tmp.cksum))
sys.exit(0)
+
if __name__ == "__main__":
main()
diff --git a/resources/traffic_scripts/ipv6_sweep_ping.py b/resources/traffic_scripts/ipv6_sweep_ping.py
index 80fdda532a..28d23a16ef 100755
--- a/resources/traffic_scripts/ipv6_sweep_ping.py
+++ b/resources/traffic_scripts/ipv6_sweep_ping.py
@@ -19,13 +19,18 @@ import logging
import os
import sys
+# pylint: disable=no-name-in-module
+# pylint: disable=import-error
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue, \
- checksum_equal
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr
-from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
+
from scapy.all import Ether
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS
+from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr
+from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
+
+from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
+from resources.libraries.python.PacketVerifier import checksum_equal
+from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
def main():
@@ -68,37 +73,43 @@ def main():
sent_packets.append(pkt_send)
txq.send(pkt_send)
- ether = rxq.recv(ignore=sent_packets)
- if ether is None:
- raise RuntimeError(
- 'ICMPv6 echo reply seq {0} Rx timeout'.format(echo_seq))
+ while True:
+ ether = rxq.recv(ignore=sent_packets)
+ if ether is None:
+ raise RuntimeError('ICMPv6 echo reply seq {0} '
+ 'Rx timeout'.format(echo_seq))
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue in case of ICMPv6ND_NS packet
+ continue
+ else:
+ # otherwise process the current packet
+ break
if not ether.haslayer(IPv6):
- raise RuntimeError(
- 'Unexpected packet with no IPv6 received {0}'.format(
- ether.__repr__()))
+ raise RuntimeError('Unexpected packet with no IPv6 layer '
+ 'received: {0}'.format(ether.__repr__()))
- ipv6 = ether['IPv6']
+ ipv6 = ether[IPv6]
if not ipv6.haslayer(ICMPv6EchoReply):
- raise RuntimeError(
- 'Unexpected packet with no IPv6 ICMP received {0}'.format(
- ipv6.__repr__()))
+ raise RuntimeError('Unexpected packet with no ICMPv6 echo reply '
+ 'layer received: {0}'.format(ipv6.__repr__()))
- icmpv6 = ipv6['ICMPv6 Echo Reply']
+ icmpv6 = ipv6[ICMPv6EchoReply]
if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
- raise RuntimeError(
- 'Invalid ICMPv6 echo reply received ID {0} seq {1} should be '
- 'ID {2} seq {3}, {0}'.format(icmpv6.id, icmpv6.seq, echo_id,
- echo_seq))
+ raise RuntimeError('ICMPv6 echo reply with invalid data received: '
+ 'ID {0} seq {1} should be ID {2} seq {3}, {0}'.
+ format(icmpv6.id, icmpv6.seq, echo_id,
+ echo_seq))
cksum = icmpv6.cksum
del icmpv6.cksum
tmp = ICMPv6EchoReply(str(icmpv6))
if not checksum_equal(tmp.cksum, cksum):
- raise RuntimeError(
- 'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum))
+ raise RuntimeError('Invalid checksum: {0} should be {1}'.
+ format(cksum, tmp.cksum))
sent_packets.remove(pkt_send)
diff --git a/resources/traffic_scripts/policer.py b/resources/traffic_scripts/policer.py
index ee9fa29c4a..62c397d496 100755
--- a/resources/traffic_scripts/policer.py
+++ b/resources/traffic_scripts/policer.py
@@ -21,7 +21,10 @@ import logging
# pylint: disable=no-name-in-module
# pylint: disable=import-error
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
-from scapy.all import Ether, IP, IPv6, TCP
+
+from scapy.all import Ether
+from scapy.layers.inet import IP, TCP
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
from ipaddress import ip_address
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
@@ -38,17 +41,17 @@ def check_ipv4(pkt_recv, dscp):
:raises RuntimeError: If received packet is invalid.
"""
if not pkt_recv.haslayer(IP):
- raise RuntimeError(
- 'Not an IPv4 packet received: {0}'.format(pkt_recv.__repr__()))
+ raise RuntimeError('Not an IPv4 packet received: {0}'.
+ format(pkt_recv.__repr__()))
- rx_dscp = pkt_recv['IP'].tos >> 2
+ rx_dscp = pkt_recv[IP].tos >> 2
if rx_dscp != dscp:
- raise RuntimeError(
- 'Invalid DSCP {0} should be {1}'.format(rx_dscp, dscp))
+ raise RuntimeError('Invalid DSCP {0} should be {1}'.
+ format(rx_dscp, dscp))
if not pkt_recv.haslayer(TCP):
- raise RuntimeError(
- 'Not a TCP packet received: {0}'.format(pkt_recv.__repr__()))
+ raise RuntimeError('Not a TCP packet received: {0}'.
+ format(pkt_recv.__repr__()))
def check_ipv6(pkt_recv, dscp):
@@ -61,17 +64,17 @@ def check_ipv6(pkt_recv, dscp):
:raises RuntimeError: If received packet is invalid.
"""
if not pkt_recv.haslayer(IPv6):
- raise RuntimeError(
- 'Not an IPv6 packet received: {0}'.format(pkt_recv.__repr__()))
+ raise RuntimeError('Not an IPv6 packet received: {0}'.
+ format(pkt_recv.__repr__()))
- rx_dscp = pkt_recv['IPv6'].tc >> 2
+ rx_dscp = pkt_recv[IPv6].tc >> 2
if rx_dscp != dscp:
- raise RuntimeError(
- 'Invalid DSCP {0} should be {1}'.format(rx_dscp, dscp))
+ raise RuntimeError('Invalid DSCP {0} should be {1}'.
+ format(rx_dscp, dscp))
if not pkt_recv.haslayer(TCP):
- raise RuntimeError(
- 'Not a TCP packet received: {0}'.format(pkt_recv.__repr__()))
+ raise RuntimeError('Not a TCP packet received: {0}'.
+ format(pkt_recv.__repr__()))
# pylint: disable=too-many-locals
@@ -97,19 +100,29 @@ def main():
sent_packets = []
if is_ipv4:
- ip_pkt = IP(src=src_ip, dst=dst_ip) / \
- TCP()
+ ip_pkt = (IP(src=src_ip, dst=dst_ip) /
+ TCP())
else:
- ip_pkt = IPv6(src=src_ip, dst=dst_ip) / \
- TCP()
+ ip_pkt = (IPv6(src=src_ip, dst=dst_ip) /
+ TCP())
- pkt_send = Ether(src=src_mac, dst=dst_mac) / \
- ip_pkt
+ pkt_send = (Ether(src=src_mac, dst=dst_mac) /
+ ip_pkt)
sent_packets.append(pkt_send)
txq.send(pkt_send)
- pkt_recv = rxq.recv(2, sent_packets)
+ while True:
+ pkt_recv = rxq.recv(2, sent_packets)
+ if pkt_recv is None:
+ raise RuntimeError('ICMPv6 echo reply Rx timeout')
+
+ if pkt_recv.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
if pkt_recv is None:
raise RuntimeError('Rx timeout')
@@ -121,5 +134,6 @@ def main():
sys.exit(0)
+
if __name__ == "__main__":
main()
diff --git a/resources/traffic_scripts/send_icmp_check_headers.py b/resources/traffic_scripts/send_icmp_check_headers.py
index c9e3a35df4..2193164b61 100755
--- a/resources/traffic_scripts/send_icmp_check_headers.py
+++ b/resources/traffic_scripts/send_icmp_check_headers.py
@@ -22,8 +22,7 @@ import sys
import ipaddress
from robot.api import logger
from scapy.layers.inet import ICMP, IP
-from scapy.layers.inet6 import ICMPv6EchoRequest
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
from scapy.layers.l2 import Ether, Dot1Q
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
@@ -94,28 +93,36 @@ def main():
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
- if tx_if == rx_if:
- ether = rxq.recv(2, ignore=sent_packets)
- else:
- ether = rxq.recv(2)
- if ether is None:
- raise RuntimeError("ICMP echo Rx timeout")
+ while True:
+ if tx_if == rx_if:
+ ether = rxq.recv(2, ignore=sent_packets)
+ else:
+ ether = rxq.recv(2)
+ if ether is None:
+ raise RuntimeError('ICMP echo Rx timeout')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
logger.trace("MAC matched")
else:
- raise RuntimeError(
- "Matching packet unsuccessful: {0}".format(ether.__repr__()))
+ raise RuntimeError("Matching packet unsuccessful: {0}".
+ format(ether.__repr__()))
if encaps_rx == 'Dot1q':
if ether[Dot1Q].vlan == int(vlan_rx):
logger.trace("VLAN matched")
else:
raise RuntimeError('Ethernet frame with wrong VLAN tag ({}-'
- 'received, {}-expected):\n{}'
- .format(ether[Dot1Q].vlan, vlan_rx,
- ether.__repr__()))
+ 'received, {}-expected):\n{}'.
+ format(ether[Dot1Q].vlan, vlan_rx,
+ ether.__repr__()))
ip = ether[Dot1Q].payload
elif encaps_rx == 'Dot1ad':
raise NotImplementedError()
@@ -123,21 +130,21 @@ def main():
ip = ether.payload
if not isinstance(ip, ip_format):
- raise RuntimeError(
- "Not an IP packet received {0}".format(ip.__repr__()))
+ raise RuntimeError("Not an IP packet received {0}".
+ format(ip.__repr__()))
# Compare data from packets
if src_ip == ip.src:
logger.trace("Src IP matched")
else:
- raise RuntimeError("Matching Src IP unsuccessful: {} != {}"
- .format(src_ip, ip.src))
+ raise RuntimeError("Matching Src IP unsuccessful: {} != {}".
+ format(src_ip, ip.src))
if dst_ip == ip.dst:
logger.trace("Dst IP matched")
else:
- raise RuntimeError("Matching Dst IP unsuccessful: {} != {}"
- .format(dst_ip, ip.dst))
+ raise RuntimeError("Matching Dst IP unsuccessful: {} != {}".
+ format(dst_ip, ip.dst))
sys.exit(0)
diff --git a/resources/traffic_scripts/send_icmp_check_multipath.py b/resources/traffic_scripts/send_icmp_check_multipath.py
index 4e39efdfcd..5da3b7b096 100755
--- a/resources/traffic_scripts/send_icmp_check_multipath.py
+++ b/resources/traffic_scripts/send_icmp_check_multipath.py
@@ -18,10 +18,9 @@ and check if it is divided into two paths."""
import sys
import ipaddress
-from scapy.layers.inet import ICMP, IP
-from scapy.layers.inet6 import IPv6
from scapy.all import Ether
-from scapy.layers.inet6 import ICMPv6EchoRequest
+from scapy.layers.inet import ICMP, IP
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
@@ -94,27 +93,38 @@ def main():
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
- ether = rxq.recv(2)
+
+ while True:
+ ether = rxq.recv(2)
+ if ether is None:
+ raise RuntimeError('ICMPv6 echo reply Rx timeout')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue in case of ICMPv6ND_NS packet
+ continue
+ else:
+ # otherwise process the current packet
+ break
if ether is None:
raise RuntimeError("ICMP echo Rx timeout")
if not ether.haslayer(ip_format):
- raise RuntimeError("Not an IP packet received {0}"
- .format(ether.__repr__()))
+ raise RuntimeError("Not an IP packet received {0}".
+ format(ether.__repr__()))
- if ether['Ethernet'].src != dut_if2_mac:
+ if ether[Ether].src != dut_if2_mac:
raise RuntimeError("Source MAC address error")
- if ether['Ethernet'].dst == path_1_mac:
+ if ether[Ether].dst == path_1_mac:
path_1_counter += 1
- elif ether['Ethernet'].dst == path_2_mac:
+ elif ether[Ether].dst == path_2_mac:
path_2_counter += 1
else:
raise RuntimeError("Destination MAC address error")
if (path_1_counter + path_2_counter) != 100:
- raise RuntimeError("Packet loss: recevied only {} packets of 100 "
- .format(path_1_counter + path_2_counter))
+ raise RuntimeError("Packet loss: recevied only {} packets of 100 ".
+ format(path_1_counter + path_2_counter))
if path_1_counter == 0:
raise RuntimeError("Path 1 error!")
@@ -127,5 +137,6 @@ def main():
sys.exit(0)
+
if __name__ == "__main__":
main()
diff --git a/resources/traffic_scripts/send_icmp_type_code.py b/resources/traffic_scripts/send_icmp_type_code.py
index dffaafff38..2997f91264 100755
--- a/resources/traffic_scripts/send_icmp_type_code.py
+++ b/resources/traffic_scripts/send_icmp_type_code.py
@@ -19,10 +19,9 @@ the other one. Dot1q or Dot1ad tagging of the ethernet frame can be set.
import sys
import ipaddress
-from scapy.layers.inet import ICMP, IP
from scapy.layers.l2 import Ether
-from scapy.layers.inet6 import ICMPv6EchoRequest
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet import ICMP, IP
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
@@ -88,13 +87,13 @@ def main():
pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
IP(src=src_ip, dst=dst_ip) /
ICMP(code=icmp_code, type=icmp_type))
- ip_format = 'IP'
- icmp_format = 'ICMP'
+ ip_format = IP
+ icmp_format = ICMP
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
IPv6(src=src_ip, dst=dst_ip) /
ICMPv6EchoRequest(code=icmp_code, type=icmp_type))
- ip_format = 'IPv6'
+ ip_format = IPv6
icmp_format = 'ICMPv6'
else:
raise ValueError("IP(s) not in correct format")
@@ -103,23 +102,31 @@ def main():
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
- ether = rxq.recv(2)
+ while True:
+ ether = rxq.recv(2)
+ if ether is None:
+ raise RuntimeError('ICMP echo Rx timeout')
- # Check whether received packet contains layers Ether, IP and ICMP
- if ether is None:
- raise RuntimeError('ICMP echo Rx timeout')
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
+ # Check whether received packet contains layers IP and ICMP
if not ether.haslayer(ip_format):
- raise RuntimeError('Not an IP packet received {0}'
- .format(ether.__repr__()))
+ raise RuntimeError('Not an IP packet received {0}'.
+ format(ether.__repr__()))
# Cannot use haslayer for ICMPv6, every type of ICMPv6 is a separate layer
# Next header value of 58 means the next header is ICMPv6
if not ether.haslayer(icmp_format) and ether[ip_format].nh != 58:
- raise RuntimeError('Not an ICMP packet received {0}'
- .format(ether.__repr__()))
+ raise RuntimeError('Not an ICMP packet received {0}'.
+ format(ether.__repr__()))
sys.exit(0)
+
if __name__ == "__main__":
main()
diff --git a/resources/traffic_scripts/send_icmp_wait_for_reply.py b/resources/traffic_scripts/send_icmp_wait_for_reply.py
index a77f15efc2..7677152801 100755
--- a/resources/traffic_scripts/send_icmp_wait_for_reply.py
+++ b/resources/traffic_scripts/send_icmp_wait_for_reply.py
@@ -17,9 +17,9 @@
import sys
import ipaddress
-from scapy.layers.inet import ICMP, IP
-from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
from scapy.all import Ether
+from scapy.layers.inet import ICMP, IP
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
@@ -35,14 +35,14 @@ def is_icmp_reply(pkt, ipformat):
:type ipformat: dict
:rtype: bool
"""
- #pylint: disable=bare-except
+ # pylint: disable=bare-except
try:
if pkt[ipformat['IPType']][ipformat['ICMP_rep']].type == \
ipformat['Type']:
return True
else:
return False
- except:
+ except: # pylint: disable=bare-except
return False
@@ -58,12 +58,12 @@ def address_check(request, reply, ipformat):
:type ipformat: dict
:rtype: bool
"""
- #pylint: disable=bare-except
+ # pylint: disable=bare-except
try:
r_src = reply[ipformat['IPType']].src == request[ipformat['IPType']].dst
r_dst = reply[ipformat['IPType']].dst == request[ipformat['IPType']].src
return r_src and r_dst
- except:
+ except: # pylint: disable=bare-except
return False
@@ -139,12 +139,21 @@ def main():
txq.send(icmp_request)
for _ in range(1000):
- icmp_reply = rxq.recv(wait_step, ignore=sent_packets)
- if icmp_reply is None:
- timeout -= wait_step
- if timeout < 0:
- raise RuntimeError("ICMP echo Rx timeout")
- elif is_icmp_reply(icmp_reply, ip_format):
+ while True:
+ icmp_reply = rxq.recv(wait_step, ignore=sent_packets)
+ if icmp_reply is None:
+ timeout -= wait_step
+ if timeout < 0:
+ raise RuntimeError("ICMP echo Rx timeout")
+
+ elif icmp_reply.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue in case of ICMPv6ND_NS packet
+ continue
+ else:
+ # otherwise process the current packet
+ break
+
+ if is_icmp_reply(icmp_reply, ip_format):
if address_check(icmp_request, icmp_reply, ip_format):
break
else:
@@ -154,5 +163,6 @@ def main():
sys.exit(0)
+
if __name__ == "__main__":
main()
diff --git a/resources/traffic_scripts/send_ip_icmp.py b/resources/traffic_scripts/send_ip_icmp.py
index b22b5d39a8..58f2e1e4d8 100755
--- a/resources/traffic_scripts/send_ip_icmp.py
+++ b/resources/traffic_scripts/send_ip_icmp.py
@@ -19,11 +19,9 @@ the other one. Dot1q or Dot1ad tagging of the ethernet frame can be set.
import sys
import ipaddress
+from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet import ICMP, IP
-from scapy.layers.l2 import Ether
-from scapy.layers.l2 import Dot1Q
-from scapy.layers.inet6 import ICMPv6EchoRequest
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
@@ -107,8 +105,8 @@ def main():
pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
IP(src=src_ip, dst=dst_ip) /
ICMP())
- ip_format = 'IP'
- icmp_format = 'ICMP'
+ ip_format = IP
+ icmp_format = ICMP
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
if encaps == 'Dot1q':
pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
@@ -125,8 +123,8 @@ def main():
pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
IPv6(src=src_ip, dst=dst_ip) /
ICMPv6EchoRequest())
- ip_format = 'IPv6'
- icmp_format = 'ICMPv6EchoRequest'
+ ip_format = IPv6
+ icmp_format = ICMPv6EchoRequest
else:
raise ValueError("IP(s) not in correct format")
@@ -134,44 +132,55 @@ def main():
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
- ether = rxq.recv(2)
+ # Receive ICMP / ICMPv6 echo reply
+ while True:
+ ether = rxq.recv(2,)
+ if ether is None:
+ raise RuntimeError('ICMP echo Rx timeout')
- # Check whether received packet contains layers Ether, IP and ICMP
- if ether is None:
- raise RuntimeError('ICMP echo Rx timeout')
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
+ # Check whether received packet contains layers IP/IPv6 and
+ # ICMP/ICMPv6EchoRequest
if encaps_rx:
if encaps_rx == 'Dot1q':
if not vlan1_rx:
vlan1_rx = vlan1
if not ether.haslayer(Dot1Q):
- raise RuntimeError('Not VLAN tagged Eth frame received:\n{0}'
- .format(ether.__repr__()))
+ raise RuntimeError('Not VLAN tagged Eth frame received:\n{0}'.
+ format(ether.__repr__()))
elif ether[Dot1Q].vlan != int(vlan1_rx):
raise RuntimeError('Ethernet frame with wrong VLAN tag ({}) '
- 'received ({} expected):\n{}'.format(
- ether[Dot1Q].vlan, vlan1_rx, ether.__repr__()))
+ 'received ({} expected):\n{}'.
+ format(ether[Dot1Q].vlan, vlan1_rx,
+ ether.__repr__()))
elif encaps_rx == 'Dot1ad':
if not vlan1_rx:
vlan1_rx = vlan1
if not vlan2_rx:
vlan2_rx = vlan2
# TODO
- raise RuntimeError('Encapsulation {0} not implemented yet.'
- .format(encaps_rx))
+ raise RuntimeError('Encapsulation {0} not implemented yet.'.
+ format(encaps_rx))
else:
- raise RuntimeError('Unsupported/unknown encapsulation expected: {0}'
- .format(encaps_rx))
+ raise RuntimeError('Unsupported encapsulation expected: {0}'.
+ format(encaps_rx))
if not ether.haslayer(ip_format):
- raise RuntimeError('Not an IP packet received:\n{0}'
- .format(ether.__repr__()))
+ raise RuntimeError('Not an IP/IPv6 packet received:\n{0}'.
+ format(ether.__repr__()))
if not ether.haslayer(icmp_format):
- raise RuntimeError('Not an ICMP packet received:\n{0}'
- .format(ether.__repr__()))
+ raise RuntimeError('Not an ICMP/ICMPv6EchoRequest packet received:\n'
+ '{0}'.format(ether.__repr__()))
sys.exit(0)
+
if __name__ == "__main__":
main()
diff --git a/resources/traffic_scripts/send_rs_check_ra.py b/resources/traffic_scripts/send_rs_check_ra.py
index 9ba1f55b52..c9ff46528a 100755
--- a/resources/traffic_scripts/send_rs_check_ra.py
+++ b/resources/traffic_scripts/send_rs_check_ra.py
@@ -18,7 +18,7 @@ import sys
import ipaddress
from scapy.layers.l2 import Ether
-from scapy.layers.inet6 import IPv6, ICMPv6ND_RS
+from scapy.layers.inet6 import IPv6, ICMPv6ND_RA, ICMPv6ND_RS, ICMPv6ND_NS
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
@@ -72,24 +72,29 @@ def main():
sent_packets = [pkt_raw]
txq.send(pkt_raw)
- ether = rxq.recv(8, ignore=sent_packets)
+ while True:
+ ether = rxq.recv(2, sent_packets)
+ if ether is None:
+ raise RuntimeError('ICMP echo Rx timeout')
- # Check whether received packet contains layer RA and check other values
- if ether is None:
- raise RuntimeError('ICMP echo Rx timeout')
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
+ # Check whether received packet contains layer RA and check other values
if ether.src != router_mac:
- raise RuntimeError(
- 'Packet source MAC ({0}) does not match '
- 'router MAC ({1}).'.format(ether.src, router_mac))
+ raise RuntimeError('Packet source MAC ({0}) does not match router MAC '
+ '({1}).'.format(ether.src, router_mac))
if ether.dst != src_mac:
- raise RuntimeError(
- 'Packet destination MAC ({0}) does not match '
- 'RS source MAC ({1}).'.format(ether.dst, src_mac))
+ raise RuntimeError('Packet destination MAC ({0}) does not match RS '
+ 'source MAC ({1}).'.format(ether.dst, src_mac))
- if not ether.haslayer('ICMPv6ND_RA'):
- raise RuntimeError('Not an RA packet received {0}'
- .format(ether.__repr__()))
+ if not ether.haslayer(ICMPv6ND_RA):
+ raise RuntimeError('Not an RA packet received {0}'.
+ format(ether.__repr__()))
src_address = ipaddress.IPv6Address(unicode(ether['IPv6'].src))
dst_address = ipaddress.IPv6Address(unicode(ether['IPv6'].dst))
@@ -98,24 +103,25 @@ def main():
rs_src_address = ipaddress.IPv6Address(unicode(src_ip))
if src_address != router_link_local:
- raise RuntimeError(
- 'Packet source address ({0}) does not match '
- 'link local address({1})'.format(src_address, router_link_local))
+ raise RuntimeError('Packet source address ({0}) does not match link '
+ 'local address({1})'.
+ format(src_address, router_link_local))
if dst_address != rs_src_address:
- raise RuntimeError(
- 'Packet destination address ({0}) does not match '
- 'RS source address ({1}).'.format(dst_address, rs_src_address))
+ raise RuntimeError('Packet destination address ({0}) does not match '
+ 'RS source address ({1}).'.
+ format(dst_address, rs_src_address))
if ether['IPv6'].hlim != 255:
- raise RuntimeError('Hop limit not correct: {0}!=255'.format(
- ether['IPv6'].hlim))
+ raise RuntimeError('Hop limit not correct: {0}!=255'.
+ format(ether['IPv6'].hlim))
- ra_code = ether['ICMPv6 Neighbor Discovery - Router Advertisement'].code
+ ra_code = ether[ICMPv6ND_RA].code
if ra_code != 0:
raise RuntimeError('ICMP code: {0} not correct. '.format(ra_code))
sys.exit(0)
+
if __name__ == "__main__":
main()
diff --git a/resources/traffic_scripts/send_tcp_for_classifier_test.py b/resources/traffic_scripts/send_tcp_for_classifier_test.py
index 5d8d387767..5a6873ab4e 100755
--- a/resources/traffic_scripts/send_tcp_for_classifier_test.py
+++ b/resources/traffic_scripts/send_tcp_for_classifier_test.py
@@ -17,27 +17,24 @@ Traffic script that sends an TCP packet
from TG to DUT.
"""
import sys
-import time
-from scapy.layers.inet import IP, UDP, TCP
-from scapy.layers.inet6 import IPv6
from scapy.all import Ether, Packet, Raw
+from scapy.layers.inet import IP, TCP
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
from resources.libraries.python.SFC.VerifyPacket import *
-from resources.libraries.python.SFC.SFCConstants import SFCConstants as sfccon
+from resources.libraries.python.SFC.SFCConstants import SFCConstants as SfcCon
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from robot.api import logger
def main():
"""Send TCP packet from one traffic generator interface to DUT.
:raises: If the IP address is invalid.
"""
- args = TrafficScriptArg(
- ['src_mac', 'dst_mac', 'src_ip', 'dst_ip',
- 'timeout', 'framesize', 'testtype'])
+ args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip',
+ 'timeout', 'framesize', 'testtype'])
src_mac = args.get_arg('src_mac')
dst_mac = args.get_arg('dst_mac')
@@ -54,10 +51,9 @@ def main():
sent_packets = []
protocol = TCP
- source_port = sfccon.DEF_SRC_PORT
- destination_port = sfccon.DEF_DST_PORT
+ source_port = SfcCon.DEF_SRC_PORT
+ destination_port = SfcCon.DEF_DST_PORT
- ip_version = None
if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
ip_version = IP
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
@@ -66,8 +62,8 @@ def main():
raise ValueError("Invalid IP version!")
pkt_header = (Ether(src=src_mac, dst=dst_mac) /
- ip_version(src=src_ip, dst=dst_ip) /
- protocol(sport=int(source_port), dport=int(destination_port)))
+ ip_version(src=src_ip, dst=dst_ip) /
+ protocol(sport=int(source_port), dport=int(destination_port)))
fsize_no_fcs = frame_size - 4
pad_len = max(0, fsize_no_fcs - len(pkt_header))
@@ -79,10 +75,17 @@ def main():
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
- ether = rxq.recv(timeout)
-
- if ether is None:
- raise RuntimeError("No packet is received!")
+ while True:
+ ether = rxq.recv(timeout)
+ if ether is None:
+ raise RuntimeError('No packet is received!')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
# let us begin to check the NSH SFC loopback packet
VerifyPacket.check_the_nsh_sfc_packet(ether, frame_size, test_type)
diff --git a/resources/traffic_scripts/send_tcp_udp.py b/resources/traffic_scripts/send_tcp_udp.py
index 77f918213f..4cba73286a 100755
--- a/resources/traffic_scripts/send_tcp_udp.py
+++ b/resources/traffic_scripts/send_tcp_udp.py
@@ -19,9 +19,9 @@ from one interface to the other.
import sys
import ipaddress
-from scapy.layers.inet import IP, UDP, TCP
-from scapy.layers.inet6 import IPv6
from scapy.all import Ether
+from scapy.layers.inet import IP, UDP, TCP
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
@@ -62,9 +62,8 @@ def valid_ipv6(ip):
def main():
"""Send TCP or UDP packet from one traffic generator interface to the other.
"""
- args = TrafficScriptArg(
- ['tx_mac', 'rx_mac', 'src_ip', 'dst_ip', 'protocol',
- 'source_port', 'destination_port'])
+ args = TrafficScriptArg(['tx_mac', 'rx_mac', 'src_ip', 'dst_ip', 'protocol',
+ 'source_port', 'destination_port'])
src_mac = args.get_arg('tx_mac')
dst_mac = args.get_arg('rx_mac')
@@ -77,7 +76,6 @@ def main():
source_port = args.get_arg('source_port')
destination_port = args.get_arg('destination_port')
- ip_version = None
if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
ip_version = IP
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
@@ -90,7 +88,7 @@ def main():
elif protocol.upper() == 'UDP':
protocol = UDP
else:
- raise ValueError("Invalid type of protocol!")
+ raise ValueError("Invalid protocol type!")
rxq = RxQueue(rx_if)
txq = TxQueue(tx_if)
@@ -100,19 +98,27 @@ def main():
protocol(sport=int(source_port), dport=int(destination_port)))
txq.send(pkt_raw)
- ether = rxq.recv(2)
- if ether is None:
- raise RuntimeError("TCP/UDP Rx timeout")
+ while True:
+ ether = rxq.recv(2)
+ if ether is None:
+ raise RuntimeError('TCP/UDP Rx timeout')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
- if 'TCP' in ether:
+ if TCP in ether:
print ("TCP packet received.")
- elif 'UDP' in ether:
+ elif UDP in ether:
print ("UDP packet received.")
else:
- raise RuntimeError("Not an TCP or UDP packet received {0}"
- .format(ether.__repr__()))
+ raise RuntimeError("Not an TCP or UDP packet received {0}".
+ format(ether.__repr__()))
sys.exit(0)
diff --git a/resources/traffic_scripts/send_vxlan_for_proxy_test.py b/resources/traffic_scripts/send_vxlan_for_proxy_test.py
index c356e1977e..d33ed413c8 100755
--- a/resources/traffic_scripts/send_vxlan_for_proxy_test.py
+++ b/resources/traffic_scripts/send_vxlan_for_proxy_test.py
@@ -19,24 +19,22 @@ import sys
import time
from scapy.layers.inet import IP, UDP, TCP
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
from scapy.all import Ether, Packet, Raw
from resources.libraries.python.SFC.VerifyPacket import *
-from resources.libraries.python.SFC.SFCConstants import SFCConstants as sfccon
+from resources.libraries.python.SFC.SFCConstants import SFCConstants as SfcCon
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from robot.api import logger
def main():
"""Send VxLAN packet from TG to DUT.
:raises: If the IP address is invalid.
"""
- args = TrafficScriptArg(
- ['src_mac', 'dst_mac', 'src_ip', 'dst_ip',
- 'timeout', 'framesize', 'testtype'])
+ args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip',
+ 'timeout', 'framesize', 'testtype'])
src_mac = args.get_arg('src_mac')
dst_mac = args.get_arg('dst_mac')
@@ -44,7 +42,7 @@ def main():
dst_ip = args.get_arg('dst_ip')
tx_if = args.get_arg('tx_if')
rx_if = args.get_arg('rx_if')
- timeout = int(args.get_arg('timeout'))
+ timeout = max(2, int(args.get_arg('timeout')))
frame_size = int(args.get_arg('framesize'))
test_type = args.get_arg('testtype')
@@ -53,10 +51,9 @@ def main():
sent_packets = []
protocol = TCP
- source_port = sfccon.DEF_SRC_PORT
- destination_port = sfccon.DEF_DST_PORT
+ source_port = SfcCon.DEF_SRC_PORT
+ destination_port = SfcCon.DEF_DST_PORT
- ip_version = None
if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
ip_version = IP
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
@@ -65,17 +62,17 @@ def main():
raise ValueError("Invalid IP version!")
innerpkt = (Ether(src=src_mac, dst=dst_mac) /
- ip_version(src=src_ip, dst=dst_ip) /
- protocol(sport=int(source_port), dport=int(destination_port)))
+ ip_version(src=src_ip, dst=dst_ip) /
+ protocol(sport=int(source_port), dport=int(destination_port)))
vxlan = '\x08\x00\x00\x00\x00\x00\x01\x00'
raw_data = vxlan + str(innerpkt)
pkt_header = (Ether(src=src_mac, dst=dst_mac) /
- ip_version(src=src_ip, dst=dst_ip) /
- UDP(sport=int(source_port), dport=4789) /
- Raw(load=raw_data))
+ ip_version(src=src_ip, dst=dst_ip) /
+ UDP(sport=int(source_port), dport=4789) /
+ Raw(load=raw_data))
fsize_no_fcs = frame_size - 4
pad_len = max(0, fsize_no_fcs - len(pkt_header))
@@ -87,10 +84,17 @@ def main():
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
- ether = rxq.recv(2)
-
- if ether is None:
- raise RuntimeError("No packet is received!")
+ while True:
+ ether = rxq.recv(timeout)
+ if ether is None:
+ raise RuntimeError('No packet is received!')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
# let us begin to check the proxy outbound packet
VerifyPacket.check_the_nsh_sfc_packet(ether, frame_size, test_type)
diff --git a/resources/traffic_scripts/send_vxlangpe_nsh_for_proxy_test.py b/resources/traffic_scripts/send_vxlangpe_nsh_for_proxy_test.py
index d998d7c6a0..3ea1f0bc62 100755
--- a/resources/traffic_scripts/send_vxlangpe_nsh_for_proxy_test.py
+++ b/resources/traffic_scripts/send_vxlangpe_nsh_for_proxy_test.py
@@ -16,27 +16,24 @@
from TG to DUT.
"""
import sys
-import time
from scapy.layers.inet import IP, UDP, TCP
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
from scapy.all import Ether, Packet, Raw
from resources.libraries.python.SFC.VerifyPacket import *
-from resources.libraries.python.SFC.SFCConstants import SFCConstants as sfccon
+from resources.libraries.python.SFC.SFCConstants import SFCConstants as SfcCon
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from robot.api import logger
def main():
"""Send VxLAN-GPE+NSH packet from TG to DUT.
:raises: If the IP address is invalid.
"""
- args = TrafficScriptArg(
- ['src_mac', 'dst_mac', 'src_ip', 'dst_ip',
- 'timeout', 'framesize', 'testtype'])
+ args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip',
+ 'timeout', 'framesize', 'testtype'])
src_mac = args.get_arg('src_mac')
dst_mac = args.get_arg('dst_mac')
@@ -44,7 +41,7 @@ def main():
dst_ip = args.get_arg('dst_ip')
tx_if = args.get_arg('tx_if')
rx_if = args.get_arg('rx_if')
- timeout = int(args.get_arg('timeout'))
+ timeout = max(2, int(args.get_arg('timeout')))
frame_size = int(args.get_arg('framesize'))
test_type = args.get_arg('testtype')
@@ -53,10 +50,9 @@ def main():
sent_packets = []
protocol = TCP
- source_port = sfccon.DEF_SRC_PORT
- destination_port = sfccon.DEF_DST_PORT
+ source_port = SfcCon.DEF_SRC_PORT
+ destination_port = SfcCon.DEF_DST_PORT
- ip_version = None
if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
ip_version = IP
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
@@ -65,8 +61,8 @@ def main():
raise ValueError("Invalid IP version!")
innerpkt = (Ether(src=src_mac, dst=dst_mac) /
- ip_version(src=src_ip, dst=dst_ip) /
- protocol(sport=int(source_port), dport=int(destination_port)))
+ ip_version(src=src_ip, dst=dst_ip) /
+ protocol(sport=int(source_port), dport=int(destination_port)))
vxlangpe_nsh = '\x0c\x00\x00\x04\x00\x00\x09\x00\x00\x06' \
'\x01\x03\x00\x00\xb9\xff\xC0\xA8\x32\x4B' \
@@ -75,9 +71,9 @@ def main():
raw_data = vxlangpe_nsh + str(innerpkt)
pkt_header = (Ether(src=src_mac, dst=dst_mac) /
- ip_version(src=src_ip, dst=dst_ip) /
- UDP(sport=int(source_port), dport=4790) /
- Raw(load=raw_data))
+ ip_version(src=src_ip, dst=dst_ip) /
+ UDP(sport=int(source_port), dport=4790) /
+ Raw(load=raw_data))
fsize_no_fcs = frame_size - 4
pad_len = max(0, fsize_no_fcs - len(pkt_header))
@@ -89,10 +85,17 @@ def main():
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
- ether = rxq.recv(2)
-
- if ether is None:
- raise RuntimeError("No packet is received!")
+ while True:
+ ether = rxq.recv(timeout)
+ if ether is None:
+ raise RuntimeError('No packet is received!')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
# let us begin to check the proxy inbound packet
VerifyPacket.check_the_nsh_sfc_packet(ether, frame_size, test_type)
diff --git a/resources/traffic_scripts/send_vxlangpe_nsh_for_sff_test.py b/resources/traffic_scripts/send_vxlangpe_nsh_for_sff_test.py
index 55c06f0691..6879d20f2c 100755
--- a/resources/traffic_scripts/send_vxlangpe_nsh_for_sff_test.py
+++ b/resources/traffic_scripts/send_vxlangpe_nsh_for_sff_test.py
@@ -16,27 +16,24 @@
from TG to DUT.
"""
import sys
-import time
from scapy.layers.inet import IP, UDP, TCP
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
from scapy.all import Ether, Packet, Raw
from resources.libraries.python.SFC.VerifyPacket import *
-from resources.libraries.python.SFC.SFCConstants import SFCConstants as sfccon
+from resources.libraries.python.SFC.SFCConstants import SFCConstants as SfcCon
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from robot.api import logger
def main():
"""Send VxLAN-GPE+NSH packet from TG to DUT.
:raises: If the IP address is invalid.
"""
- args = TrafficScriptArg(
- ['src_mac', 'dst_mac', 'src_ip', 'dst_ip',
- 'timeout', 'framesize', 'testtype'])
+ args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip',
+ 'timeout', 'framesize', 'testtype'])
src_mac = args.get_arg('src_mac')
dst_mac = args.get_arg('dst_mac')
@@ -44,7 +41,7 @@ def main():
dst_ip = args.get_arg('dst_ip')
tx_if = args.get_arg('tx_if')
rx_if = args.get_arg('rx_if')
- timeout = int(args.get_arg('timeout'))
+ timeout = max(2, int(args.get_arg('timeout')))
frame_size = int(args.get_arg('framesize'))
test_type = args.get_arg('testtype')
@@ -53,10 +50,9 @@ def main():
sent_packets = []
protocol = TCP
- source_port = sfccon.DEF_SRC_PORT
- destination_port = sfccon.DEF_DST_PORT
+ source_port = SfcCon.DEF_SRC_PORT
+ destination_port = SfcCon.DEF_DST_PORT
- ip_version = None
if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
ip_version = IP
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
@@ -65,8 +61,8 @@ def main():
raise ValueError("Invalid IP version!")
innerpkt = (Ether(src=src_mac, dst=dst_mac) /
- ip_version(src=src_ip, dst=dst_ip) /
- protocol(sport=int(source_port), dport=int(destination_port)))
+ ip_version(src=src_ip, dst=dst_ip) /
+ protocol(sport=int(source_port), dport=int(destination_port)))
vxlangpe_nsh = '\x0c\x00\x00\x04\x00\x00\x0a\x00\x00\x06' \
'\x01\x03\x00\x00\xb9\xff\xC0\xA8\x32\x4B' \
@@ -75,9 +71,9 @@ def main():
raw_data = vxlangpe_nsh + str(innerpkt)
pkt_header = (Ether(src=src_mac, dst=dst_mac) /
- ip_version(src=src_ip, dst=dst_ip) /
- UDP(sport=int(source_port), dport=4790) /
- Raw(load=raw_data))
+ ip_version(src=src_ip, dst=dst_ip) /
+ UDP(sport=int(source_port), dport=4790) /
+ Raw(load=raw_data))
fsize_no_fcs = frame_size - 4
pad_len = max(0, fsize_no_fcs - len(pkt_header))
@@ -89,10 +85,17 @@ def main():
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
- ether = rxq.recv(2)
-
- if ether is None:
- raise RuntimeError("No packet is received!")
+ while True:
+ ether = rxq.recv(timeout)
+ if ether is None:
+ raise RuntimeError('No packet is received!')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
# let us begin to check the sfc sff packet
VerifyPacket.check_the_nsh_sfc_packet(ether, frame_size, test_type)
diff --git a/resources/traffic_scripts/span_check.py b/resources/traffic_scripts/span_check.py
index c7ca8a7374..cbf65d3aaf 100755
--- a/resources/traffic_scripts/span_check.py
+++ b/resources/traffic_scripts/span_check.py
@@ -21,7 +21,7 @@ import sys
import ipaddress
from scapy.layers.inet import IP, ICMP, ARP
-from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
from scapy.layers.l2 import Ether
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue, auto_pad
@@ -64,8 +64,8 @@ def main():
"""Send a simple L2 or ICMP packet from one TG interface to DUT, then
receive a copy of the packet on the second TG interface, and a copy of
the ICMP reply."""
- args = TrafficScriptArg(
- ['tg_src_mac', 'src_ip', 'dst_ip', 'dut_if1_mac', 'ptype'])
+ args = TrafficScriptArg(['tg_src_mac', 'src_ip', 'dst_ip', 'dut_if1_mac',
+ 'ptype'])
src_mac = args.get_arg('tg_src_mac')
dst_mac = args.get_arg('dut_if1_mac')
@@ -104,11 +104,20 @@ def main():
txq.send(pkt_raw)
sent.append(auto_pad(pkt_raw))
- ether = rxq_mirrored.recv(2)
# Receive copy of Rx packet.
- if ether is None:
- raise RuntimeError("Rx timeout of mirrored Rx packet")
+ while True:
+ ether = rxq_mirrored.recv(2)
+ if ether is None:
+ raise RuntimeError("Rx timeout of mirrored Rx packet")
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
+
pkt = auto_pad(pkt_raw)
if str(ether) != str(pkt):
print("Mirrored Rx packet doesn't match the original Rx packet.")
@@ -148,15 +157,17 @@ def main():
# Receive reply on TG Tx port.
ether_repl = rxq_tx.recv(2, sent)
+
if ether_repl is None:
raise RuntimeError("Reply not received on TG Tx port.")
- else:
- print("Reply received on TG Tx port.\n")
+
+ print("Reply received on TG Tx port.\n")
# Receive copy of Tx packet.
ether = rxq_mirrored.recv(2)
if ether is None:
raise RuntimeError("Rx timeout of mirrored Tx packet")
+
if str(ether) != str(ether_repl):
print("Mirrored Tx packet doesn't match the received Tx packet.")
if ether.src != ether_repl.src or ether.dst != ether_repl.dst: