diff options
author | Jan Gelety <jgelety@cisco.com> | 2017-09-08 11:38:38 +0200 |
---|---|---|
committer | Tibor Frank <tifrank@cisco.com> | 2017-09-18 12:05:49 +0000 |
commit | 2a848f49308868dfe6fa3a9cb78bd085f8c16f40 (patch) | |
tree | 180c45ea5db2cc095c65d3b698a3e05a6ee819fe | |
parent | 6928a6be42016a6c5edade6369041670fe544f39 (diff) |
Ignore unexpected ICMPv6 Neighbor Discovery - Neighbor Solicitation packets
We need to adapt all functional traffic scripts related to functional
IPv6 tests to ingore receiving of unexpected ICMPv6ND_NS
(ICMPv6 Neighbor Discovery - Neighbor Solicitation) packets that are
sent automatically and we cannot avoid to receive them.
The reason is to prevent false negative test results in case of csit
functional tests that could block creation of new operational branch
(csit weekly jobs), usage of new vpp builds (csit semiweekly jobs)
and merging patches - csit as well as vpp.
Change-Id: I43c90e7c766762fa769a81661338759a11b401a1
Signed-off-by: Jan Gelety <jgelety@cisco.com>
22 files changed, 631 insertions, 424 deletions
diff --git a/resources/traffic_scripts/check_ra_packet.py b/resources/traffic_scripts/check_ra_packet.py index 231e07da11..9717c7db95 100755 --- a/resources/traffic_scripts/check_ra_packet.py +++ b/resources/traffic_scripts/check_ra_packet.py @@ -17,6 +17,8 @@ import sys import ipaddress +from scapy.layers.inet6 import IPv6, ICMPv6ND_RA, ICMPv6ND_NS + from resources.libraries.python.PacketVerifier import RxQueue from resources.libraries.python.TrafficScriptArg import TrafficScriptArg @@ -56,15 +58,23 @@ def main(): interval = int(args.get_arg('interval')) rxq = RxQueue(rx_if) - ether = rxq.recv(max(5, interval)) + # receive ICMPv6ND_RA packet + while True: + ether = rxq.recv(max(5, interval)) + if ether is None: + raise RuntimeError('ICMP echo Rx timeout') - # Check whether received packet contains layer RA and check other values - if ether is None: - raise RuntimeError('ICMP echo Rx timeout') + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break - if not ether.haslayer('ICMPv6ND_RA'): - raise RuntimeError('Not an RA packet received {0}' - .format(ether.__repr__())) + # Check if received packet contains layer RA and check other values + if not ether.haslayer(ICMPv6ND_RA): + raise RuntimeError('Not an RA packet received {0}'. + format(ether.__repr__())) src_address = ipaddress.IPv6Address(unicode(ether['IPv6'].src)) dst_address = ipaddress.IPv6Address(unicode(ether['IPv6'].dst)) @@ -72,22 +82,22 @@ def main(): all_nodes_multicast = ipaddress.IPv6Address(u'ff02::1') if src_address != link_local: - raise RuntimeError( - 'Source address ({0}) not matching link local address({1})'.format( - src_address, link_local)) + raise RuntimeError('Source address ({0}) not matching link local ' + 'address ({1})'.format(src_address, link_local)) if dst_address != all_nodes_multicast: - raise RuntimeError('Packet destination address ({0}) is not the all ' - 'nodes multicast address ({1}).'.format( - dst_address, all_nodes_multicast)) - if ether['IPv6'].hlim != 255: - raise RuntimeError('Hop limit not correct: {0}!=255'.format( - ether['IPv6'].hlim)) - - ra_code = ether['ICMPv6 Neighbor Discovery - Router Advertisement'].code + raise RuntimeError('Packet destination address ({0}) is not the all' + ' nodes multicast address ({1}).'. + format(dst_address, all_nodes_multicast)) + if ether[IPv6].hlim != 255: + raise RuntimeError('Hop limit not correct: {0}!=255'. + format(ether[IPv6].hlim)) + + ra_code = ether[ICMPv6ND_RA].code if ra_code != 0: raise RuntimeError('ICMP code: {0} not correct. '.format(ra_code)) sys.exit(0) + if __name__ == "__main__": main() diff --git a/resources/traffic_scripts/icmpv6_echo.py b/resources/traffic_scripts/icmpv6_echo.py index 4bf573a1f1..a18896b07f 100755 --- a/resources/traffic_scripts/icmpv6_echo.py +++ b/resources/traffic_scripts/icmpv6_echo.py @@ -17,13 +17,19 @@ import sys import logging + +# pylint: disable=no-name-in-module +# pylint: disable=import-error logging.getLogger("scapy.runtime").setLevel(logging.ERROR) -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue,\ - checksum_equal -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg -from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr -from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply + from scapy.all import Ether +from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS +from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr +from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply + +from resources.libraries.python.PacketVerifier import RxQueue, TxQueue +from resources.libraries.python.PacketVerifier import checksum_equal +from resources.libraries.python.TrafficScriptArg import TrafficScriptArg def main(): @@ -43,52 +49,60 @@ def main(): # send ICMPv6 neighbor advertisement message pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') / - IPv6(src=src_ip, dst='ff02::1:ff00:2') / - ICMPv6ND_NA(tgt=src_ip, R=0) / - ICMPv6NDOptDstLLAddr(lladdr=src_mac)) + IPv6(src=src_ip, dst='ff02::1:ff00:2') / + ICMPv6ND_NA(tgt=src_ip, R=0) / + ICMPv6NDOptDstLLAddr(lladdr=src_mac)) sent_packets.append(pkt_send) txq.send(pkt_send) # send ICMPv6 echo request pkt_send = (Ether(src=src_mac, dst=dst_mac) / - IPv6(src=src_ip, dst=dst_ip) / - ICMPv6EchoRequest(id=echo_id, seq=echo_seq)) + IPv6(src=src_ip, dst=dst_ip) / + ICMPv6EchoRequest(id=echo_id, seq=echo_seq)) sent_packets.append(pkt_send) txq.send(pkt_send) # receive ICMPv6 echo reply - ether = rxq.recv(2, sent_packets) - if ether is None: - raise RuntimeError('ICMPv6 echo reply Rx timeout') + while True: + ether = rxq.recv(2, sent_packets) + if ether is None: + raise RuntimeError('ICMPv6 echo reply Rx timeout') + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break if not ether.haslayer(IPv6): - raise RuntimeError('Unexpected packet with no IPv6 received {0}'.format( - ether.__repr__())) + raise RuntimeError('Unexpected packet with no IPv6 received {0}'. + format(ether.__repr__())) - ipv6 = ether['IPv6'] + ipv6 = ether[IPv6] if not ipv6.haslayer(ICMPv6EchoReply): - raise RuntimeError( - 'Unexpected packet with no IPv6 ICMP received {0}'.format( - ipv6.__repr__())) + raise RuntimeError('Unexpected packet with no ICMPv6 echo reply ' + 'received {0}'.format(ipv6.__repr__())) - icmpv6 = ipv6['ICMPv6 Echo Reply'] + icmpv6 = ipv6[ICMPv6EchoReply] # check identifier and sequence number if icmpv6.id != echo_id or icmpv6.seq != echo_seq: - raise RuntimeError( - 'Invalid ICMPv6 echo reply received ID {0} seq {1} should be ' \ - 'ID {2} seq {3}'.format(icmpv6.id, icmpv6.seq, echo_id, echo_seq)) + raise RuntimeError('Invalid ICMPv6 echo reply received ID {0} seq {1} ' + 'should be ID {2} seq {3}'. + format(icmpv6.id, icmpv6.seq, echo_id, echo_seq)) # verify checksum cksum = icmpv6.cksum del icmpv6.cksum tmp = ICMPv6EchoReply(str(icmpv6)) if not checksum_equal(tmp.cksum, cksum): - raise RuntimeError( - 'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum)) + raise RuntimeError('Invalid checksum {0} should be {1}'. + format(cksum, tmp.cksum)) sys.exit(0) + if __name__ == "__main__": main() diff --git a/resources/traffic_scripts/icmpv6_echo_req_resp.py b/resources/traffic_scripts/icmpv6_echo_req_resp.py index c2cc4d20a0..ec9cf94a67 100755 --- a/resources/traffic_scripts/icmpv6_echo_req_resp.py +++ b/resources/traffic_scripts/icmpv6_echo_req_resp.py @@ -18,14 +18,20 @@ import sys import logging + +# pylint: disable=no-name-in-module +# pylint: disable=import-error logging.getLogger("scapy.runtime").setLevel(logging.ERROR) -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue,\ - checksum_equal -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg -from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr + +from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS +from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply from scapy.all import Ether +from resources.libraries.python.PacketVerifier import RxQueue, TxQueue +from resources.libraries.python.PacketVerifier import checksum_equal +from resources.libraries.python.TrafficScriptArg import TrafficScriptArg + def main(): args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_nh_mac', 'dst_nh_mac', @@ -52,109 +58,122 @@ def main(): # send ICMPv6 neighbor advertisement message pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') / - IPv6(src=src_ip, dst='ff02::1:ff00:2') / - ICMPv6ND_NA(tgt=src_ip, R=0) / - ICMPv6NDOptDstLLAddr(lladdr=src_mac)) + IPv6(src=src_ip, dst='ff02::1:ff00:2') / + ICMPv6ND_NA(tgt=src_ip, R=0) / + ICMPv6NDOptDstLLAddr(lladdr=src_mac)) src_sent_packets.append(pkt_send) src_txq.send(pkt_send) pkt_send = (Ether(src=dst_mac, dst='ff:ff:ff:ff:ff:ff') / - IPv6(src=dst_ip, dst='ff02::1:ff00:2') / - ICMPv6ND_NA(tgt=dst_ip, R=0) / - ICMPv6NDOptDstLLAddr(lladdr=dst_mac)) + IPv6(src=dst_ip, dst='ff02::1:ff00:2') / + ICMPv6ND_NA(tgt=dst_ip, R=0) / + ICMPv6NDOptDstLLAddr(lladdr=dst_mac)) dst_sent_packets.append(pkt_send) dst_txq.send(pkt_send) # send ICMPv6 echo request from first TG interface pkt_send = (Ether(src=src_mac, dst=src_nh_mac) / - IPv6(src=src_ip, dst=dst_ip, hlim=hop_limit) / - ICMPv6EchoRequest(id=echo_id, seq=echo_seq)) + IPv6(src=src_ip, dst=dst_ip, hlim=hop_limit) / + ICMPv6EchoRequest(id=echo_id, seq=echo_seq)) src_sent_packets.append(pkt_send) src_txq.send(pkt_send) # receive ICMPv6 echo request on second TG interface - ether = dst_rxq.recv(2, dst_sent_packets) - if ether is None: - raise RuntimeError('ICMPv6 echo reply Rx timeout') + while True: + ether = dst_rxq.recv(2, dst_sent_packets) + if ether is None: + raise RuntimeError('ICMPv6 echo reply Rx timeout') + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break if not ether.haslayer(IPv6): - raise RuntimeError('Unexpected packet with no IPv6 received {0}'.format( - ether.__repr__())) + raise RuntimeError('Unexpected packet with no IPv6 received: {0}'. + format(ether.__repr__())) - ipv6 = ether['IPv6'] + ipv6 = ether[IPv6] # verify hop limit processing if ipv6.hlim != (hop_limit - hop_num): - raise RuntimeError( - 'Invalid hop limit {0} should be {1}'.format(ipv6.hlim, - hop_limit - hop_num)) + raise RuntimeError('Invalid hop limit {0} should be {1}'. + format(ipv6.hlim,hop_limit - hop_num)) if not ipv6.haslayer(ICMPv6EchoRequest): - raise RuntimeError( - 'Unexpected packet with no IPv6 ICMP received {0}'.format( - ipv6.__repr__())) + raise RuntimeError('Unexpected packet with no IPv6 ICMP received {0}'. + format(ipv6.__repr__())) - icmpv6 = ipv6['ICMPv6 Echo Request'] + icmpv6 = ipv6[ICMPv6EchoRequest] # check identifier and sequence number if icmpv6.id != echo_id or icmpv6.seq != echo_seq: - raise RuntimeError( - 'Invalid ICMPv6 echo reply received ID {0} seq {1} should be ' \ - 'ID {2} seq {3}'.format(icmpv6.id, icmpv6.seq, echo_id, echo_seq)) + raise RuntimeError('Invalid ICMPv6 echo reply received ID {0} ' + 'seq {1} should be ID {2} seq {3}'. + format(icmpv6.id, icmpv6.seq, echo_id, echo_seq)) # verify checksum cksum = icmpv6.cksum del icmpv6.cksum tmp = ICMPv6EchoRequest(str(icmpv6)) if not checksum_equal(tmp.cksum, cksum): - raise RuntimeError( - 'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum)) + raise RuntimeError('Invalid checksum {0} should be {1}'. + format(cksum, tmp.cksum)) # send ICMPv6 echo reply from second TG interface pkt_send = (Ether(src=dst_mac, dst=dst_nh_mac) / - IPv6(src=dst_ip, dst=src_ip) / - ICMPv6EchoReply(id=echo_id, seq=echo_seq)) + IPv6(src=dst_ip, dst=src_ip) / + ICMPv6EchoReply(id=echo_id, seq=echo_seq)) dst_sent_packets.append(pkt_send) dst_txq.send(pkt_send) # receive ICMPv6 echo reply on first TG interface - ether = src_rxq.recv(2, src_sent_packets) - if ether is None: - raise RuntimeError('ICMPv6 echo reply Rx timeout') + while True: + ether = src_rxq.recv(2, src_sent_packets) + if ether is None: + raise RuntimeError('ICMPv6 echo reply Rx timeout') + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break if not ether.haslayer(IPv6): - raise RuntimeError('Unexpected packet with no IPv6 received {0}'.format( - ether.__repr__())) + raise RuntimeError('Unexpected packet with no IPv6 layer received {0}'. + format(ether.__repr__())) - ipv6 = ether['IPv6'] + ipv6 = ether[IPv6] # verify hop limit processing if ipv6.hlim != (hop_limit - hop_num): - raise RuntimeError( - 'Invalid hop limit {0} should be {1}'.format(ipv6.hlim, - hop_limit - hop_num)) + raise RuntimeError('Invalid hop limit {0} should be {1}'. + format(ipv6.hlim, hop_limit - hop_num)) if not ipv6.haslayer(ICMPv6EchoReply): - raise RuntimeError( - 'Unexpected packet with no IPv6 ICMP received {0}'.format( - ipv6.__repr__())) + raise RuntimeError('Unexpected packet with no IPv6 ICMP received {0}'. + format(ipv6.__repr__())) - icmpv6 = ipv6['ICMPv6 Echo Reply'] + icmpv6 = ipv6[ICMPv6EchoReply] # check identifier and sequence number if icmpv6.id != echo_id or icmpv6.seq != echo_seq: - raise RuntimeError( - 'Invalid ICMPv6 echo reply received ID {0} seq {1} should be ' \ - 'ID {2} seq {3}'.format(icmpv6.id, icmpv6.seq, echo_id, echo_seq)) + raise RuntimeError('Invalid ICMPv6 echo reply received ID {0} ' + 'seq {1} should be ID {2} seq {3}'. + format(icmpv6.id, icmpv6.seq, echo_id, echo_seq)) # verify checksum cksum = icmpv6.cksum del icmpv6.cksum tmp = ICMPv6EchoReply(str(icmpv6)) if not checksum_equal(tmp.cksum, cksum): - raise RuntimeError( - 'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum)) + raise RuntimeError('Invalid checksum {0} should be {1}'. + format(cksum, tmp.cksum)) sys.exit(0) + if __name__ == "__main__": main() diff --git a/resources/traffic_scripts/ipfix_check.py b/resources/traffic_scripts/ipfix_check.py index 2a08f0ce85..f5693cc7e8 100755 --- a/resources/traffic_scripts/ipfix_check.py +++ b/resources/traffic_scripts/ipfix_check.py @@ -24,8 +24,8 @@ from scapy.layers.l2 import Ether from resources.libraries.python.PacketVerifier import RxQueue, TxQueue, auto_pad from resources.libraries.python.TrafficScriptArg import TrafficScriptArg -from resources.libraries.python.telemetry.IPFIXUtil import IPFIXHandler, \ - IPFIXData +from resources.libraries.python.telemetry.IPFIXUtil import IPFIXHandler +from resources.libraries.python.telemetry.IPFIXUtil import IPFIXData def valid_ipv4(ip): @@ -123,6 +123,11 @@ def main(): pkt = rxq.recv(10, ignore=ignore, verbose=verbose) if pkt is None: raise RuntimeError("RX timeout") + + if pkt.haslayer("ICMPv6ND_NS"): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + if pkt.haslayer("IPFIXHeader"): if pkt.haslayer("IPFIXTemplate"): # create or update template for IPFIX data packets @@ -138,9 +143,9 @@ def main(): # verify packet count if data["packetTotalCount"] != count: - raise RuntimeError( - "IPFIX reported wrong packet count. Count was {0}," - " but should be {1}".format(data["packetTotalCount"], count)) + raise RuntimeError("IPFIX reported wrong packet count. Count was {0}," + "but should be {1}".format(data["packetTotalCount"], + count)) # verify IP addresses keys = data.keys() err = "{0} mismatch. Packets used {1}, but were classified as {2}." diff --git a/resources/traffic_scripts/ipfix_sessions.py b/resources/traffic_scripts/ipfix_sessions.py index e7597a894a..11e77fa08c 100755 --- a/resources/traffic_scripts/ipfix_sessions.py +++ b/resources/traffic_scripts/ipfix_sessions.py @@ -192,6 +192,11 @@ def main(): pkt = rxq.recv(5) if pkt is None: raise RuntimeError("RX timeout") + + if pkt.haslayer("ICMPv6ND_NS"): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + if pkt.haslayer("IPFIXHeader"): if pkt.haslayer("IPFIXTemplate"): # create or update template for IPFIX data packets @@ -219,6 +224,6 @@ def main(): raise RuntimeError("Received non-IPFIX packet or IPFIX header was" "not recognized.") -if __name__ == "__main__": +if __name__ == "__main__": main() diff --git a/resources/traffic_scripts/ipsec.py b/resources/traffic_scripts/ipsec.py index 13d44b8a51..1561738c60 100755 --- a/resources/traffic_scripts/ipsec.py +++ b/resources/traffic_scripts/ipsec.py @@ -18,8 +18,14 @@ import sys import logging +# pylint: disable=no-name-in-module +# pylint: disable=import-error logging.getLogger("scapy.runtime").setLevel(logging.ERROR) -from scapy.all import Ether, IP, ICMP, IPv6, ICMPv6EchoRequest, ICMPv6EchoReply + +from scapy.all import Ether +from scapy.layers.inet import ICMP, IP +from scapy.layers.inet6 import IPv6, ICMPv6ND_NS +from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply from scapy.layers.ipsec import SecurityAssociation, ESP from ipaddress import ip_address @@ -39,7 +45,7 @@ def check_ipv4(pkt_recv, dst_tun, src_ip, dst_ip, sa_in): :type dst_tun: str :type src_ip: str :type dst_ip: str - :type sa_sa: scapy.layers.ipsec.SecurityAssociation + :type sa_in: scapy.layers.ipsec.SecurityAssociation :raises RuntimeError: If received packet is invalid. """ if not pkt_recv.haslayer(IP): @@ -55,15 +61,15 @@ def check_ipv4(pkt_recv, dst_tun, src_ip, dst_ip, sa_in): raise RuntimeError( 'Not an ESP packet received: {0}'.format(pkt_recv.__repr__())) - ip_pkt = pkt_recv['IP'] + ip_pkt = pkt_recv[IP] d_pkt = sa_in.decrypt(ip_pkt) - if d_pkt['IP'].dst != dst_ip: + if d_pkt[IP].dst != dst_ip: raise RuntimeError( 'Decrypted packet has invalid destination address: {0} ' 'should be: {1}'.format(d_pkt['IP'].dst, dst_ip)) - if d_pkt['IP'].src != src_ip: + if d_pkt[IP].src != src_ip: raise RuntimeError( 'Decrypted packet has invalid source address: {0} should be: {1}' .format(d_pkt['IP'].src, src_ip)) @@ -93,7 +99,7 @@ def check_ipv6(pkt_recv, dst_tun, src_ip, dst_ip, sa_in): raise RuntimeError( 'Not an IPv6 packet received: {0}'.format(pkt_recv.__repr__())) - if pkt_recv['IPv6'].dst != dst_tun: + if pkt_recv[IPv6].dst != dst_tun: raise RuntimeError( 'Received packet has invalid destination address: {0} ' 'should be: {1}'.format(pkt_recv['IPv6'].dst, dst_tun)) @@ -102,15 +108,15 @@ def check_ipv6(pkt_recv, dst_tun, src_ip, dst_ip, sa_in): raise RuntimeError( 'Not an ESP packet received: {0}'.format(pkt_recv.__repr__())) - ip_pkt = pkt_recv['IPv6'] + ip_pkt = pkt_recv[IPv6] d_pkt = sa_in.decrypt(ip_pkt) - if d_pkt['IPv6'].dst != dst_ip: + if d_pkt[IPv6].dst != dst_ip: raise RuntimeError( 'Decrypted packet has invalid destination address {0}: ' 'should be: {1}'.format(d_pkt['IPv6'].dst, dst_ip)) - if d_pkt['IPv6'].src != src_ip: + if d_pkt[IPv6].src != src_ip: raise RuntimeError( 'Decrypted packet has invalid source address: {0} should be: {1}' .format(d_pkt['IPv6'].src, src_ip)) @@ -175,25 +181,33 @@ def main(): sent_packets = [] if is_ipv4: - ip_pkt = IP(src=src_ip, dst=dst_ip) / \ - ICMP() + ip_pkt = (IP(src=src_ip, dst=dst_ip) / + ICMP()) ip_pkt = IP(str(ip_pkt)) else: - ip_pkt = IPv6(src=src_ip, dst=dst_ip) / \ - ICMPv6EchoRequest() + ip_pkt = (IPv6(src=src_ip, dst=dst_ip) / + ICMPv6EchoRequest()) ip_pkt = IPv6(str(ip_pkt)) e_pkt = sa_out.encrypt(ip_pkt) - pkt_send = Ether(src=src_mac, dst=dst_mac) / \ - e_pkt + pkt_send = (Ether(src=src_mac, dst=dst_mac) / + e_pkt) sent_packets.append(pkt_send) txq.send(pkt_send) - pkt_recv = rxq.recv(2, sent_packets) + while True: + pkt_recv = rxq.recv(2, sent_packets) - if pkt_recv is None: - raise RuntimeError('ESP packet Rx timeout') + if pkt_recv is None: + raise RuntimeError('ESP packet Rx timeout') + + if pkt_recv.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break if is_ipv4: check_ipv4(pkt_recv, src_tun, dst_ip, src_ip, sa_in) @@ -202,5 +216,6 @@ def main(): sys.exit(0) + if __name__ == "__main__": main() diff --git a/resources/traffic_scripts/ipv6_nd_proxy_check.py b/resources/traffic_scripts/ipv6_nd_proxy_check.py index b1028028b0..c9213999ec 100755 --- a/resources/traffic_scripts/ipv6_nd_proxy_check.py +++ b/resources/traffic_scripts/ipv6_nd_proxy_check.py @@ -15,8 +15,9 @@ """Traffic script that sends DHCPv6 proxy packets.""" from scapy.layers.inet import Ether -from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply,\ - ICMPv6ND_NS +from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS +from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply + from resources.libraries.python.PacketVerifier import RxQueue, TxQueue from resources.libraries.python.TrafficScriptArg import TrafficScriptArg @@ -44,17 +45,23 @@ def imcpv6nd_solicit(tx_if, src_mac, dst_mac, src_ip, dst_ip): sent_packets = [] - icmpv6nd_solicit_pkt =\ - Ether(src=src_mac, dst=dst_mac) / \ - IPv6(src=src_ip) / \ - ICMPv6ND_NS(tgt=dst_ip) + icmpv6nd_solicit_pkt = (Ether(src=src_mac, dst=dst_mac) / + IPv6(src=src_ip) / + ICMPv6ND_NS(tgt=dst_ip)) sent_packets.append(icmpv6nd_solicit_pkt) txq.send(icmpv6nd_solicit_pkt) ether = None for _ in range(5): - pkt = rxq.recv(3, ignore=sent_packets) + while True: + pkt = rxq.recv(3, ignore=sent_packets) + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue in case of ICMPv6ND_NS packet + continue + else: + # otherwise process the current packet + break if pkt is not None: ether = pkt break @@ -63,34 +70,33 @@ def imcpv6nd_solicit(tx_if, src_mac, dst_mac, src_ip, dst_ip): raise RuntimeError('ICMPv6ND Proxy response timeout.') if ether.src != dst_mac: - raise RuntimeError("Source MAC address error: {} != {}".format( - ether.src, dst_mac)) + raise RuntimeError("Source MAC address error: {} != {}". + format(ether.src, dst_mac)) print "Source MAC address: OK." if ether.dst != src_mac: - raise RuntimeError("Destination MAC address error: {} != {}".format( - ether.dst, src_mac)) + raise RuntimeError("Destination MAC address error: {} != {}". + format(ether.dst, src_mac)) print "Destination MAC address: OK." - if ether['IPv6'].src != dst_ip: - raise RuntimeError("Source IP address error: {} != {}".format( - ether['IPv6'].src, dst_ip)) + if ether[IPv6].src != dst_ip: + raise RuntimeError("Source IP address error: {} != {}". + format(ether[IPv6].src, dst_ip)) print "Source IP address: OK." - if ether['IPv6'].dst != src_ip: - raise RuntimeError("Destination IP address error: {} != {}".format( - ether['IPv6'].dst, src_ip)) + if ether[IPv6].dst != src_ip: + raise RuntimeError("Destination IP address error: {} != {}". + format(ether[IPv6].dst, src_ip)) print "Destination IP address: OK." try: - target_addr = ether['IPv6']\ - ['ICMPv6 Neighbor Discovery - Neighbor Advertisement'].tgt + target_addr = ether[IPv6][ICMPv6ND_NA].tgt except (KeyError, AttributeError): raise RuntimeError("Not an ICMPv6ND Neighbor Advertisement packet.") if target_addr != dst_ip: - raise RuntimeError("ICMPv6 field 'Target address' error:" - " {} != {}".format(target_addr, dst_ip)) + raise RuntimeError("ICMPv6 field 'Target address' error: {} != {}". + format(target_addr, dst_ip)) print "Target address field: OK." @@ -119,16 +125,22 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac, rxq = RxQueue(dst_if) txq = TxQueue(src_if) - icmpv6_ping_pkt = \ - Ether(src=src_mac, dst=proxy_to_src_mac) / \ - IPv6(src=src_ip, dst=dst_ip) / \ - ICMPv6EchoRequest() + icmpv6_ping_pkt = (Ether(src=src_mac, dst=proxy_to_src_mac) / + IPv6(src=src_ip, dst=dst_ip) / + ICMPv6EchoRequest()) txq.send(icmpv6_ping_pkt) ether = None for _ in range(5): - pkt = rxq.recv(3) + while True: + pkt = rxq.recv(3) + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue in case of ICMPv6ND_NS packet + continue + else: + # otherwise process the current packet + break if pkt is not None: ether = pkt break @@ -136,7 +148,7 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac, if ether is None: raise RuntimeError('ICMPv6 Echo Request timeout.') try: - ether["IPv6"]["ICMPv6 Echo Request"] + ether[IPv6]["ICMPv6 Echo Request"] except KeyError: raise RuntimeError("Received packet is not an ICMPv6 Echo Request.") print "ICMP Echo: OK." @@ -144,16 +156,22 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac, rxq = RxQueue(src_if) txq = TxQueue(dst_if) - icmpv6_ping_pkt = \ - Ether(src=dst_mac, dst=proxy_to_dst_mac) / \ - IPv6(src=dst_ip, dst=src_ip) / \ - ICMPv6EchoReply() + icmpv6_ping_pkt = (Ether(src=dst_mac, dst=proxy_to_dst_mac) / + IPv6(src=dst_ip, dst=src_ip) / + ICMPv6EchoReply()) txq.send(icmpv6_ping_pkt) ether = None for _ in range(5): - pkt = rxq.recv(3) + while True: + pkt = rxq.recv(3) + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue in case of ICMPv6ND_NS packet + continue + else: + # otherwise process the current packet + break if pkt is not None: ether = pkt break @@ -161,7 +179,7 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac, if ether is None: raise RuntimeError('DHCPv6 SOLICIT timeout') try: - ether["IPv6"]["ICMPv6 Echo Reply"] + ether[IPv6]["ICMPv6 Echo Reply"] except KeyError: raise RuntimeError("Received packet is not an ICMPv6 Echo Reply.") @@ -187,8 +205,9 @@ def main(): imcpv6nd_solicit(src_if, src_mac, proxy_to_src_mac, src_ip, dst_ip) # Verify route (ICMP echo/reply) - ipv6_ping(src_if, dst_if, src_mac, dst_mac, - proxy_to_src_mac, proxy_to_dst_mac, src_ip, dst_ip) + ipv6_ping(src_if, dst_if, src_mac, dst_mac, proxy_to_src_mac, + proxy_to_dst_mac, src_ip, dst_ip) + if __name__ == "__main__": main() diff --git a/resources/traffic_scripts/ipv6_ns.py b/resources/traffic_scripts/ipv6_ns.py index 70c6ab445a..5852e6317f 100755 --- a/resources/traffic_scripts/ipv6_ns.py +++ b/resources/traffic_scripts/ipv6_ns.py @@ -17,13 +17,18 @@ import sys import logging + +# pylint: disable=no-name-in-module +# pylint: disable=import-error logging.getLogger("scapy.runtime").setLevel(logging.ERROR) -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue,\ - checksum_equal -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg + +from scapy.all import Ether from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr, ICMPv6NDOptSrcLLAddr -from scapy.all import Ether + +from resources.libraries.python.PacketVerifier import RxQueue, TxQueue +from resources.libraries.python.PacketVerifier import checksum_equal +from resources.libraries.python.TrafficScriptArg import TrafficScriptArg def main(): @@ -41,57 +46,67 @@ def main(): # send ICMPv6 neighbor solicitation message pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') / - IPv6(src=src_ip, dst='ff02::1:ff00:2') / - ICMPv6ND_NS(tgt=dst_ip) / - ICMPv6NDOptSrcLLAddr(lladdr=src_mac)) + IPv6(src=src_ip, dst='ff02::1:ff00:2') / + ICMPv6ND_NS(tgt=dst_ip) / + ICMPv6NDOptSrcLLAddr(lladdr=src_mac)) sent_packets.append(pkt_send) txq.send(pkt_send) # receive ICMPv6 neighbor advertisement message - ether = rxq.recv(2, sent_packets) + while True: + ether = rxq.recv(2, sent_packets) + if ether is None: + raise RuntimeError('ICMPv6 echo reply Rx timeout') + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break + if ether is None: raise RuntimeError('ICMPv6 echo reply Rx timeout') if not ether.haslayer(IPv6): - raise RuntimeError('Unexpected packet with no IPv6 received {0}'.format( - ether.__repr__())) + raise RuntimeError('Unexpected packet with no IPv6 received {0}'. + format(ether.__repr__())) - ipv6 = ether['IPv6'] + ipv6 = ether[IPv6] if not ipv6.haslayer(ICMPv6ND_NA): - raise RuntimeError( - 'Unexpected packet with no ICMPv6 ND-NA received {0}'.format( - ipv6.__repr__())) + raise RuntimeError('Unexpected packet with no ICMPv6 ND-NA received ' + '{0}'.format(ipv6.__repr__())) - icmpv6_na = ipv6['ICMPv6 Neighbor Discovery - Neighbor Advertisement'] + icmpv6_na = ipv6[ICMPv6ND_NA] # verify target address if icmpv6_na.tgt != dst_ip: - raise RuntimeError('Invalid target address {0} should be {1}'.format( - icmpv6_na.tgt, dst_ip)) + raise RuntimeError('Invalid target address {0} should be {1}'. + format(icmpv6_na.tgt, dst_ip)) if not icmpv6_na.haslayer(ICMPv6NDOptDstLLAddr): - raise RuntimeError( - 'Missing Destination Link-Layer Address option in ICMPv6 ' + - 'Neighbor Advertisement {0}'.format(icmpv6_na.__repr__())) + raise RuntimeError('Missing Destination Link-Layer Address option in ' + 'ICMPv6 Neighbor Advertisement {0}'. + format(icmpv6_na.__repr__())) - option = 'ICMPv6 Neighbor Discovery Option - Destination Link-Layer Address' - dst_ll_addr = icmpv6_na[option] + dst_ll_addr = icmpv6_na[ICMPv6NDOptDstLLAddr] # verify destination link-layer address field if dst_ll_addr.lladdr != dst_mac: - raise RuntimeError('Invalid lladdr {0} should be {1}'.format( - dst_ll_addr.lladdr, dst_mac)) + raise RuntimeError('Invalid lladdr {0} should be {1}'. + format(dst_ll_addr.lladdr, dst_mac)) # verify checksum cksum = icmpv6_na.cksum del icmpv6_na.cksum tmp = ICMPv6ND_NA(str(icmpv6_na)) if not checksum_equal(tmp.cksum, cksum): - raise RuntimeError( - 'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum)) + raise RuntimeError('Invalid checksum {0} should be {1}'. + format(cksum, tmp.cksum)) sys.exit(0) + if __name__ == "__main__": main() diff --git a/resources/traffic_scripts/ipv6_sweep_ping.py b/resources/traffic_scripts/ipv6_sweep_ping.py index 80fdda532a..28d23a16ef 100755 --- a/resources/traffic_scripts/ipv6_sweep_ping.py +++ b/resources/traffic_scripts/ipv6_sweep_ping.py @@ -19,13 +19,18 @@ import logging import os import sys +# pylint: disable=no-name-in-module +# pylint: disable=import-error logging.getLogger("scapy.runtime").setLevel(logging.ERROR) -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue, \ - checksum_equal -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg -from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr -from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply + from scapy.all import Ether +from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS +from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr +from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply + +from resources.libraries.python.PacketVerifier import RxQueue, TxQueue +from resources.libraries.python.PacketVerifier import checksum_equal +from resources.libraries.python.TrafficScriptArg import TrafficScriptArg def main(): @@ -68,37 +73,43 @@ def main(): sent_packets.append(pkt_send) txq.send(pkt_send) - ether = rxq.recv(ignore=sent_packets) - if ether is None: - raise RuntimeError( - 'ICMPv6 echo reply seq {0} Rx timeout'.format(echo_seq)) + while True: + ether = rxq.recv(ignore=sent_packets) + if ether is None: + raise RuntimeError('ICMPv6 echo reply seq {0} ' + 'Rx timeout'.format(echo_seq)) + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue in case of ICMPv6ND_NS packet + continue + else: + # otherwise process the current packet + break if not ether.haslayer(IPv6): - raise RuntimeError( - 'Unexpected packet with no IPv6 received {0}'.format( - ether.__repr__())) + raise RuntimeError('Unexpected packet with no IPv6 layer ' + 'received: {0}'.format(ether.__repr__())) - ipv6 = ether['IPv6'] + ipv6 = ether[IPv6] if not ipv6.haslayer(ICMPv6EchoReply): - raise RuntimeError( - 'Unexpected packet with no IPv6 ICMP received {0}'.format( - ipv6.__repr__())) + raise RuntimeError('Unexpected packet with no ICMPv6 echo reply ' + 'layer received: {0}'.format(ipv6.__repr__())) - icmpv6 = ipv6['ICMPv6 Echo Reply'] + icmpv6 = ipv6[ICMPv6EchoReply] if icmpv6.id != echo_id or icmpv6.seq != echo_seq: - raise RuntimeError( - 'Invalid ICMPv6 echo reply received ID {0} seq {1} should be ' - 'ID {2} seq {3}, {0}'.format(icmpv6.id, icmpv6.seq, echo_id, - echo_seq)) + raise RuntimeError('ICMPv6 echo reply with invalid data received: ' + 'ID {0} seq {1} should be ID {2} seq {3}, {0}'. + format(icmpv6.id, icmpv6.seq, echo_id, + echo_seq)) cksum = icmpv6.cksum del icmpv6.cksum tmp = ICMPv6EchoReply(str(icmpv6)) if not checksum_equal(tmp.cksum, cksum): - raise RuntimeError( - 'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum)) + raise RuntimeError('Invalid checksum: {0} should be {1}'. + format(cksum, tmp.cksum)) sent_packets.remove(pkt_send) diff --git a/resources/traffic_scripts/policer.py b/resources/traffic_scripts/policer.py index ee9fa29c4a..62c397d496 100755 --- a/resources/traffic_scripts/policer.py +++ b/resources/traffic_scripts/policer.py @@ -21,7 +21,10 @@ import logging # pylint: disable=no-name-in-module # pylint: disable=import-error logging.getLogger("scapy.runtime").setLevel(logging.ERROR) -from scapy.all import Ether, IP, IPv6, TCP + +from scapy.all import Ether +from scapy.layers.inet import IP, TCP +from scapy.layers.inet6 import IPv6, ICMPv6ND_NS from ipaddress import ip_address from resources.libraries.python.TrafficScriptArg import TrafficScriptArg @@ -38,17 +41,17 @@ def check_ipv4(pkt_recv, dscp): :raises RuntimeError: If received packet is invalid. """ if not pkt_recv.haslayer(IP): - raise RuntimeError( - 'Not an IPv4 packet received: {0}'.format(pkt_recv.__repr__())) + raise RuntimeError('Not an IPv4 packet received: {0}'. + format(pkt_recv.__repr__())) - rx_dscp = pkt_recv['IP'].tos >> 2 + rx_dscp = pkt_recv[IP].tos >> 2 if rx_dscp != dscp: - raise RuntimeError( - 'Invalid DSCP {0} should be {1}'.format(rx_dscp, dscp)) + raise RuntimeError('Invalid DSCP {0} should be {1}'. + format(rx_dscp, dscp)) if not pkt_recv.haslayer(TCP): - raise RuntimeError( - 'Not a TCP packet received: {0}'.format(pkt_recv.__repr__())) + raise RuntimeError('Not a TCP packet received: {0}'. + format(pkt_recv.__repr__())) def check_ipv6(pkt_recv, dscp): @@ -61,17 +64,17 @@ def check_ipv6(pkt_recv, dscp): :raises RuntimeError: If received packet is invalid. """ if not pkt_recv.haslayer(IPv6): - raise RuntimeError( - 'Not an IPv6 packet received: {0}'.format(pkt_recv.__repr__())) + raise RuntimeError('Not an IPv6 packet received: {0}'. + format(pkt_recv.__repr__())) - rx_dscp = pkt_recv['IPv6'].tc >> 2 + rx_dscp = pkt_recv[IPv6].tc >> 2 if rx_dscp != dscp: - raise RuntimeError( - 'Invalid DSCP {0} should be {1}'.format(rx_dscp, dscp)) + raise RuntimeError('Invalid DSCP {0} should be {1}'. + format(rx_dscp, dscp)) if not pkt_recv.haslayer(TCP): - raise RuntimeError( - 'Not a TCP packet received: {0}'.format(pkt_recv.__repr__())) + raise RuntimeError('Not a TCP packet received: {0}'. + format(pkt_recv.__repr__())) # pylint: disable=too-many-locals @@ -97,19 +100,29 @@ def main(): sent_packets = [] if is_ipv4: - ip_pkt = IP(src=src_ip, dst=dst_ip) / \ - TCP() + ip_pkt = (IP(src=src_ip, dst=dst_ip) / + TCP()) else: - ip_pkt = IPv6(src=src_ip, dst=dst_ip) / \ - TCP() + ip_pkt = (IPv6(src=src_ip, dst=dst_ip) / + TCP()) - pkt_send = Ether(src=src_mac, dst=dst_mac) / \ - ip_pkt + pkt_send = (Ether(src=src_mac, dst=dst_mac) / + ip_pkt) sent_packets.append(pkt_send) txq.send(pkt_send) - pkt_recv = rxq.recv(2, sent_packets) + while True: + pkt_recv = rxq.recv(2, sent_packets) + if pkt_recv is None: + raise RuntimeError('ICMPv6 echo reply Rx timeout') + + if pkt_recv.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break if pkt_recv is None: raise RuntimeError('Rx timeout') @@ -121,5 +134,6 @@ def main(): sys.exit(0) + if __name__ == "__main__": main() diff --git a/resources/traffic_scripts/send_icmp_check_headers.py b/resources/traffic_scripts/send_icmp_check_headers.py index c9e3a35df4..2193164b61 100755 --- a/resources/traffic_scripts/send_icmp_check_headers.py +++ b/resources/traffic_scripts/send_icmp_check_headers.py @@ -22,8 +22,7 @@ import sys import ipaddress from robot.api import logger from scapy.layers.inet import ICMP, IP -from scapy.layers.inet6 import ICMPv6EchoRequest -from scapy.layers.inet6 import IPv6 +from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS from scapy.layers.l2 import Ether, Dot1Q from resources.libraries.python.PacketVerifier import RxQueue, TxQueue @@ -94,28 +93,36 @@ def main(): sent_packets.append(pkt_raw) txq.send(pkt_raw) - if tx_if == rx_if: - ether = rxq.recv(2, ignore=sent_packets) - else: - ether = rxq.recv(2) - if ether is None: - raise RuntimeError("ICMP echo Rx timeout") + while True: + if tx_if == rx_if: + ether = rxq.recv(2, ignore=sent_packets) + else: + ether = rxq.recv(2) + if ether is None: + raise RuntimeError('ICMP echo Rx timeout') + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src: logger.trace("MAC matched") else: - raise RuntimeError( - "Matching packet unsuccessful: {0}".format(ether.__repr__())) + raise RuntimeError("Matching packet unsuccessful: {0}". + format(ether.__repr__())) if encaps_rx == 'Dot1q': if ether[Dot1Q].vlan == int(vlan_rx): logger.trace("VLAN matched") else: raise RuntimeError('Ethernet frame with wrong VLAN tag ({}-' - 'received, {}-expected):\n{}' - .format(ether[Dot1Q].vlan, vlan_rx, - ether.__repr__())) + 'received, {}-expected):\n{}'. + format(ether[Dot1Q].vlan, vlan_rx, + ether.__repr__())) ip = ether[Dot1Q].payload elif encaps_rx == 'Dot1ad': raise NotImplementedError() @@ -123,21 +130,21 @@ def main(): ip = ether.payload if not isinstance(ip, ip_format): - raise RuntimeError( - "Not an IP packet received {0}".format(ip.__repr__())) + raise RuntimeError("Not an IP packet received {0}". + format(ip.__repr__())) # Compare data from packets if src_ip == ip.src: logger.trace("Src IP matched") else: - raise RuntimeError("Matching Src IP unsuccessful: {} != {}" - .format(src_ip, ip.src)) + raise RuntimeError("Matching Src IP unsuccessful: {} != {}". + format(src_ip, ip.src)) if dst_ip == ip.dst: logger.trace("Dst IP matched") else: - raise RuntimeError("Matching Dst IP unsuccessful: {} != {}" - .format(dst_ip, ip.dst)) + raise RuntimeError("Matching Dst IP unsuccessful: {} != {}". + format(dst_ip, ip.dst)) sys.exit(0) diff --git a/resources/traffic_scripts/send_icmp_check_multipath.py b/resources/traffic_scripts/send_icmp_check_multipath.py index 4e39efdfcd..5da3b7b096 100755 --- a/resources/traffic_scripts/send_icmp_check_multipath.py +++ b/resources/traffic_scripts/send_icmp_check_multipath.py @@ -18,10 +18,9 @@ and check if it is divided into two paths.""" import sys import ipaddress -from scapy.layers.inet import ICMP, IP -from scapy.layers.inet6 import IPv6 from scapy.all import Ether -from scapy.layers.inet6 import ICMPv6EchoRequest +from scapy.layers.inet import ICMP, IP +from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS from resources.libraries.python.PacketVerifier import RxQueue, TxQueue from resources.libraries.python.TrafficScriptArg import TrafficScriptArg @@ -94,27 +93,38 @@ def main(): sent_packets.append(pkt_raw) txq.send(pkt_raw) - ether = rxq.recv(2) + + while True: + ether = rxq.recv(2) + if ether is None: + raise RuntimeError('ICMPv6 echo reply Rx timeout') + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue in case of ICMPv6ND_NS packet + continue + else: + # otherwise process the current packet + break if ether is None: raise RuntimeError("ICMP echo Rx timeout") if not ether.haslayer(ip_format): - raise RuntimeError("Not an IP packet received {0}" - .format(ether.__repr__())) + raise RuntimeError("Not an IP packet received {0}". + format(ether.__repr__())) - if ether['Ethernet'].src != dut_if2_mac: + if ether[Ether].src != dut_if2_mac: raise RuntimeError("Source MAC address error") - if ether['Ethernet'].dst == path_1_mac: + if ether[Ether].dst == path_1_mac: path_1_counter += 1 - elif ether['Ethernet'].dst == path_2_mac: + elif ether[Ether].dst == path_2_mac: path_2_counter += 1 else: raise RuntimeError("Destination MAC address error") if (path_1_counter + path_2_counter) != 100: - raise RuntimeError("Packet loss: recevied only {} packets of 100 " - .format(path_1_counter + path_2_counter)) + raise RuntimeError("Packet loss: recevied only {} packets of 100 ". + format(path_1_counter + path_2_counter)) if path_1_counter == 0: raise RuntimeError("Path 1 error!") @@ -127,5 +137,6 @@ def main(): sys.exit(0) + if __name__ == "__main__": main() diff --git a/resources/traffic_scripts/send_icmp_type_code.py b/resources/traffic_scripts/send_icmp_type_code.py index dffaafff38..2997f91264 100755 --- a/resources/traffic_scripts/send_icmp_type_code.py +++ b/resources/traffic_scripts/send_icmp_type_code.py @@ -19,10 +19,9 @@ the other one. Dot1q or Dot1ad tagging of the ethernet frame can be set. import sys import ipaddress -from scapy.layers.inet import ICMP, IP from scapy.layers.l2 import Ether -from scapy.layers.inet6 import ICMPv6EchoRequest -from scapy.layers.inet6 import IPv6 +from scapy.layers.inet import ICMP, IP +from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS from resources.libraries.python.PacketVerifier import RxQueue, TxQueue from resources.libraries.python.TrafficScriptArg import TrafficScriptArg @@ -88,13 +87,13 @@ def main(): pkt_raw = (Ether(src=src_mac, dst=dst_mac) / IP(src=src_ip, dst=dst_ip) / ICMP(code=icmp_code, type=icmp_type)) - ip_format = 'IP' - icmp_format = 'ICMP' + ip_format = IP + icmp_format = ICMP elif valid_ipv6(src_ip) and valid_ipv6(dst_ip): pkt_raw = (Ether(src=src_mac, dst=dst_mac) / IPv6(src=src_ip, dst=dst_ip) / ICMPv6EchoRequest(code=icmp_code, type=icmp_type)) - ip_format = 'IPv6' + ip_format = IPv6 icmp_format = 'ICMPv6' else: raise ValueError("IP(s) not in correct format") @@ -103,23 +102,31 @@ def main(): sent_packets.append(pkt_raw) txq.send(pkt_raw) - ether = rxq.recv(2) + while True: + ether = rxq.recv(2) + if ether is None: + raise RuntimeError('ICMP echo Rx timeout') - # Check whether received packet contains layers Ether, IP and ICMP - if ether is None: - raise RuntimeError('ICMP echo Rx timeout') + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break + # Check whether received packet contains layers IP and ICMP if not ether.haslayer(ip_format): - raise RuntimeError('Not an IP packet received {0}' - .format(ether.__repr__())) + raise RuntimeError('Not an IP packet received {0}'. + format(ether.__repr__())) # Cannot use haslayer for ICMPv6, every type of ICMPv6 is a separate layer # Next header value of 58 means the next header is ICMPv6 if not ether.haslayer(icmp_format) and ether[ip_format].nh != 58: - raise RuntimeError('Not an ICMP packet received {0}' - .format(ether.__repr__())) + raise RuntimeError('Not an ICMP packet received {0}'. + format(ether.__repr__())) sys.exit(0) + if __name__ == "__main__": main() diff --git a/resources/traffic_scripts/send_icmp_wait_for_reply.py b/resources/traffic_scripts/send_icmp_wait_for_reply.py index a77f15efc2..7677152801 100755 --- a/resources/traffic_scripts/send_icmp_wait_for_reply.py +++ b/resources/traffic_scripts/send_icmp_wait_for_reply.py @@ -17,9 +17,9 @@ import sys import ipaddress -from scapy.layers.inet import ICMP, IP -from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest from scapy.all import Ether +from scapy.layers.inet import ICMP, IP +from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS from resources.libraries.python.PacketVerifier import RxQueue, TxQueue from resources.libraries.python.TrafficScriptArg import TrafficScriptArg @@ -35,14 +35,14 @@ def is_icmp_reply(pkt, ipformat): :type ipformat: dict :rtype: bool """ - #pylint: disable=bare-except + # pylint: disable=bare-except try: if pkt[ipformat['IPType']][ipformat['ICMP_rep']].type == \ ipformat['Type']: return True else: return False - except: + except: # pylint: disable=bare-except return False @@ -58,12 +58,12 @@ def address_check(request, reply, ipformat): :type ipformat: dict :rtype: bool """ - #pylint: disable=bare-except + # pylint: disable=bare-except try: r_src = reply[ipformat['IPType']].src == request[ipformat['IPType']].dst r_dst = reply[ipformat['IPType']].dst == request[ipformat['IPType']].src return r_src and r_dst - except: + except: # pylint: disable=bare-except return False @@ -139,12 +139,21 @@ def main(): txq.send(icmp_request) for _ in range(1000): - icmp_reply = rxq.recv(wait_step, ignore=sent_packets) - if icmp_reply is None: - timeout -= wait_step - if timeout < 0: - raise RuntimeError("ICMP echo Rx timeout") - elif is_icmp_reply(icmp_reply, ip_format): + while True: + icmp_reply = rxq.recv(wait_step, ignore=sent_packets) + if icmp_reply is None: + timeout -= wait_step + if timeout < 0: + raise RuntimeError("ICMP echo Rx timeout") + + elif icmp_reply.haslayer(ICMPv6ND_NS): + # read another packet in the queue in case of ICMPv6ND_NS packet + continue + else: + # otherwise process the current packet + break + + if is_icmp_reply(icmp_reply, ip_format): if address_check(icmp_request, icmp_reply, ip_format): break else: @@ -154,5 +163,6 @@ def main(): sys.exit(0) + if __name__ == "__main__": main() diff --git a/resources/traffic_scripts/send_ip_icmp.py b/resources/traffic_scripts/send_ip_icmp.py index b22b5d39a8..58f2e1e4d8 100755 --- a/resources/traffic_scripts/send_ip_icmp.py +++ b/resources/traffic_scripts/send_ip_icmp.py @@ -19,11 +19,9 @@ the other one. Dot1q or Dot1ad tagging of the ethernet frame can be set. import sys import ipaddress +from scapy.layers.l2 import Ether, Dot1Q from scapy.layers.inet import ICMP, IP -from scapy.layers.l2 import Ether -from scapy.layers.l2 import Dot1Q -from scapy.layers.inet6 import ICMPv6EchoRequest -from scapy.layers.inet6 import IPv6 +from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS from resources.libraries.python.PacketVerifier import RxQueue, TxQueue from resources.libraries.python.TrafficScriptArg import TrafficScriptArg @@ -107,8 +105,8 @@ def main(): pkt_raw = (Ether(src=src_mac, dst=dst_mac) / IP(src=src_ip, dst=dst_ip) / ICMP()) - ip_format = 'IP' - icmp_format = 'ICMP' + ip_format = IP + icmp_format = ICMP elif valid_ipv6(src_ip) and valid_ipv6(dst_ip): if encaps == 'Dot1q': pkt_raw = (Ether(src=src_mac, dst=dst_mac) / @@ -125,8 +123,8 @@ def main(): pkt_raw = (Ether(src=src_mac, dst=dst_mac) / IPv6(src=src_ip, dst=dst_ip) / ICMPv6EchoRequest()) - ip_format = 'IPv6' - icmp_format = 'ICMPv6EchoRequest' + ip_format = IPv6 + icmp_format = ICMPv6EchoRequest else: raise ValueError("IP(s) not in correct format") @@ -134,44 +132,55 @@ def main(): sent_packets.append(pkt_raw) txq.send(pkt_raw) - ether = rxq.recv(2) + # Receive ICMP / ICMPv6 echo reply + while True: + ether = rxq.recv(2,) + if ether is None: + raise RuntimeError('ICMP echo Rx timeout') - # Check whether received packet contains layers Ether, IP and ICMP - if ether is None: - raise RuntimeError('ICMP echo Rx timeout') + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break + # Check whether received packet contains layers IP/IPv6 and + # ICMP/ICMPv6EchoRequest if encaps_rx: if encaps_rx == 'Dot1q': if not vlan1_rx: vlan1_rx = vlan1 if not ether.haslayer(Dot1Q): - raise RuntimeError('Not VLAN tagged Eth frame received:\n{0}' - .format(ether.__repr__())) + raise RuntimeError('Not VLAN tagged Eth frame received:\n{0}'. + format(ether.__repr__())) elif ether[Dot1Q].vlan != int(vlan1_rx): raise RuntimeError('Ethernet frame with wrong VLAN tag ({}) ' - 'received ({} expected):\n{}'.format( - ether[Dot1Q].vlan, vlan1_rx, ether.__repr__())) + 'received ({} expected):\n{}'. + format(ether[Dot1Q].vlan, vlan1_rx, + ether.__repr__())) elif encaps_rx == 'Dot1ad': if not vlan1_rx: vlan1_rx = vlan1 if not vlan2_rx: vlan2_rx = vlan2 # TODO - raise RuntimeError('Encapsulation {0} not implemented yet.' - .format(encaps_rx)) + raise RuntimeError('Encapsulation {0} not implemented yet.'. + format(encaps_rx)) else: - raise RuntimeError('Unsupported/unknown encapsulation expected: {0}' - .format(encaps_rx)) + raise RuntimeError('Unsupported encapsulation expected: {0}'. + format(encaps_rx)) if not ether.haslayer(ip_format): - raise RuntimeError('Not an IP packet received:\n{0}' - .format(ether.__repr__())) + raise RuntimeError('Not an IP/IPv6 packet received:\n{0}'. + format(ether.__repr__())) if not ether.haslayer(icmp_format): - raise RuntimeError('Not an ICMP packet received:\n{0}' - .format(ether.__repr__())) + raise RuntimeError('Not an ICMP/ICMPv6EchoRequest packet received:\n' + '{0}'.format(ether.__repr__())) sys.exit(0) + if __name__ == "__main__": main() diff --git a/resources/traffic_scripts/send_rs_check_ra.py b/resources/traffic_scripts/send_rs_check_ra.py index 9ba1f55b52..c9ff46528a 100755 --- a/resources/traffic_scripts/send_rs_check_ra.py +++ b/resources/traffic_scripts/send_rs_check_ra.py @@ -18,7 +18,7 @@ import sys import ipaddress from scapy.layers.l2 import Ether -from scapy.layers.inet6 import IPv6, ICMPv6ND_RS +from scapy.layers.inet6 import IPv6, ICMPv6ND_RA, ICMPv6ND_RS, ICMPv6ND_NS from resources.libraries.python.PacketVerifier import RxQueue, TxQueue from resources.libraries.python.TrafficScriptArg import TrafficScriptArg @@ -72,24 +72,29 @@ def main(): sent_packets = [pkt_raw] txq.send(pkt_raw) - ether = rxq.recv(8, ignore=sent_packets) + while True: + ether = rxq.recv(2, sent_packets) + if ether is None: + raise RuntimeError('ICMP echo Rx timeout') - # Check whether received packet contains layer RA and check other values - if ether is None: - raise RuntimeError('ICMP echo Rx timeout') + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break + # Check whether received packet contains layer RA and check other values if ether.src != router_mac: - raise RuntimeError( - 'Packet source MAC ({0}) does not match ' - 'router MAC ({1}).'.format(ether.src, router_mac)) + raise RuntimeError('Packet source MAC ({0}) does not match router MAC ' + '({1}).'.format(ether.src, router_mac)) if ether.dst != src_mac: - raise RuntimeError( - 'Packet destination MAC ({0}) does not match ' - 'RS source MAC ({1}).'.format(ether.dst, src_mac)) + raise RuntimeError('Packet destination MAC ({0}) does not match RS ' + 'source MAC ({1}).'.format(ether.dst, src_mac)) - if not ether.haslayer('ICMPv6ND_RA'): - raise RuntimeError('Not an RA packet received {0}' - .format(ether.__repr__())) + if not ether.haslayer(ICMPv6ND_RA): + raise RuntimeError('Not an RA packet received {0}'. + format(ether.__repr__())) src_address = ipaddress.IPv6Address(unicode(ether['IPv6'].src)) dst_address = ipaddress.IPv6Address(unicode(ether['IPv6'].dst)) @@ -98,24 +103,25 @@ def main(): rs_src_address = ipaddress.IPv6Address(unicode(src_ip)) if src_address != router_link_local: - raise RuntimeError( - 'Packet source address ({0}) does not match ' - 'link local address({1})'.format(src_address, router_link_local)) + raise RuntimeError('Packet source address ({0}) does not match link ' + 'local address({1})'. + format(src_address, router_link_local)) if dst_address != rs_src_address: - raise RuntimeError( - 'Packet destination address ({0}) does not match ' - 'RS source address ({1}).'.format(dst_address, rs_src_address)) + raise RuntimeError('Packet destination address ({0}) does not match ' + 'RS source address ({1}).'. + format(dst_address, rs_src_address)) if ether['IPv6'].hlim != 255: - raise RuntimeError('Hop limit not correct: {0}!=255'.format( - ether['IPv6'].hlim)) + raise RuntimeError('Hop limit not correct: {0}!=255'. + format(ether['IPv6'].hlim)) - ra_code = ether['ICMPv6 Neighbor Discovery - Router Advertisement'].code + ra_code = ether[ICMPv6ND_RA].code if ra_code != 0: raise RuntimeError('ICMP code: {0} not correct. '.format(ra_code)) sys.exit(0) + if __name__ == "__main__": main() diff --git a/resources/traffic_scripts/send_tcp_for_classifier_test.py b/resources/traffic_scripts/send_tcp_for_classifier_test.py index 5d8d387767..5a6873ab4e 100755 --- a/resources/traffic_scripts/send_tcp_for_classifier_test.py +++ b/resources/traffic_scripts/send_tcp_for_classifier_test.py @@ -17,27 +17,24 @@ Traffic script that sends an TCP packet from TG to DUT. """ import sys -import time -from scapy.layers.inet import IP, UDP, TCP -from scapy.layers.inet6 import IPv6 from scapy.all import Ether, Packet, Raw +from scapy.layers.inet import IP, TCP +from scapy.layers.inet6 import IPv6, ICMPv6ND_NS from resources.libraries.python.SFC.VerifyPacket import * -from resources.libraries.python.SFC.SFCConstants import SFCConstants as sfccon +from resources.libraries.python.SFC.SFCConstants import SFCConstants as SfcCon from resources.libraries.python.TrafficScriptArg import TrafficScriptArg from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from robot.api import logger def main(): """Send TCP packet from one traffic generator interface to DUT. :raises: If the IP address is invalid. """ - args = TrafficScriptArg( - ['src_mac', 'dst_mac', 'src_ip', 'dst_ip', - 'timeout', 'framesize', 'testtype']) + args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip', + 'timeout', 'framesize', 'testtype']) src_mac = args.get_arg('src_mac') dst_mac = args.get_arg('dst_mac') @@ -54,10 +51,9 @@ def main(): sent_packets = [] protocol = TCP - source_port = sfccon.DEF_SRC_PORT - destination_port = sfccon.DEF_DST_PORT + source_port = SfcCon.DEF_SRC_PORT + destination_port = SfcCon.DEF_DST_PORT - ip_version = None if valid_ipv4(src_ip) and valid_ipv4(dst_ip): ip_version = IP elif valid_ipv6(src_ip) and valid_ipv6(dst_ip): @@ -66,8 +62,8 @@ def main(): raise ValueError("Invalid IP version!") pkt_header = (Ether(src=src_mac, dst=dst_mac) / - ip_version(src=src_ip, dst=dst_ip) / - protocol(sport=int(source_port), dport=int(destination_port))) + ip_version(src=src_ip, dst=dst_ip) / + protocol(sport=int(source_port), dport=int(destination_port))) fsize_no_fcs = frame_size - 4 pad_len = max(0, fsize_no_fcs - len(pkt_header)) @@ -79,10 +75,17 @@ def main(): sent_packets.append(pkt_raw) txq.send(pkt_raw) - ether = rxq.recv(timeout) - - if ether is None: - raise RuntimeError("No packet is received!") + while True: + ether = rxq.recv(timeout) + if ether is None: + raise RuntimeError('No packet is received!') + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break # let us begin to check the NSH SFC loopback packet VerifyPacket.check_the_nsh_sfc_packet(ether, frame_size, test_type) diff --git a/resources/traffic_scripts/send_tcp_udp.py b/resources/traffic_scripts/send_tcp_udp.py index 77f918213f..4cba73286a 100755 --- a/resources/traffic_scripts/send_tcp_udp.py +++ b/resources/traffic_scripts/send_tcp_udp.py @@ -19,9 +19,9 @@ from one interface to the other. import sys import ipaddress -from scapy.layers.inet import IP, UDP, TCP -from scapy.layers.inet6 import IPv6 from scapy.all import Ether +from scapy.layers.inet import IP, UDP, TCP +from scapy.layers.inet6 import IPv6, ICMPv6ND_NS from resources.libraries.python.PacketVerifier import RxQueue, TxQueue from resources.libraries.python.TrafficScriptArg import TrafficScriptArg @@ -62,9 +62,8 @@ def valid_ipv6(ip): def main(): """Send TCP or UDP packet from one traffic generator interface to the other. """ - args = TrafficScriptArg( - ['tx_mac', 'rx_mac', 'src_ip', 'dst_ip', 'protocol', - 'source_port', 'destination_port']) + args = TrafficScriptArg(['tx_mac', 'rx_mac', 'src_ip', 'dst_ip', 'protocol', + 'source_port', 'destination_port']) src_mac = args.get_arg('tx_mac') dst_mac = args.get_arg('rx_mac') @@ -77,7 +76,6 @@ def main(): source_port = args.get_arg('source_port') destination_port = args.get_arg('destination_port') - ip_version = None if valid_ipv4(src_ip) and valid_ipv4(dst_ip): ip_version = IP elif valid_ipv6(src_ip) and valid_ipv6(dst_ip): @@ -90,7 +88,7 @@ def main(): elif protocol.upper() == 'UDP': protocol = UDP else: - raise ValueError("Invalid type of protocol!") + raise ValueError("Invalid protocol type!") rxq = RxQueue(rx_if) txq = TxQueue(tx_if) @@ -100,19 +98,27 @@ def main(): protocol(sport=int(source_port), dport=int(destination_port))) txq.send(pkt_raw) - ether = rxq.recv(2) - if ether is None: - raise RuntimeError("TCP/UDP Rx timeout") + while True: + ether = rxq.recv(2) + if ether is None: + raise RuntimeError('TCP/UDP Rx timeout') + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break - if 'TCP' in ether: + if TCP in ether: print ("TCP packet received.") - elif 'UDP' in ether: + elif UDP in ether: print ("UDP packet received.") else: - raise RuntimeError("Not an TCP or UDP packet received {0}" - .format(ether.__repr__())) + raise RuntimeError("Not an TCP or UDP packet received {0}". + format(ether.__repr__())) sys.exit(0) diff --git a/resources/traffic_scripts/send_vxlan_for_proxy_test.py b/resources/traffic_scripts/send_vxlan_for_proxy_test.py index c356e1977e..d33ed413c8 100755 --- a/resources/traffic_scripts/send_vxlan_for_proxy_test.py +++ b/resources/traffic_scripts/send_vxlan_for_proxy_test.py @@ -19,24 +19,22 @@ import sys import time from scapy.layers.inet import IP, UDP, TCP -from scapy.layers.inet6 import IPv6 +from scapy.layers.inet6 import IPv6, ICMPv6ND_NS from scapy.all import Ether, Packet, Raw from resources.libraries.python.SFC.VerifyPacket import * -from resources.libraries.python.SFC.SFCConstants import SFCConstants as sfccon +from resources.libraries.python.SFC.SFCConstants import SFCConstants as SfcCon from resources.libraries.python.TrafficScriptArg import TrafficScriptArg from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from robot.api import logger def main(): """Send VxLAN packet from TG to DUT. :raises: If the IP address is invalid. """ - args = TrafficScriptArg( - ['src_mac', 'dst_mac', 'src_ip', 'dst_ip', - 'timeout', 'framesize', 'testtype']) + args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip', + 'timeout', 'framesize', 'testtype']) src_mac = args.get_arg('src_mac') dst_mac = args.get_arg('dst_mac') @@ -44,7 +42,7 @@ def main(): dst_ip = args.get_arg('dst_ip') tx_if = args.get_arg('tx_if') rx_if = args.get_arg('rx_if') - timeout = int(args.get_arg('timeout')) + timeout = max(2, int(args.get_arg('timeout'))) frame_size = int(args.get_arg('framesize')) test_type = args.get_arg('testtype') @@ -53,10 +51,9 @@ def main(): sent_packets = [] protocol = TCP - source_port = sfccon.DEF_SRC_PORT - destination_port = sfccon.DEF_DST_PORT + source_port = SfcCon.DEF_SRC_PORT + destination_port = SfcCon.DEF_DST_PORT - ip_version = None if valid_ipv4(src_ip) and valid_ipv4(dst_ip): ip_version = IP elif valid_ipv6(src_ip) and valid_ipv6(dst_ip): @@ -65,17 +62,17 @@ def main(): raise ValueError("Invalid IP version!") innerpkt = (Ether(src=src_mac, dst=dst_mac) / - ip_version(src=src_ip, dst=dst_ip) / - protocol(sport=int(source_port), dport=int(destination_port))) + ip_version(src=src_ip, dst=dst_ip) / + protocol(sport=int(source_port), dport=int(destination_port))) vxlan = '\x08\x00\x00\x00\x00\x00\x01\x00' raw_data = vxlan + str(innerpkt) pkt_header = (Ether(src=src_mac, dst=dst_mac) / - ip_version(src=src_ip, dst=dst_ip) / - UDP(sport=int(source_port), dport=4789) / - Raw(load=raw_data)) + ip_version(src=src_ip, dst=dst_ip) / + UDP(sport=int(source_port), dport=4789) / + Raw(load=raw_data)) fsize_no_fcs = frame_size - 4 pad_len = max(0, fsize_no_fcs - len(pkt_header)) @@ -87,10 +84,17 @@ def main(): sent_packets.append(pkt_raw) txq.send(pkt_raw) - ether = rxq.recv(2) - - if ether is None: - raise RuntimeError("No packet is received!") + while True: + ether = rxq.recv(timeout) + if ether is None: + raise RuntimeError('No packet is received!') + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break # let us begin to check the proxy outbound packet VerifyPacket.check_the_nsh_sfc_packet(ether, frame_size, test_type) diff --git a/resources/traffic_scripts/send_vxlangpe_nsh_for_proxy_test.py b/resources/traffic_scripts/send_vxlangpe_nsh_for_proxy_test.py index d998d7c6a0..3ea1f0bc62 100755 --- a/resources/traffic_scripts/send_vxlangpe_nsh_for_proxy_test.py +++ b/resources/traffic_scripts/send_vxlangpe_nsh_for_proxy_test.py @@ -16,27 +16,24 @@ from TG to DUT. """ import sys -import time from scapy.layers.inet import IP, UDP, TCP -from scapy.layers.inet6 import IPv6 +from scapy.layers.inet6 import IPv6, ICMPv6ND_NS from scapy.all import Ether, Packet, Raw from resources.libraries.python.SFC.VerifyPacket import * -from resources.libraries.python.SFC.SFCConstants import SFCConstants as sfccon +from resources.libraries.python.SFC.SFCConstants import SFCConstants as SfcCon from resources.libraries.python.TrafficScriptArg import TrafficScriptArg from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from robot.api import logger def main(): """Send VxLAN-GPE+NSH packet from TG to DUT. :raises: If the IP address is invalid. """ - args = TrafficScriptArg( - ['src_mac', 'dst_mac', 'src_ip', 'dst_ip', - 'timeout', 'framesize', 'testtype']) + args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip', + 'timeout', 'framesize', 'testtype']) src_mac = args.get_arg('src_mac') dst_mac = args.get_arg('dst_mac') @@ -44,7 +41,7 @@ def main(): dst_ip = args.get_arg('dst_ip') tx_if = args.get_arg('tx_if') rx_if = args.get_arg('rx_if') - timeout = int(args.get_arg('timeout')) + timeout = max(2, int(args.get_arg('timeout'))) frame_size = int(args.get_arg('framesize')) test_type = args.get_arg('testtype') @@ -53,10 +50,9 @@ def main(): sent_packets = [] protocol = TCP - source_port = sfccon.DEF_SRC_PORT - destination_port = sfccon.DEF_DST_PORT + source_port = SfcCon.DEF_SRC_PORT + destination_port = SfcCon.DEF_DST_PORT - ip_version = None if valid_ipv4(src_ip) and valid_ipv4(dst_ip): ip_version = IP elif valid_ipv6(src_ip) and valid_ipv6(dst_ip): @@ -65,8 +61,8 @@ def main(): raise ValueError("Invalid IP version!") innerpkt = (Ether(src=src_mac, dst=dst_mac) / - ip_version(src=src_ip, dst=dst_ip) / - protocol(sport=int(source_port), dport=int(destination_port))) + ip_version(src=src_ip, dst=dst_ip) / + protocol(sport=int(source_port), dport=int(destination_port))) vxlangpe_nsh = '\x0c\x00\x00\x04\x00\x00\x09\x00\x00\x06' \ '\x01\x03\x00\x00\xb9\xff\xC0\xA8\x32\x4B' \ @@ -75,9 +71,9 @@ def main(): raw_data = vxlangpe_nsh + str(innerpkt) pkt_header = (Ether(src=src_mac, dst=dst_mac) / - ip_version(src=src_ip, dst=dst_ip) / - UDP(sport=int(source_port), dport=4790) / - Raw(load=raw_data)) + ip_version(src=src_ip, dst=dst_ip) / + UDP(sport=int(source_port), dport=4790) / + Raw(load=raw_data)) fsize_no_fcs = frame_size - 4 pad_len = max(0, fsize_no_fcs - len(pkt_header)) @@ -89,10 +85,17 @@ def main(): sent_packets.append(pkt_raw) txq.send(pkt_raw) - ether = rxq.recv(2) - - if ether is None: - raise RuntimeError("No packet is received!") + while True: + ether = rxq.recv(timeout) + if ether is None: + raise RuntimeError('No packet is received!') + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break # let us begin to check the proxy inbound packet VerifyPacket.check_the_nsh_sfc_packet(ether, frame_size, test_type) diff --git a/resources/traffic_scripts/send_vxlangpe_nsh_for_sff_test.py b/resources/traffic_scripts/send_vxlangpe_nsh_for_sff_test.py index 55c06f0691..6879d20f2c 100755 --- a/resources/traffic_scripts/send_vxlangpe_nsh_for_sff_test.py +++ b/resources/traffic_scripts/send_vxlangpe_nsh_for_sff_test.py @@ -16,27 +16,24 @@ from TG to DUT. """ import sys -import time from scapy.layers.inet import IP, UDP, TCP -from scapy.layers.inet6 import IPv6 +from scapy.layers.inet6 import IPv6, ICMPv6ND_NS from scapy.all import Ether, Packet, Raw from resources.libraries.python.SFC.VerifyPacket import * -from resources.libraries.python.SFC.SFCConstants import SFCConstants as sfccon +from resources.libraries.python.SFC.SFCConstants import SFCConstants as SfcCon from resources.libraries.python.TrafficScriptArg import TrafficScriptArg from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from robot.api import logger def main(): """Send VxLAN-GPE+NSH packet from TG to DUT. :raises: If the IP address is invalid. """ - args = TrafficScriptArg( - ['src_mac', 'dst_mac', 'src_ip', 'dst_ip', - 'timeout', 'framesize', 'testtype']) + args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip', + 'timeout', 'framesize', 'testtype']) src_mac = args.get_arg('src_mac') dst_mac = args.get_arg('dst_mac') @@ -44,7 +41,7 @@ def main(): dst_ip = args.get_arg('dst_ip') tx_if = args.get_arg('tx_if') rx_if = args.get_arg('rx_if') - timeout = int(args.get_arg('timeout')) + timeout = max(2, int(args.get_arg('timeout'))) frame_size = int(args.get_arg('framesize')) test_type = args.get_arg('testtype') @@ -53,10 +50,9 @@ def main(): sent_packets = [] protocol = TCP - source_port = sfccon.DEF_SRC_PORT - destination_port = sfccon.DEF_DST_PORT + source_port = SfcCon.DEF_SRC_PORT + destination_port = SfcCon.DEF_DST_PORT - ip_version = None if valid_ipv4(src_ip) and valid_ipv4(dst_ip): ip_version = IP elif valid_ipv6(src_ip) and valid_ipv6(dst_ip): @@ -65,8 +61,8 @@ def main(): raise ValueError("Invalid IP version!") innerpkt = (Ether(src=src_mac, dst=dst_mac) / - ip_version(src=src_ip, dst=dst_ip) / - protocol(sport=int(source_port), dport=int(destination_port))) + ip_version(src=src_ip, dst=dst_ip) / + protocol(sport=int(source_port), dport=int(destination_port))) vxlangpe_nsh = '\x0c\x00\x00\x04\x00\x00\x0a\x00\x00\x06' \ '\x01\x03\x00\x00\xb9\xff\xC0\xA8\x32\x4B' \ @@ -75,9 +71,9 @@ def main(): raw_data = vxlangpe_nsh + str(innerpkt) pkt_header = (Ether(src=src_mac, dst=dst_mac) / - ip_version(src=src_ip, dst=dst_ip) / - UDP(sport=int(source_port), dport=4790) / - Raw(load=raw_data)) + ip_version(src=src_ip, dst=dst_ip) / + UDP(sport=int(source_port), dport=4790) / + Raw(load=raw_data)) fsize_no_fcs = frame_size - 4 pad_len = max(0, fsize_no_fcs - len(pkt_header)) @@ -89,10 +85,17 @@ def main(): sent_packets.append(pkt_raw) txq.send(pkt_raw) - ether = rxq.recv(2) - - if ether is None: - raise RuntimeError("No packet is received!") + while True: + ether = rxq.recv(timeout) + if ether is None: + raise RuntimeError('No packet is received!') + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break # let us begin to check the sfc sff packet VerifyPacket.check_the_nsh_sfc_packet(ether, frame_size, test_type) diff --git a/resources/traffic_scripts/span_check.py b/resources/traffic_scripts/span_check.py index c7ca8a7374..cbf65d3aaf 100755 --- a/resources/traffic_scripts/span_check.py +++ b/resources/traffic_scripts/span_check.py @@ -21,7 +21,7 @@ import sys import ipaddress from scapy.layers.inet import IP, ICMP, ARP -from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply +from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS from scapy.layers.l2 import Ether from resources.libraries.python.PacketVerifier import RxQueue, TxQueue, auto_pad @@ -64,8 +64,8 @@ def main(): """Send a simple L2 or ICMP packet from one TG interface to DUT, then receive a copy of the packet on the second TG interface, and a copy of the ICMP reply.""" - args = TrafficScriptArg( - ['tg_src_mac', 'src_ip', 'dst_ip', 'dut_if1_mac', 'ptype']) + args = TrafficScriptArg(['tg_src_mac', 'src_ip', 'dst_ip', 'dut_if1_mac', + 'ptype']) src_mac = args.get_arg('tg_src_mac') dst_mac = args.get_arg('dut_if1_mac') @@ -104,11 +104,20 @@ def main(): txq.send(pkt_raw) sent.append(auto_pad(pkt_raw)) - ether = rxq_mirrored.recv(2) # Receive copy of Rx packet. - if ether is None: - raise RuntimeError("Rx timeout of mirrored Rx packet") + while True: + ether = rxq_mirrored.recv(2) + if ether is None: + raise RuntimeError("Rx timeout of mirrored Rx packet") + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break + pkt = auto_pad(pkt_raw) if str(ether) != str(pkt): print("Mirrored Rx packet doesn't match the original Rx packet.") @@ -148,15 +157,17 @@ def main(): # Receive reply on TG Tx port. ether_repl = rxq_tx.recv(2, sent) + if ether_repl is None: raise RuntimeError("Reply not received on TG Tx port.") - else: - print("Reply received on TG Tx port.\n") + + print("Reply received on TG Tx port.\n") # Receive copy of Tx packet. ether = rxq_mirrored.recv(2) if ether is None: raise RuntimeError("Rx timeout of mirrored Tx packet") + if str(ether) != str(ether_repl): print("Mirrored Tx packet doesn't match the received Tx packet.") if ether.src != ether_repl.src or ether.dst != ether_repl.dst: |