diff options
Diffstat (limited to 'resources/traffic_scripts')
22 files changed, 631 insertions, 424 deletions
diff --git a/resources/traffic_scripts/check_ra_packet.py b/resources/traffic_scripts/check_ra_packet.py index 231e07da11..9717c7db95 100755 --- a/resources/traffic_scripts/check_ra_packet.py +++ b/resources/traffic_scripts/check_ra_packet.py @@ -17,6 +17,8 @@ import sys import ipaddress +from scapy.layers.inet6 import IPv6, ICMPv6ND_RA, ICMPv6ND_NS + from resources.libraries.python.PacketVerifier import RxQueue from resources.libraries.python.TrafficScriptArg import TrafficScriptArg @@ -56,15 +58,23 @@ def main(): interval = int(args.get_arg('interval')) rxq = RxQueue(rx_if) - ether = rxq.recv(max(5, interval)) + # receive ICMPv6ND_RA packet + while True: + ether = rxq.recv(max(5, interval)) + if ether is None: + raise RuntimeError('ICMP echo Rx timeout') - # Check whether received packet contains layer RA and check other values - if ether is None: - raise RuntimeError('ICMP echo Rx timeout') + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break - if not ether.haslayer('ICMPv6ND_RA'): - raise RuntimeError('Not an RA packet received {0}' - .format(ether.__repr__())) + # Check if received packet contains layer RA and check other values + if not ether.haslayer(ICMPv6ND_RA): + raise RuntimeError('Not an RA packet received {0}'. + format(ether.__repr__())) src_address = ipaddress.IPv6Address(unicode(ether['IPv6'].src)) dst_address = ipaddress.IPv6Address(unicode(ether['IPv6'].dst)) @@ -72,22 +82,22 @@ def main(): all_nodes_multicast = ipaddress.IPv6Address(u'ff02::1') if src_address != link_local: - raise RuntimeError( - 'Source address ({0}) not matching link local address({1})'.format( - src_address, link_local)) + raise RuntimeError('Source address ({0}) not matching link local ' + 'address ({1})'.format(src_address, link_local)) if dst_address != all_nodes_multicast: - raise RuntimeError('Packet destination address ({0}) is not the all ' - 'nodes multicast address ({1}).'.format( - dst_address, all_nodes_multicast)) - if ether['IPv6'].hlim != 255: - raise RuntimeError('Hop limit not correct: {0}!=255'.format( - ether['IPv6'].hlim)) - - ra_code = ether['ICMPv6 Neighbor Discovery - Router Advertisement'].code + raise RuntimeError('Packet destination address ({0}) is not the all' + ' nodes multicast address ({1}).'. + format(dst_address, all_nodes_multicast)) + if ether[IPv6].hlim != 255: + raise RuntimeError('Hop limit not correct: {0}!=255'. + format(ether[IPv6].hlim)) + + ra_code = ether[ICMPv6ND_RA].code if ra_code != 0: raise RuntimeError('ICMP code: {0} not correct. '.format(ra_code)) sys.exit(0) + if __name__ == "__main__": main() diff --git a/resources/traffic_scripts/icmpv6_echo.py b/resources/traffic_scripts/icmpv6_echo.py index 4bf573a1f1..a18896b07f 100755 --- a/resources/traffic_scripts/icmpv6_echo.py +++ b/resources/traffic_scripts/icmpv6_echo.py @@ -17,13 +17,19 @@ import sys import logging + +# pylint: disable=no-name-in-module +# pylint: disable=import-error logging.getLogger("scapy.runtime").setLevel(logging.ERROR) -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue,\ - checksum_equal -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg -from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr -from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply + from scapy.all import Ether +from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS +from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr +from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply + +from resources.libraries.python.PacketVerifier import RxQueue, TxQueue +from resources.libraries.python.PacketVerifier import checksum_equal +from resources.libraries.python.TrafficScriptArg import TrafficScriptArg def main(): @@ -43,52 +49,60 @@ def main(): # send ICMPv6 neighbor advertisement message pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') / - IPv6(src=src_ip, dst='ff02::1:ff00:2') / - ICMPv6ND_NA(tgt=src_ip, R=0) / - ICMPv6NDOptDstLLAddr(lladdr=src_mac)) + IPv6(src=src_ip, dst='ff02::1:ff00:2') / + ICMPv6ND_NA(tgt=src_ip, R=0) / + ICMPv6NDOptDstLLAddr(lladdr=src_mac)) sent_packets.append(pkt_send) txq.send(pkt_send) # send ICMPv6 echo request pkt_send = (Ether(src=src_mac, dst=dst_mac) / - IPv6(src=src_ip, dst=dst_ip) / - ICMPv6EchoRequest(id=echo_id, seq=echo_seq)) + IPv6(src=src_ip, dst=dst_ip) / + ICMPv6EchoRequest(id=echo_id, seq=echo_seq)) sent_packets.append(pkt_send) txq.send(pkt_send) # receive ICMPv6 echo reply - ether = rxq.recv(2, sent_packets) - if ether is None: - raise RuntimeError('ICMPv6 echo reply Rx timeout') + while True: + ether = rxq.recv(2, sent_packets) + if ether is None: + raise RuntimeError('ICMPv6 echo reply Rx timeout') + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break if not ether.haslayer(IPv6): - raise RuntimeError('Unexpected packet with no IPv6 received {0}'.format( - ether.__repr__())) + raise RuntimeError('Unexpected packet with no IPv6 received {0}'. + format(ether.__repr__())) - ipv6 = ether['IPv6'] + ipv6 = ether[IPv6] if not ipv6.haslayer(ICMPv6EchoReply): - raise RuntimeError( - 'Unexpected packet with no IPv6 ICMP received {0}'.format( - ipv6.__repr__())) + raise RuntimeError('Unexpected packet with no ICMPv6 echo reply ' + 'received {0}'.format(ipv6.__repr__())) - icmpv6 = ipv6['ICMPv6 Echo Reply'] + icmpv6 = ipv6[ICMPv6EchoReply] # check identifier and sequence number if icmpv6.id != echo_id or icmpv6.seq != echo_seq: - raise RuntimeError( - 'Invalid ICMPv6 echo reply received ID {0} seq {1} should be ' \ - 'ID {2} seq {3}'.format(icmpv6.id, icmpv6.seq, echo_id, echo_seq)) + raise RuntimeError('Invalid ICMPv6 echo reply received ID {0} seq {1} ' + 'should be ID {2} seq {3}'. + format(icmpv6.id, icmpv6.seq, echo_id, echo_seq)) # verify checksum cksum = icmpv6.cksum del icmpv6.cksum tmp = ICMPv6EchoReply(str(icmpv6)) if not checksum_equal(tmp.cksum, cksum): - raise RuntimeError( - 'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum)) + raise RuntimeError('Invalid checksum {0} should be {1}'. + format(cksum, tmp.cksum)) sys.exit(0) + if __name__ == "__main__": main() diff --git a/resources/traffic_scripts/icmpv6_echo_req_resp.py b/resources/traffic_scripts/icmpv6_echo_req_resp.py index c2cc4d20a0..ec9cf94a67 100755 --- a/resources/traffic_scripts/icmpv6_echo_req_resp.py +++ b/resources/traffic_scripts/icmpv6_echo_req_resp.py @@ -18,14 +18,20 @@ import sys import logging + +# pylint: disable=no-name-in-module +# pylint: disable=import-error logging.getLogger("scapy.runtime").setLevel(logging.ERROR) -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue,\ - checksum_equal -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg -from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr + +from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS +from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply from scapy.all import Ether +from resources.libraries.python.PacketVerifier import RxQueue, TxQueue +from resources.libraries.python.PacketVerifier import checksum_equal +from resources.libraries.python.TrafficScriptArg import TrafficScriptArg + def main(): args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_nh_mac', 'dst_nh_mac', @@ -52,109 +58,122 @@ def main(): # send ICMPv6 neighbor advertisement message pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') / - IPv6(src=src_ip, dst='ff02::1:ff00:2') / - ICMPv6ND_NA(tgt=src_ip, R=0) / - ICMPv6NDOptDstLLAddr(lladdr=src_mac)) + IPv6(src=src_ip, dst='ff02::1:ff00:2') / + ICMPv6ND_NA(tgt=src_ip, R=0) / + ICMPv6NDOptDstLLAddr(lladdr=src_mac)) src_sent_packets.append(pkt_send) src_txq.send(pkt_send) pkt_send = (Ether(src=dst_mac, dst='ff:ff:ff:ff:ff:ff') / - IPv6(src=dst_ip, dst='ff02::1:ff00:2') / - ICMPv6ND_NA(tgt=dst_ip, R=0) / - ICMPv6NDOptDstLLAddr(lladdr=dst_mac)) + IPv6(src=dst_ip, dst='ff02::1:ff00:2') / + ICMPv6ND_NA(tgt=dst_ip, R=0) / + ICMPv6NDOptDstLLAddr(lladdr=dst_mac)) dst_sent_packets.append(pkt_send) dst_txq.send(pkt_send) # send ICMPv6 echo request from first TG interface pkt_send = (Ether(src=src_mac, dst=src_nh_mac) / - IPv6(src=src_ip, dst=dst_ip, hlim=hop_limit) / - ICMPv6EchoRequest(id=echo_id, seq=echo_seq)) + IPv6(src=src_ip, dst=dst_ip, hlim=hop_limit) / + ICMPv6EchoRequest(id=echo_id, seq=echo_seq)) src_sent_packets.append(pkt_send) src_txq.send(pkt_send) # receive ICMPv6 echo request on second TG interface - ether = dst_rxq.recv(2, dst_sent_packets) - if ether is None: - raise RuntimeError('ICMPv6 echo reply Rx timeout') + while True: + ether = dst_rxq.recv(2, dst_sent_packets) + if ether is None: + raise RuntimeError('ICMPv6 echo reply Rx timeout') + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break if not ether.haslayer(IPv6): - raise RuntimeError('Unexpected packet with no IPv6 received {0}'.format( - ether.__repr__())) + raise RuntimeError('Unexpected packet with no IPv6 received: {0}'. + format(ether.__repr__())) - ipv6 = ether['IPv6'] + ipv6 = ether[IPv6] # verify hop limit processing if ipv6.hlim != (hop_limit - hop_num): - raise RuntimeError( - 'Invalid hop limit {0} should be {1}'.format(ipv6.hlim, - hop_limit - hop_num)) + raise RuntimeError('Invalid hop limit {0} should be {1}'. + format(ipv6.hlim,hop_limit - hop_num)) if not ipv6.haslayer(ICMPv6EchoRequest): - raise RuntimeError( - 'Unexpected packet with no IPv6 ICMP received {0}'.format( - ipv6.__repr__())) + raise RuntimeError('Unexpected packet with no IPv6 ICMP received {0}'. + format(ipv6.__repr__())) - icmpv6 = ipv6['ICMPv6 Echo Request'] + icmpv6 = ipv6[ICMPv6EchoRequest] # check identifier and sequence number if icmpv6.id != echo_id or icmpv6.seq != echo_seq: - raise RuntimeError( - 'Invalid ICMPv6 echo reply received ID {0} seq {1} should be ' \ - 'ID {2} seq {3}'.format(icmpv6.id, icmpv6.seq, echo_id, echo_seq)) + raise RuntimeError('Invalid ICMPv6 echo reply received ID {0} ' + 'seq {1} should be ID {2} seq {3}'. + format(icmpv6.id, icmpv6.seq, echo_id, echo_seq)) # verify checksum cksum = icmpv6.cksum del icmpv6.cksum tmp = ICMPv6EchoRequest(str(icmpv6)) if not checksum_equal(tmp.cksum, cksum): - raise RuntimeError( - 'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum)) + raise RuntimeError('Invalid checksum {0} should be {1}'. + format(cksum, tmp.cksum)) # send ICMPv6 echo reply from second TG interface pkt_send = (Ether(src=dst_mac, dst=dst_nh_mac) / - IPv6(src=dst_ip, dst=src_ip) / - ICMPv6EchoReply(id=echo_id, seq=echo_seq)) + IPv6(src=dst_ip, dst=src_ip) / + ICMPv6EchoReply(id=echo_id, seq=echo_seq)) dst_sent_packets.append(pkt_send) dst_txq.send(pkt_send) # receive ICMPv6 echo reply on first TG interface - ether = src_rxq.recv(2, src_sent_packets) - if ether is None: - raise RuntimeError('ICMPv6 echo reply Rx timeout') + while True: + ether = src_rxq.recv(2, src_sent_packets) + if ether is None: + raise RuntimeError('ICMPv6 echo reply Rx timeout') + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break if not ether.haslayer(IPv6): - raise RuntimeError('Unexpected packet with no IPv6 received {0}'.format( - ether.__repr__())) + raise RuntimeError('Unexpected packet with no IPv6 layer received {0}'. + format(ether.__repr__())) - ipv6 = ether['IPv6'] + ipv6 = ether[IPv6] # verify hop limit processing if ipv6.hlim != (hop_limit - hop_num): - raise RuntimeError( - 'Invalid hop limit {0} should be {1}'.format(ipv6.hlim, - hop_limit - hop_num)) + raise RuntimeError('Invalid hop limit {0} should be {1}'. + format(ipv6.hlim, hop_limit - hop_num)) if not ipv6.haslayer(ICMPv6EchoReply): - raise RuntimeError( - 'Unexpected packet with no IPv6 ICMP received {0}'.format( - ipv6.__repr__())) + raise RuntimeError('Unexpected packet with no IPv6 ICMP received {0}'. + format(ipv6.__repr__())) - icmpv6 = ipv6['ICMPv6 Echo Reply'] + icmpv6 = ipv6[ICMPv6EchoReply] # check identifier and sequence number if icmpv6.id != echo_id or icmpv6.seq != echo_seq: - raise RuntimeError( - 'Invalid ICMPv6 echo reply received ID {0} seq {1} should be ' \ - 'ID {2} seq {3}'.format(icmpv6.id, icmpv6.seq, echo_id, echo_seq)) + raise RuntimeError('Invalid ICMPv6 echo reply received ID {0} ' + 'seq {1} should be ID {2} seq {3}'. + format(icmpv6.id, icmpv6.seq, echo_id, echo_seq)) # verify checksum cksum = icmpv6.cksum del icmpv6.cksum tmp = ICMPv6EchoReply(str(icmpv6)) if not checksum_equal(tmp.cksum, cksum): - raise RuntimeError( - 'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum)) + raise RuntimeError('Invalid checksum {0} should be {1}'. + format(cksum, tmp.cksum)) sys.exit(0) + if __name__ == "__main__": main() diff --git a/resources/traffic_scripts/ipfix_check.py b/resources/traffic_scripts/ipfix_check.py index 2a08f0ce85..f5693cc7e8 100755 --- a/resources/traffic_scripts/ipfix_check.py +++ b/resources/traffic_scripts/ipfix_check.py @@ -24,8 +24,8 @@ from scapy.layers.l2 import Ether from resources.libraries.python.PacketVerifier import RxQueue, TxQueue, auto_pad from resources.libraries.python.TrafficScriptArg import TrafficScriptArg -from resources.libraries.python.telemetry.IPFIXUtil import IPFIXHandler, \ - IPFIXData +from resources.libraries.python.telemetry.IPFIXUtil import IPFIXHandler +from resources.libraries.python.telemetry.IPFIXUtil import IPFIXData def valid_ipv4(ip): @@ -123,6 +123,11 @@ def main(): pkt = rxq.recv(10, ignore=ignore, verbose=verbose) if pkt is None: raise RuntimeError("RX timeout") + + if pkt.haslayer("ICMPv6ND_NS"): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + if pkt.haslayer("IPFIXHeader"): if pkt.haslayer("IPFIXTemplate"): # create or update template for IPFIX data packets @@ -138,9 +143,9 @@ def main(): # verify packet count if data["packetTotalCount"] != count: - raise RuntimeError( - "IPFIX reported wrong packet count. Count was {0}," - " but should be {1}".format(data["packetTotalCount"], count)) + raise RuntimeError("IPFIX reported wrong packet count. Count was {0}," + "but should be {1}".format(data["packetTotalCount"], + count)) # verify IP addresses keys = data.keys() err = "{0} mismatch. Packets used {1}, but were classified as {2}." diff --git a/resources/traffic_scripts/ipfix_sessions.py b/resources/traffic_scripts/ipfix_sessions.py index e7597a894a..11e77fa08c 100755 --- a/resources/traffic_scripts/ipfix_sessions.py +++ b/resources/traffic_scripts/ipfix_sessions.py @@ -192,6 +192,11 @@ def main(): pkt = rxq.recv(5) if pkt is None: raise RuntimeError("RX timeout") + + if pkt.haslayer("ICMPv6ND_NS"): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + if pkt.haslayer("IPFIXHeader"): if pkt.haslayer("IPFIXTemplate"): # create or update template for IPFIX data packets @@ -219,6 +224,6 @@ def main(): raise RuntimeError("Received non-IPFIX packet or IPFIX header was" "not recognized.") -if __name__ == "__main__": +if __name__ == "__main__": main() diff --git a/resources/traffic_scripts/ipsec.py b/resources/traffic_scripts/ipsec.py index 13d44b8a51..1561738c60 100755 --- a/resources/traffic_scripts/ipsec.py +++ b/resources/traffic_scripts/ipsec.py @@ -18,8 +18,14 @@ import sys import logging +# pylint: disable=no-name-in-module +# pylint: disable=import-error logging.getLogger("scapy.runtime").setLevel(logging.ERROR) -from scapy.all import Ether, IP, ICMP, IPv6, ICMPv6EchoRequest, ICMPv6EchoReply + +from scapy.all import Ether +from scapy.layers.inet import ICMP, IP +from scapy.layers.inet6 import IPv6, ICMPv6ND_NS +from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply from scapy.layers.ipsec import SecurityAssociation, ESP from ipaddress import ip_address @@ -39,7 +45,7 @@ def check_ipv4(pkt_recv, dst_tun, src_ip, dst_ip, sa_in): :type dst_tun: str :type src_ip: str :type dst_ip: str - :type sa_sa: scapy.layers.ipsec.SecurityAssociation + :type sa_in: scapy.layers.ipsec.SecurityAssociation :raises RuntimeError: If received packet is invalid. """ if not pkt_recv.haslayer(IP): @@ -55,15 +61,15 @@ def check_ipv4(pkt_recv, dst_tun, src_ip, dst_ip, sa_in): raise RuntimeError( 'Not an ESP packet received: {0}'.format(pkt_recv.__repr__())) - ip_pkt = pkt_recv['IP'] + ip_pkt = pkt_recv[IP] d_pkt = sa_in.decrypt(ip_pkt) - if d_pkt['IP'].dst != dst_ip: + if d_pkt[IP].dst != dst_ip: raise RuntimeError( 'Decrypted packet has invalid destination address: {0} ' 'should be: {1}'.format(d_pkt['IP'].dst, dst_ip)) - if d_pkt['IP'].src != src_ip: + if d_pkt[IP].src != src_ip: raise RuntimeError( 'Decrypted packet has invalid source address: {0} should be: {1}' .format(d_pkt['IP'].src, src_ip)) @@ -93,7 +99,7 @@ def check_ipv6(pkt_recv, dst_tun, src_ip, dst_ip, sa_in): raise RuntimeError( 'Not an IPv6 packet received: {0}'.format(pkt_recv.__repr__())) - if pkt_recv['IPv6'].dst != dst_tun: + if pkt_recv[IPv6].dst != dst_tun: raise RuntimeError( 'Received packet has invalid destination address: {0} ' 'should be: {1}'.format(pkt_recv['IPv6'].dst, dst_tun)) @@ -102,15 +108,15 @@ def check_ipv6(pkt_recv, dst_tun, src_ip, dst_ip, sa_in): raise RuntimeError( 'Not an ESP packet received: {0}'.format(pkt_recv.__repr__())) - ip_pkt = pkt_recv['IPv6'] + ip_pkt = pkt_recv[IPv6] d_pkt = sa_in.decrypt(ip_pkt) - if d_pkt['IPv6'].dst != dst_ip: + if d_pkt[IPv6].dst != dst_ip: raise RuntimeError( 'Decrypted packet has invalid destination address {0}: ' 'should be: {1}'.format(d_pkt['IPv6'].dst, dst_ip)) - if d_pkt['IPv6'].src != src_ip: + if d_pkt[IPv6].src != src_ip: raise RuntimeError( 'Decrypted packet has invalid source address: {0} should be: {1}' .format(d_pkt['IPv6'].src, src_ip)) @@ -175,25 +181,33 @@ def main(): sent_packets = [] if is_ipv4: - ip_pkt = IP(src=src_ip, dst=dst_ip) / \ - ICMP() + ip_pkt = (IP(src=src_ip, dst=dst_ip) / + ICMP()) ip_pkt = IP(str(ip_pkt)) else: - ip_pkt = IPv6(src=src_ip, dst=dst_ip) / \ - ICMPv6EchoRequest() + ip_pkt = (IPv6(src=src_ip, dst=dst_ip) / + ICMPv6EchoRequest()) ip_pkt = IPv6(str(ip_pkt)) e_pkt = sa_out.encrypt(ip_pkt) - pkt_send = Ether(src=src_mac, dst=dst_mac) / \ - e_pkt + pkt_send = (Ether(src=src_mac, dst=dst_mac) / + e_pkt) sent_packets.append(pkt_send) txq.send(pkt_send) - pkt_recv = rxq.recv(2, sent_packets) + while True: + pkt_recv = rxq.recv(2, sent_packets) - if pkt_recv is None: - raise RuntimeError('ESP packet Rx timeout') + if pkt_recv is None: + raise RuntimeError('ESP packet Rx timeout') + + if pkt_recv.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break if is_ipv4: check_ipv4(pkt_recv, src_tun, dst_ip, src_ip, sa_in) @@ -202,5 +216,6 @@ def main(): sys.exit(0) + if __name__ == "__main__": main() diff --git a/resources/traffic_scripts/ipv6_nd_proxy_check.py b/resources/traffic_scripts/ipv6_nd_proxy_check.py index b1028028b0..c9213999ec 100755 --- a/resources/traffic_scripts/ipv6_nd_proxy_check.py +++ b/resources/traffic_scripts/ipv6_nd_proxy_check.py @@ -15,8 +15,9 @@ """Traffic script that sends DHCPv6 proxy packets.""" from scapy.layers.inet import Ether -from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply,\ - ICMPv6ND_NS +from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS +from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply + from resources.libraries.python.PacketVerifier import RxQueue, TxQueue from resources.libraries.python.TrafficScriptArg import TrafficScriptArg @@ -44,17 +45,23 @@ def imcpv6nd_solicit(tx_if, src_mac, dst_mac, src_ip, dst_ip): sent_packets = [] - icmpv6nd_solicit_pkt =\ - Ether(src=src_mac, dst=dst_mac) / \ - IPv6(src=src_ip) / \ - ICMPv6ND_NS(tgt=dst_ip) + icmpv6nd_solicit_pkt = (Ether(src=src_mac, dst=dst_mac) / + IPv6(src=src_ip) / + ICMPv6ND_NS(tgt=dst_ip)) sent_packets.append(icmpv6nd_solicit_pkt) txq.send(icmpv6nd_solicit_pkt) ether = None for _ in range(5): - pkt = rxq.recv(3, ignore=sent_packets) + while True: + pkt = rxq.recv(3, ignore=sent_packets) + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue in case of ICMPv6ND_NS packet + continue + else: + # otherwise process the current packet + break if pkt is not None: ether = pkt break @@ -63,34 +70,33 @@ def imcpv6nd_solicit(tx_if, src_mac, dst_mac, src_ip, dst_ip): raise RuntimeError('ICMPv6ND Proxy response timeout.') if ether.src != dst_mac: - raise RuntimeError("Source MAC address error: {} != {}".format( - ether.src, dst_mac)) + raise RuntimeError("Source MAC address error: {} != {}". + format(ether.src, dst_mac)) print "Source MAC address: OK." if ether.dst != src_mac: - raise RuntimeError("Destination MAC address error: {} != {}".format( - ether.dst, src_mac)) + raise RuntimeError("Destination MAC address error: {} != {}". + format(ether.dst, src_mac)) print "Destination MAC address: OK." - if ether['IPv6'].src != dst_ip: - raise RuntimeError("Source IP address error: {} != {}".format( - ether['IPv6'].src, dst_ip)) + if ether[IPv6].src != dst_ip: + raise RuntimeError("Source IP address error: {} != {}". + format(ether[IPv6].src, dst_ip)) print "Source IP address: OK." - if ether['IPv6'].dst != src_ip: - raise RuntimeError("Destination IP address error: {} != {}".format( - ether['IPv6'].dst, src_ip)) + if ether[IPv6].dst != src_ip: + raise RuntimeError("Destination IP address error: {} != {}". + format(ether[IPv6].dst, src_ip)) print "Destination IP address: OK." try: - target_addr = ether['IPv6']\ - ['ICMPv6 Neighbor Discovery - Neighbor Advertisement'].tgt + target_addr = ether[IPv6][ICMPv6ND_NA].tgt except (KeyError, AttributeError): raise RuntimeError("Not an ICMPv6ND Neighbor Advertisement packet.") if target_addr != dst_ip: - raise RuntimeError("ICMPv6 field 'Target address' error:" - " {} != {}".format(target_addr, dst_ip)) + raise RuntimeError("ICMPv6 field 'Target address' error: {} != {}". + format(target_addr, dst_ip)) print "Target address field: OK." @@ -119,16 +125,22 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac, rxq = RxQueue(dst_if) txq = TxQueue(src_if) - icmpv6_ping_pkt = \ - Ether(src=src_mac, dst=proxy_to_src_mac) / \ - IPv6(src=src_ip, dst=dst_ip) / \ - ICMPv6EchoRequest() + icmpv6_ping_pkt = (Ether(src=src_mac, dst=proxy_to_src_mac) / + IPv6(src=src_ip, dst=dst_ip) / + ICMPv6EchoRequest()) txq.send(icmpv6_ping_pkt) ether = None for _ in range(5): - pkt = rxq.recv(3) + while True: + pkt = rxq.recv(3) + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue in case of ICMPv6ND_NS packet + continue + else: + # otherwise process the current packet + break if pkt is not None: ether = pkt break @@ -136,7 +148,7 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac, if ether is None: raise RuntimeError('ICMPv6 Echo Request timeout.') try: - ether["IPv6"]["ICMPv6 Echo Request"] + ether[IPv6]["ICMPv6 Echo Request"] except KeyError: raise RuntimeError("Received packet is not an ICMPv6 Echo Request.") print "ICMP Echo: OK." @@ -144,16 +156,22 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac, rxq = RxQueue(src_if) txq = TxQueue(dst_if) - icmpv6_ping_pkt = \ - Ether(src=dst_mac, dst=proxy_to_dst_mac) / \ - IPv6(src=dst_ip, dst=src_ip) / \ - ICMPv6EchoReply() + icmpv6_ping_pkt = (Ether(src=dst_mac, dst=proxy_to_dst_mac) / + IPv6(src=dst_ip, dst=src_ip) / + ICMPv6EchoReply()) txq.send(icmpv6_ping_pkt) ether = None for _ in range(5): - pkt = rxq.recv(3) + while True: + pkt = rxq.recv(3) + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue in case of ICMPv6ND_NS packet + continue + else: + # otherwise process the current packet + break if pkt is not None: ether = pkt break @@ -161,7 +179,7 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac, if ether is None: raise RuntimeError('DHCPv6 SOLICIT timeout') try: - ether["IPv6"]["ICMPv6 Echo Reply"] + ether[IPv6]["ICMPv6 Echo Reply"] except KeyError: raise RuntimeError("Received packet is not an ICMPv6 Echo Reply.") @@ -187,8 +205,9 @@ def main(): imcpv6nd_solicit(src_if, src_mac, proxy_to_src_mac, src_ip, dst_ip) # Verify route (ICMP echo/reply) - ipv6_ping(src_if, dst_if, src_mac, dst_mac, - proxy_to_src_mac, proxy_to_dst_mac, src_ip, dst_ip) + ipv6_ping(src_if, dst_if, src_mac, dst_mac, proxy_to_src_mac, + proxy_to_dst_mac, src_ip, dst_ip) + if __name__ == "__main__": main() diff --git a/resources/traffic_scripts/ipv6_ns.py b/resources/traffic_scripts/ipv6_ns.py index 70c6ab445a..5852e6317f 100755 --- a/resources/traffic_scripts/ipv6_ns.py +++ b/resources/traffic_scripts/ipv6_ns.py @@ -17,13 +17,18 @@ import sys import logging + +# pylint: disable=no-name-in-module +# pylint: disable=import-error logging.getLogger("scapy.runtime").setLevel(logging.ERROR) -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue,\ - checksum_equal -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg + +from scapy.all import Ether from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr, ICMPv6NDOptSrcLLAddr -from scapy.all import Ether + +from resources.libraries.python.PacketVerifier import RxQueue, TxQueue +from resources.libraries.python.PacketVerifier import checksum_equal +from resources.libraries.python.TrafficScriptArg import TrafficScriptArg def main(): @@ -41,57 +46,67 @@ def main(): # send ICMPv6 neighbor solicitation message pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') / - IPv6(src=src_ip, dst='ff02::1:ff00:2') / - ICMPv6ND_NS(tgt=dst_ip) / - ICMPv6NDOptSrcLLAddr(lladdr=src_mac)) + IPv6(src=src_ip, dst='ff02::1:ff00:2') / + ICMPv6ND_NS(tgt=dst_ip) / + ICMPv6NDOptSrcLLAddr(lladdr=src_mac)) sent_packets.append(pkt_send) txq.send(pkt_send) # receive ICMPv6 neighbor advertisement message - ether = rxq.recv(2, sent_packets) + while True: + ether = rxq.recv(2, sent_packets) + if ether is None: + raise RuntimeError('ICMPv6 echo reply Rx timeout') + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break + if ether is None: raise RuntimeError('ICMPv6 echo reply Rx timeout') if not ether.haslayer(IPv6): - raise RuntimeError('Unexpected packet with no IPv6 received {0}'.format( - ether.__repr__())) + raise RuntimeError('Unexpected packet with no IPv6 received {0}'. + format(ether.__repr__())) - ipv6 = ether['IPv6'] + ipv6 = ether[IPv6] if not ipv6.haslayer(ICMPv6ND_NA): - raise RuntimeError( - 'Unexpected packet with no ICMPv6 ND-NA received {0}'.format( - ipv6.__repr__())) + raise RuntimeError('Unexpected packet with no ICMPv6 ND-NA received ' + '{0}'.format(ipv6.__repr__())) - icmpv6_na = ipv6['ICMPv6 Neighbor Discovery - Neighbor Advertisement'] + icmpv6_na = ipv6[ICMPv6ND_NA] # verify target address if icmpv6_na.tgt != dst_ip: - raise RuntimeError('Invalid target address {0} should be {1}'.format( - icmpv6_na.tgt, dst_ip)) + raise RuntimeError('Invalid target address {0} should be {1}'. + format(icmpv6_na.tgt, dst_ip)) if not icmpv6_na.haslayer(ICMPv6NDOptDstLLAddr): - raise RuntimeError( - 'Missing Destination Link-Layer Address option in ICMPv6 ' + - 'Neighbor Advertisement {0}'.format(icmpv6_na.__repr__())) + raise RuntimeError('Missing Destination Link-Layer Address option in ' + 'ICMPv6 Neighbor Advertisement {0}'. + format(icmpv6_na.__repr__())) - option = 'ICMPv6 Neighbor Discovery Option - Destination Link-Layer Address' - dst_ll_addr = icmpv6_na[option] + dst_ll_addr = icmpv6_na[ICMPv6NDOptDstLLAddr] # verify destination link-layer address field if dst_ll_addr.lladdr != dst_mac: - raise RuntimeError('Invalid lladdr {0} should be {1}'.format( - dst_ll_addr.lladdr, dst_mac)) + raise RuntimeError('Invalid lladdr {0} should be {1}'. + format(dst_ll_addr.lladdr, dst_mac)) # verify checksum cksum = icmpv6_na.cksum del icmpv6_na.cksum tmp = ICMPv6ND_NA(str(icmpv6_na)) if not checksum_equal(tmp.cksum, cksum): - raise RuntimeError( - 'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum)) + raise RuntimeError('Invalid checksum {0} should be {1}'. + format(cksum, tmp.cksum)) sys.exit(0) + if __name__ == "__main__": main() diff --git a/resources/traffic_scripts/ipv6_sweep_ping.py b/resources/traffic_scripts/ipv6_sweep_ping.py index 80fdda532a..28d23a16ef 100755 --- a/resources/traffic_scripts/ipv6_sweep_ping.py +++ b/resources/traffic_scripts/ipv6_sweep_ping.py @@ -19,13 +19,18 @@ import logging import os import sys +# pylint: disable=no-name-in-module +# pylint: disable=import-error logging.getLogger("scapy.runtime").setLevel(logging.ERROR) -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue, \ - checksum_equal -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg -from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr -from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply + from scapy.all import Ether +from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS +from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr +from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply + +from resources.libraries.python.PacketVerifier import RxQueue, TxQueue +from resources.libraries.python.PacketVerifier import checksum_equal +from resources.libraries.python.TrafficScriptArg import TrafficScriptArg def main(): @@ -68,37 +73,43 @@ def main(): sent_packets.append(pkt_send) txq.send(pkt_send) - ether = rxq.recv(ignore=sent_packets) - if ether is None: - raise RuntimeError( - 'ICMPv6 echo reply seq {0} Rx timeout'.format(echo_seq)) + while True: + ether = rxq.recv(ignore=sent_packets) + if ether is None: + raise RuntimeError('ICMPv6 echo reply seq {0} ' + 'Rx timeout'.format(echo_seq)) + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue in case of ICMPv6ND_NS packet + continue + else: + # otherwise process the current packet + break if not ether.haslayer(IPv6): - raise RuntimeError( - 'Unexpected packet with no IPv6 received {0}'.format( - ether.__repr__())) + raise RuntimeError('Unexpected packet with no IPv6 layer ' + 'received: {0}'.format(ether.__repr__())) - ipv6 = ether['IPv6'] + ipv6 = ether[IPv6] if not ipv6.haslayer(ICMPv6EchoReply): - raise RuntimeError( - 'Unexpected packet with no IPv6 ICMP received {0}'.format( - ipv6.__repr__())) + raise RuntimeError('Unexpected packet with no ICMPv6 echo reply ' + 'layer received: {0}'.format(ipv6.__repr__())) - icmpv6 = ipv6['ICMPv6 Echo Reply'] + icmpv6 = ipv6[ICMPv6EchoReply] if icmpv6.id != echo_id or icmpv6.seq != echo_seq: - raise RuntimeError( - 'Invalid ICMPv6 echo reply received ID {0} seq {1} should be ' - 'ID {2} seq {3}, {0}'.format(icmpv6.id, icmpv6.seq, echo_id, - echo_seq)) + raise RuntimeError('ICMPv6 echo reply with invalid data received: ' + 'ID {0} seq {1} should be ID {2} seq {3}, {0}'. + format(icmpv6.id, icmpv6.seq, echo_id, + echo_seq)) cksum = icmpv6.cksum del icmpv6.cksum tmp = ICMPv6EchoReply(str(icmpv6)) if not checksum_equal(tmp.cksum, cksum): - raise RuntimeError( - 'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum)) + raise RuntimeError('Invalid checksum: {0} should be {1}'. + format(cksum, tmp.cksum)) sent_packets.remove(pkt_send) diff --git a/resources/traffic_scripts/policer.py b/resources/traffic_scripts/policer.py index ee9fa29c4a..62c397d496 100755 --- a/resources/traffic_scripts/policer.py +++ b/resources/traffic_scripts/policer.py @@ -21,7 +21,10 @@ import logging # pylint: disable=no-name-in-module # pylint: disable=import-error logging.getLogger("scapy.runtime").setLevel(logging.ERROR) -from scapy.all import Ether, IP, IPv6, TCP + +from scapy.all import Ether +from scapy.layers.inet import IP, TCP +from scapy.layers.inet6 import IPv6, ICMPv6ND_NS from ipaddress import ip_address from resources.libraries.python.TrafficScriptArg import TrafficScriptArg @@ -38,17 +41,17 @@ def check_ipv4(pkt_recv, dscp): :raises RuntimeError: If received packet is invalid. """ if not pkt_recv.haslayer(IP): - raise RuntimeError( - 'Not an IPv4 packet received: {0}'.format(pkt_recv.__repr__())) + raise RuntimeError('Not an IPv4 packet received: {0}'. + format(pkt_recv.__repr__())) - rx_dscp = pkt_recv['IP'].tos >> 2 + rx_dscp = pkt_recv[IP].tos >> 2 if rx_dscp != dscp: - raise RuntimeError( - 'Invalid DSCP {0} should be {1}'.format(rx_dscp, dscp)) + raise RuntimeError('Invalid DSCP {0} should be {1}'. + format(rx_dscp, dscp)) if not pkt_recv.haslayer(TCP): - raise RuntimeError( - 'Not a TCP packet received: {0}'.format(pkt_recv.__repr__())) + raise RuntimeError('Not a TCP packet received: {0}'. + format(pkt_recv.__repr__())) def check_ipv6(pkt_recv, dscp): @@ -61,17 +64,17 @@ def check_ipv6(pkt_recv, dscp): :raises RuntimeError: If received packet is invalid. """ if not pkt_recv.haslayer(IPv6): - raise RuntimeError( - 'Not an IPv6 packet received: {0}'.format(pkt_recv.__repr__())) + raise RuntimeError('Not an IPv6 packet received: {0}'. + format(pkt_recv.__repr__())) - rx_dscp = pkt_recv['IPv6'].tc >> 2 + rx_dscp = pkt_recv[IPv6].tc >> 2 if rx_dscp != dscp: - raise RuntimeError( - 'Invalid DSCP {0} should be {1}'.format(rx_dscp, dscp)) + raise RuntimeError('Invalid DSCP {0} should be {1}'. + format(rx_dscp, dscp)) if not pkt_recv.haslayer(TCP): - raise RuntimeError( - 'Not a TCP packet received: {0}'.format(pkt_recv.__repr__())) + raise RuntimeError('Not a TCP packet received: {0}'. + format(pkt_recv.__repr__())) # pylint: disable=too-many-locals @@ -97,19 +100,29 @@ def main(): sent_packets = [] if is_ipv4: - ip_pkt = IP(src=src_ip, dst=dst_ip) / \ - TCP() + ip_pkt = (IP(src=src_ip, dst=dst_ip) / + TCP()) else: - ip_pkt = IPv6(src=src_ip, dst=dst_ip) / \ - TCP() + ip_pkt = (IPv6(src=src_ip, dst=dst_ip) / + TCP()) - pkt_send = Ether(src=src_mac, dst=dst_mac) / \ - ip_pkt + pkt_send = (Ether(src=src_mac, dst=dst_mac) / + ip_pkt) sent_packets.append(pkt_send) txq.send(pkt_send) - pkt_recv = rxq.recv(2, sent_packets) + while True: + pkt_recv = rxq.recv(2, sent_packets) + if pkt_recv is None: + raise RuntimeError('ICMPv6 echo reply Rx timeout') + + if pkt_recv.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break if pkt_recv is None: raise RuntimeError('Rx timeout') @@ -121,5 +134,6 @@ def main(): sys.exit(0) + if __name__ == "__main__": main() diff --git a/resources/traffic_scripts/send_icmp_check_headers.py b/resources/traffic_scripts/send_icmp_check_headers.py index c9e3a35df4..2193164b61 100755 --- a/resources/traffic_scripts/send_icmp_check_headers.py +++ b/resources/traffic_scripts/send_icmp_check_headers.py @@ -22,8 +22,7 @@ import sys import ipaddress from robot.api import logger from scapy.layers.inet import ICMP, IP -from scapy.layers.inet6 import ICMPv6EchoRequest -from scapy.layers.inet6 import IPv6 +from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS from scapy.layers.l2 import Ether, Dot1Q from resources.libraries.python.PacketVerifier import RxQueue, TxQueue @@ -94,28 +93,36 @@ def main(): sent_packets.append(pkt_raw) txq.send(pkt_raw) - if tx_if == rx_if: - ether = rxq.recv(2, ignore=sent_packets) - else: - ether = rxq.recv(2) - if ether is None: - raise RuntimeError("ICMP echo Rx timeout") + while True: + if tx_if == rx_if: + ether = rxq.recv(2, ignore=sent_packets) + else: + ether = rxq.recv(2) + if ether is None: + raise RuntimeError('ICMP echo Rx timeout') + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src: logger.trace("MAC matched") else: - raise RuntimeError( - "Matching packet unsuccessful: {0}".format(ether.__repr__())) + raise RuntimeError("Matching packet unsuccessful: {0}". + format(ether.__repr__())) if encaps_rx == 'Dot1q': if ether[Dot1Q].vlan == int(vlan_rx): logger.trace("VLAN matched") else: raise RuntimeError('Ethernet frame with wrong VLAN tag ({}-' - 'received, {}-expected):\n{}' - .format(ether[Dot1Q].vlan, vlan_rx, - ether.__repr__())) + 'received, {}-expected):\n{}'. + format(ether[Dot1Q].vlan, vlan_rx, + ether.__repr__())) ip = ether[Dot1Q].payload elif encaps_rx == 'Dot1ad': raise NotImplementedError() @@ -123,21 +130,21 @@ def main(): ip = ether.payload if not isinstance(ip, ip_format): - raise RuntimeError( - "Not an IP packet received {0}".format(ip.__repr__())) + raise RuntimeError("Not an IP packet received {0}". + format(ip.__repr__())) # Compare data from packets if src_ip == ip.src: logger.trace("Src IP matched") else: - raise RuntimeError("Matching Src IP unsuccessful: {} != {}" - .format(src_ip, ip.src)) + raise RuntimeError("Matching Src IP unsuccessful: {} != {}". + format(src_ip, ip.src)) if dst_ip == ip.dst: logger.trace("Dst IP matched") else: - raise RuntimeError("Matching Dst IP unsuccessful: {} != {}" - .format(dst_ip, ip.dst)) + raise RuntimeError("Matching Dst IP unsuccessful: {} != {}". + format(dst_ip, ip.dst)) sys.exit(0) diff --git a/resources/traffic_scripts/send_icmp_check_multipath.py b/resources/traffic_scripts/send_icmp_check_multipath.py index 4e39efdfcd..5da3b7b096 100755 --- a/resources/traffic_scripts/send_icmp_check_multipath.py +++ b/resources/traffic_scripts/send_icmp_check_multipath.py @@ -18,10 +18,9 @@ and check if it is divided into two paths.""" import sys import ipaddress -from scapy.layers.inet import ICMP, IP -from scapy.layers.inet6 import IPv6 from scapy.all import Ether -from scapy.layers.inet6 import ICMPv6EchoRequest +from scapy.layers.inet import ICMP, IP +from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS from resources.libraries.python.PacketVerifier import RxQueue, TxQueue from resources.libraries.python.TrafficScriptArg import TrafficScriptArg @@ -94,27 +93,38 @@ def main(): sent_packets.append(pkt_raw) txq.send(pkt_raw) - ether = rxq.recv(2) + + while True: + ether = rxq.recv(2) + if ether is None: + raise RuntimeError('ICMPv6 echo reply Rx timeout') + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue in case of ICMPv6ND_NS packet + continue + else: + # otherwise process the current packet + break if ether is None: raise RuntimeError("ICMP echo Rx timeout") if not ether.haslayer(ip_format): - raise RuntimeError("Not an IP packet received {0}" - .format(ether.__repr__())) + raise RuntimeError("Not an IP packet received {0}". + format(ether.__repr__())) - if ether['Ethernet'].src != dut_if2_mac: + if ether[Ether].src != dut_if2_mac: raise RuntimeError("Source MAC address error") - if ether['Ethernet'].dst == path_1_mac: + if ether[Ether].dst == path_1_mac: path_1_counter += 1 - elif ether['Ethernet'].dst == path_2_mac: + elif ether[Ether].dst == path_2_mac: path_2_counter += 1 else: raise RuntimeError("Destination MAC address error") if (path_1_counter + path_2_counter) != 100: - raise RuntimeError("Packet loss: recevied only {} packets of 100 " - .format(path_1_counter + path_2_counter)) + raise RuntimeError("Packet loss: recevied only {} packets of 100 ". + format(path_1_counter + path_2_counter)) if path_1_counter == 0: raise RuntimeError("Path 1 error!") @@ -127,5 +137,6 @@ def main(): sys.exit(0) + if __name__ == "__main__": main() diff --git a/resources/traffic_scripts/send_icmp_type_code.py b/resources/traffic_scripts/send_icmp_type_code.py index dffaafff38..2997f91264 100755 --- a/resources/traffic_scripts/send_icmp_type_code.py +++ b/resources/traffic_scripts/send_icmp_type_code.py @@ -19,10 +19,9 @@ the other one. Dot1q or Dot1ad tagging of the ethernet frame can be set. import sys import ipaddress -from scapy.layers.inet import ICMP, IP from scapy.layers.l2 import Ether -from scapy.layers.inet6 import ICMPv6EchoRequest -from scapy.layers.inet6 import IPv6 +from scapy.layers.inet import ICMP, IP +from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS from resources.libraries.python.PacketVerifier import RxQueue, TxQueue from resources.libraries.python.TrafficScriptArg import TrafficScriptArg @@ -88,13 +87,13 @@ def main(): pkt_raw = (Ether(src=src_mac, dst=dst_mac) / IP(src=src_ip, dst=dst_ip) / ICMP(code=icmp_code, type=icmp_type)) - ip_format = 'IP' - icmp_format = 'ICMP' + ip_format = IP + icmp_format = ICMP elif valid_ipv6(src_ip) and valid_ipv6(dst_ip): pkt_raw = (Ether(src=src_mac, dst=dst_mac) / IPv6(src=src_ip, dst=dst_ip) / ICMPv6EchoRequest(code=icmp_code, type=icmp_type)) - ip_format = 'IPv6' + ip_format = IPv6 icmp_format = 'ICMPv6' else: raise ValueError("IP(s) not in correct format") @@ -103,23 +102,31 @@ def main(): sent_packets.append(pkt_raw) txq.send(pkt_raw) - ether = rxq.recv(2) + while True: + ether = rxq.recv(2) + if ether is None: + raise RuntimeError('ICMP echo Rx timeout') - # Check whether received packet contains layers Ether, IP and ICMP - if ether is None: - raise RuntimeError('ICMP echo Rx timeout') + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break + # Check whether received packet contains layers IP and ICMP if not ether.haslayer(ip_format): - raise RuntimeError('Not an IP packet received {0}' - .format(ether.__repr__())) + raise RuntimeError('Not an IP packet received {0}'. + format(ether.__repr__())) # Cannot use haslayer for ICMPv6, every type of ICMPv6 is a separate layer # Next header value of 58 means the next header is ICMPv6 if not ether.haslayer(icmp_format) and ether[ip_format].nh != 58: - raise RuntimeError('Not an ICMP packet received {0}' - .format(ether.__repr__())) + raise RuntimeError('Not an ICMP packet received {0}'. + format(ether.__repr__())) sys.exit(0) + if __name__ == "__main__": main() diff --git a/resources/traffic_scripts/send_icmp_wait_for_reply.py b/resources/traffic_scripts/send_icmp_wait_for_reply.py index a77f15efc2..7677152801 100755 --- a/resources/traffic_scripts/send_icmp_wait_for_reply.py +++ b/resources/traffic_scripts/send_icmp_wait_for_reply.py @@ -17,9 +17,9 @@ import sys import ipaddress -from scapy.layers.inet import ICMP, IP -from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest from scapy.all import Ether +from scapy.layers.inet import ICMP, IP +from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS from resources.libraries.python.PacketVerifier import RxQueue, TxQueue from resources.libraries.python.TrafficScriptArg import TrafficScriptArg @@ -35,14 +35,14 @@ def is_icmp_reply(pkt, ipformat): :type ipformat: dict :rtype: bool """ - #pylint: disable=bare-except + # pylint: disable=bare-except try: if pkt[ipformat['IPType']][ipformat['ICMP_rep']].type == \ ipformat['Type']: return True else: return False - except: + except: # pylint: disable=bare-except return False @@ -58,12 +58,12 @@ def address_check(request, reply, ipformat): :type ipformat: dict :rtype: bool """ - #pylint: disable=bare-except + # pylint: disable=bare-except try: r_src = reply[ipformat['IPType']].src == request[ipformat['IPType']].dst r_dst = reply[ipformat['IPType']].dst == request[ipformat['IPType']].src return r_src and r_dst - except: + except: # pylint: disable=bare-except return False @@ -139,12 +139,21 @@ def main(): txq.send(icmp_request) for _ in range(1000): - icmp_reply = rxq.recv(wait_step, ignore=sent_packets) - if icmp_reply is None: - timeout -= wait_step - if timeout < 0: - raise RuntimeError("ICMP echo Rx timeout") - elif is_icmp_reply(icmp_reply, ip_format): + while True: + icmp_reply = rxq.recv(wait_step, ignore=sent_packets) + if icmp_reply is None: + timeout -= wait_step + if timeout < 0: + raise RuntimeError("ICMP echo Rx timeout") + + elif icmp_reply.haslayer(ICMPv6ND_NS): + # read another packet in the queue in case of ICMPv6ND_NS packet + continue + else: + # otherwise process the current packet + break + + if is_icmp_reply(icmp_reply, ip_format): if address_check(icmp_request, icmp_reply, ip_format): break else: @@ -154,5 +163,6 @@ def main(): sys.exit(0) + if __name__ == "__main__": main() diff --git a/resources/traffic_scripts/send_ip_icmp.py b/resources/traffic_scripts/send_ip_icmp.py index b22b5d39a8..58f2e1e4d8 100755 --- a/resources/traffic_scripts/send_ip_icmp.py +++ b/resources/traffic_scripts/send_ip_icmp.py @@ -19,11 +19,9 @@ the other one. Dot1q or Dot1ad tagging of the ethernet frame can be set. import sys import ipaddress +from scapy.layers.l2 import Ether, Dot1Q from scapy.layers.inet import ICMP, IP -from scapy.layers.l2 import Ether -from scapy.layers.l2 import Dot1Q -from scapy.layers.inet6 import ICMPv6EchoRequest -from scapy.layers.inet6 import IPv6 +from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS from resources.libraries.python.PacketVerifier import RxQueue, TxQueue from resources.libraries.python.TrafficScriptArg import TrafficScriptArg @@ -107,8 +105,8 @@ def main(): pkt_raw = (Ether(src=src_mac, dst=dst_mac) / IP(src=src_ip, dst=dst_ip) / ICMP()) - ip_format = 'IP' - icmp_format = 'ICMP' + ip_format = IP + icmp_format = ICMP elif valid_ipv6(src_ip) and valid_ipv6(dst_ip): if encaps == 'Dot1q': pkt_raw = (Ether(src=src_mac, dst=dst_mac) / @@ -125,8 +123,8 @@ def main(): pkt_raw = (Ether(src=src_mac, dst=dst_mac) / IPv6(src=src_ip, dst=dst_ip) / ICMPv6EchoRequest()) - ip_format = 'IPv6' - icmp_format = 'ICMPv6EchoRequest' + ip_format = IPv6 + icmp_format = ICMPv6EchoRequest else: raise ValueError("IP(s) not in correct format") @@ -134,44 +132,55 @@ def main(): sent_packets.append(pkt_raw) txq.send(pkt_raw) - ether = rxq.recv(2) + # Receive ICMP / ICMPv6 echo reply + while True: + ether = rxq.recv(2,) + if ether is None: + raise RuntimeError('ICMP echo Rx timeout') - # Check whether received packet contains layers Ether, IP and ICMP - if ether is None: - raise RuntimeError('ICMP echo Rx timeout') + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break + # Check whether received packet contains layers IP/IPv6 and + # ICMP/ICMPv6EchoRequest if encaps_rx: if encaps_rx == 'Dot1q': if not vlan1_rx: vlan1_rx = vlan1 if not ether.haslayer(Dot1Q): - raise RuntimeError('Not VLAN tagged Eth frame received:\n{0}' - .format(ether.__repr__())) + raise RuntimeError('Not VLAN tagged Eth frame received:\n{0}'. + format(ether.__repr__())) elif ether[Dot1Q].vlan != int(vlan1_rx): raise RuntimeError('Ethernet frame with wrong VLAN tag ({}) ' - 'received ({} expected):\n{}'.format( - ether[Dot1Q].vlan, vlan1_rx, ether.__repr__())) + 'received ({} expected):\n{}'. + format(ether[Dot1Q].vlan, vlan1_rx, + ether.__repr__())) elif encaps_rx == 'Dot1ad': if not vlan1_rx: vlan1_rx = vlan1 if not vlan2_rx: vlan2_rx = vlan2 # TODO - raise RuntimeError('Encapsulation {0} not implemented yet.' - .format(encaps_rx)) + raise RuntimeError('Encapsulation {0} not implemented yet.'. + format(encaps_rx)) else: - raise RuntimeError('Unsupported/unknown encapsulation expected: {0}' - .format(encaps_rx)) + raise RuntimeError('Unsupported encapsulation expected: {0}'. + format(encaps_rx)) if not ether.haslayer(ip_format): - raise RuntimeError('Not an IP packet received:\n{0}' - .format(ether.__repr__())) + raise RuntimeError('Not an IP/IPv6 packet received:\n{0}'. + format(ether.__repr__())) if not ether.haslayer(icmp_format): - raise RuntimeError('Not an ICMP packet received:\n{0}' - .format(ether.__repr__())) + raise RuntimeError('Not an ICMP/ICMPv6EchoRequest packet received:\n' + '{0}'.format(ether.__repr__())) sys.exit(0) + if __name__ == "__main__": main() diff --git a/resources/traffic_scripts/send_rs_check_ra.py b/resources/traffic_scripts/send_rs_check_ra.py index 9ba1f55b52..c9ff46528a 100755 --- a/resources/traffic_scripts/send_rs_check_ra.py +++ b/resources/traffic_scripts/send_rs_check_ra.py @@ -18,7 +18,7 @@ import sys import ipaddress from scapy.layers.l2 import Ether -from scapy.layers.inet6 import IPv6, ICMPv6ND_RS +from scapy.layers.inet6 import IPv6, ICMPv6ND_RA, ICMPv6ND_RS, ICMPv6ND_NS from resources.libraries.python.PacketVerifier import RxQueue, TxQueue from resources.libraries.python.TrafficScriptArg import TrafficScriptArg @@ -72,24 +72,29 @@ def main(): sent_packets = [pkt_raw] txq.send(pkt_raw) - ether = rxq.recv(8, ignore=sent_packets) + while True: + ether = rxq.recv(2, sent_packets) + if ether is None: + raise RuntimeError('ICMP echo Rx timeout') - # Check whether received packet contains layer RA and check other values - if ether is None: - raise RuntimeError('ICMP echo Rx timeout') + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break + # Check whether received packet contains layer RA and check other values if ether.src != router_mac: - raise RuntimeError( - 'Packet source MAC ({0}) does not match ' - 'router MAC ({1}).'.format(ether.src, router_mac)) + raise RuntimeError('Packet source MAC ({0}) does not match router MAC ' + '({1}).'.format(ether.src, router_mac)) if ether.dst != src_mac: - raise RuntimeError( - 'Packet destination MAC ({0}) does not match ' - 'RS source MAC ({1}).'.format(ether.dst, src_mac)) + raise RuntimeError('Packet destination MAC ({0}) does not match RS ' + 'source MAC ({1}).'.format(ether.dst, src_mac)) - if not ether.haslayer('ICMPv6ND_RA'): - raise RuntimeError('Not an RA packet received {0}' - .format(ether.__repr__())) + if not ether.haslayer(ICMPv6ND_RA): + raise RuntimeError('Not an RA packet received {0}'. + format(ether.__repr__())) src_address = ipaddress.IPv6Address(unicode(ether['IPv6'].src)) dst_address = ipaddress.IPv6Address(unicode(ether['IPv6'].dst)) @@ -98,24 +103,25 @@ def main(): rs_src_address = ipaddress.IPv6Address(unicode(src_ip)) if src_address != router_link_local: - raise RuntimeError( - 'Packet source address ({0}) does not match ' - 'link local address({1})'.format(src_address, router_link_local)) + raise RuntimeError('Packet source address ({0}) does not match link ' + 'local address({1})'. + format(src_address, router_link_local)) if dst_address != rs_src_address: - raise RuntimeError( - 'Packet destination address ({0}) does not match ' - 'RS source address ({1}).'.format(dst_address, rs_src_address)) + raise RuntimeError('Packet destination address ({0}) does not match ' + 'RS source address ({1}).'. + format(dst_address, rs_src_address)) if ether['IPv6'].hlim != 255: - raise RuntimeError('Hop limit not correct: {0}!=255'.format( - ether['IPv6'].hlim)) + raise RuntimeError('Hop limit not correct: {0}!=255'. + format(ether['IPv6'].hlim)) - ra_code = ether['ICMPv6 Neighbor Discovery - Router Advertisement'].code + ra_code = ether[ICMPv6ND_RA].code if ra_code != 0: raise RuntimeError('ICMP code: {0} not correct. '.format(ra_code)) sys.exit(0) + if __name__ == "__main__": main() diff --git a/resources/traffic_scripts/send_tcp_for_classifier_test.py b/resources/traffic_scripts/send_tcp_for_classifier_test.py index 5d8d387767..5a6873ab4e 100755 --- a/resources/traffic_scripts/send_tcp_for_classifier_test.py +++ b/resources/traffic_scripts/send_tcp_for_classifier_test.py @@ -17,27 +17,24 @@ Traffic script that sends an TCP packet from TG to DUT. """ import sys -import time -from scapy.layers.inet import IP, UDP, TCP -from scapy.layers.inet6 import IPv6 from scapy.all import Ether, Packet, Raw +from scapy.layers.inet import IP, TCP +from scapy.layers.inet6 import IPv6, ICMPv6ND_NS from resources.libraries.python.SFC.VerifyPacket import * -from resources.libraries.python.SFC.SFCConstants import SFCConstants as sfccon +from resources.libraries.python.SFC.SFCConstants import SFCConstants as SfcCon from resources.libraries.python.TrafficScriptArg import TrafficScriptArg from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from robot.api import logger def main(): """Send TCP packet from one traffic generator interface to DUT. :raises: If the IP address is invalid. """ - args = TrafficScriptArg( - ['src_mac', 'dst_mac', 'src_ip', 'dst_ip', - 'timeout', 'framesize', 'testtype']) + args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip', + 'timeout', 'framesize', 'testtype']) src_mac = args.get_arg('src_mac') dst_mac = args.get_arg('dst_mac') @@ -54,10 +51,9 @@ def main(): sent_packets = [] protocol = TCP - source_port = sfccon.DEF_SRC_PORT - destination_port = sfccon.DEF_DST_PORT + source_port = SfcCon.DEF_SRC_PORT + destination_port = SfcCon.DEF_DST_PORT - ip_version = None if valid_ipv4(src_ip) and valid_ipv4(dst_ip): ip_version = IP elif valid_ipv6(src_ip) and valid_ipv6(dst_ip): @@ -66,8 +62,8 @@ def main(): raise ValueError("Invalid IP version!") pkt_header = (Ether(src=src_mac, dst=dst_mac) / - ip_version(src=src_ip, dst=dst_ip) / - protocol(sport=int(source_port), dport=int(destination_port))) + ip_version(src=src_ip, dst=dst_ip) / + protocol(sport=int(source_port), dport=int(destination_port))) fsize_no_fcs = frame_size - 4 pad_len = max(0, fsize_no_fcs - len(pkt_header)) @@ -79,10 +75,17 @@ def main(): sent_packets.append(pkt_raw) txq.send(pkt_raw) - ether = rxq.recv(timeout) - - if ether is None: - raise RuntimeError("No packet is received!") + while True: + ether = rxq.recv(timeout) + if ether is None: + raise RuntimeError('No packet is received!') + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break # let us begin to check the NSH SFC loopback packet VerifyPacket.check_the_nsh_sfc_packet(ether, frame_size, test_type) diff --git a/resources/traffic_scripts/send_tcp_udp.py b/resources/traffic_scripts/send_tcp_udp.py index 77f918213f..4cba73286a 100755 --- a/resources/traffic_scripts/send_tcp_udp.py +++ b/resources/traffic_scripts/send_tcp_udp.py @@ -19,9 +19,9 @@ from one interface to the other. import sys import ipaddress -from scapy.layers.inet import IP, UDP, TCP -from scapy.layers.inet6 import IPv6 from scapy.all import Ether +from scapy.layers.inet import IP, UDP, TCP +from scapy.layers.inet6 import IPv6, ICMPv6ND_NS from resources.libraries.python.PacketVerifier import RxQueue, TxQueue from resources.libraries.python.TrafficScriptArg import TrafficScriptArg @@ -62,9 +62,8 @@ def valid_ipv6(ip): def main(): """Send TCP or UDP packet from one traffic generator interface to the other. """ - args = TrafficScriptArg( - ['tx_mac', 'rx_mac', 'src_ip', 'dst_ip', 'protocol', - 'source_port', 'destination_port']) + args = TrafficScriptArg(['tx_mac', 'rx_mac', 'src_ip', 'dst_ip', 'protocol', + 'source_port', 'destination_port']) src_mac = args.get_arg('tx_mac') dst_mac = args.get_arg('rx_mac') @@ -77,7 +76,6 @@ def main(): source_port = args.get_arg('source_port') destination_port = args.get_arg('destination_port') - ip_version = None if valid_ipv4(src_ip) and valid_ipv4(dst_ip): ip_version = IP elif valid_ipv6(src_ip) and valid_ipv6(dst_ip): @@ -90,7 +88,7 @@ def main(): elif protocol.upper() == 'UDP': protocol = UDP else: - raise ValueError("Invalid type of protocol!") + raise ValueError("Invalid protocol type!") rxq = RxQueue(rx_if) txq = TxQueue(tx_if) @@ -100,19 +98,27 @@ def main(): protocol(sport=int(source_port), dport=int(destination_port))) txq.send(pkt_raw) - ether = rxq.recv(2) - if ether is None: - raise RuntimeError("TCP/UDP Rx timeout") + while True: + ether = rxq.recv(2) + if ether is None: + raise RuntimeError('TCP/UDP Rx timeout') + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break - if 'TCP' in ether: + if TCP in ether: print ("TCP packet received.") - elif 'UDP' in ether: + elif UDP in ether: print ("UDP packet received.") else: - raise RuntimeError("Not an TCP or UDP packet received {0}" - .format(ether.__repr__())) + raise RuntimeError("Not an TCP or UDP packet received {0}". + format(ether.__repr__())) sys.exit(0) diff --git a/resources/traffic_scripts/send_vxlan_for_proxy_test.py b/resources/traffic_scripts/send_vxlan_for_proxy_test.py index c356e1977e..d33ed413c8 100755 --- a/resources/traffic_scripts/send_vxlan_for_proxy_test.py +++ b/resources/traffic_scripts/send_vxlan_for_proxy_test.py @@ -19,24 +19,22 @@ import sys import time from scapy.layers.inet import IP, UDP, TCP -from scapy.layers.inet6 import IPv6 +from scapy.layers.inet6 import IPv6, ICMPv6ND_NS from scapy.all import Ether, Packet, Raw from resources.libraries.python.SFC.VerifyPacket import * -from resources.libraries.python.SFC.SFCConstants import SFCConstants as sfccon +from resources.libraries.python.SFC.SFCConstants import SFCConstants as SfcCon from resources.libraries.python.TrafficScriptArg import TrafficScriptArg from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from robot.api import logger def main(): """Send VxLAN packet from TG to DUT. :raises: If the IP address is invalid. """ - args = TrafficScriptArg( - ['src_mac', 'dst_mac', 'src_ip', 'dst_ip', - 'timeout', 'framesize', 'testtype']) + args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip', + 'timeout', 'framesize', 'testtype']) src_mac = args.get_arg('src_mac') dst_mac = args.get_arg('dst_mac') @@ -44,7 +42,7 @@ def main(): dst_ip = args.get_arg('dst_ip') tx_if = args.get_arg('tx_if') rx_if = args.get_arg('rx_if') - timeout = int(args.get_arg('timeout')) + timeout = max(2, int(args.get_arg('timeout'))) frame_size = int(args.get_arg('framesize')) test_type = args.get_arg('testtype') @@ -53,10 +51,9 @@ def main(): sent_packets = [] protocol = TCP - source_port = sfccon.DEF_SRC_PORT - destination_port = sfccon.DEF_DST_PORT + source_port = SfcCon.DEF_SRC_PORT + destination_port = SfcCon.DEF_DST_PORT - ip_version = None if valid_ipv4(src_ip) and valid_ipv4(dst_ip): ip_version = IP elif valid_ipv6(src_ip) and valid_ipv6(dst_ip): @@ -65,17 +62,17 @@ def main(): raise ValueError("Invalid IP version!") innerpkt = (Ether(src=src_mac, dst=dst_mac) / - ip_version(src=src_ip, dst=dst_ip) / - protocol(sport=int(source_port), dport=int(destination_port))) + ip_version(src=src_ip, dst=dst_ip) / + protocol(sport=int(source_port), dport=int(destination_port))) vxlan = '\x08\x00\x00\x00\x00\x00\x01\x00' raw_data = vxlan + str(innerpkt) pkt_header = (Ether(src=src_mac, dst=dst_mac) / - ip_version(src=src_ip, dst=dst_ip) / - UDP(sport=int(source_port), dport=4789) / - Raw(load=raw_data)) + ip_version(src=src_ip, dst=dst_ip) / + UDP(sport=int(source_port), dport=4789) / + Raw(load=raw_data)) fsize_no_fcs = frame_size - 4 pad_len = max(0, fsize_no_fcs - len(pkt_header)) @@ -87,10 +84,17 @@ def main(): sent_packets.append(pkt_raw) txq.send(pkt_raw) - ether = rxq.recv(2) - - if ether is None: - raise RuntimeError("No packet is received!") + while True: + ether = rxq.recv(timeout) + if ether is None: + raise RuntimeError('No packet is received!') + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break # let us begin to check the proxy outbound packet VerifyPacket.check_the_nsh_sfc_packet(ether, frame_size, test_type) diff --git a/resources/traffic_scripts/send_vxlangpe_nsh_for_proxy_test.py b/resources/traffic_scripts/send_vxlangpe_nsh_for_proxy_test.py index d998d7c6a0..3ea1f0bc62 100755 --- a/resources/traffic_scripts/send_vxlangpe_nsh_for_proxy_test.py +++ b/resources/traffic_scripts/send_vxlangpe_nsh_for_proxy_test.py @@ -16,27 +16,24 @@ from TG to DUT. """ import sys -import time from scapy.layers.inet import IP, UDP, TCP -from scapy.layers.inet6 import IPv6 +from scapy.layers.inet6 import IPv6, ICMPv6ND_NS from scapy.all import Ether, Packet, Raw from resources.libraries.python.SFC.VerifyPacket import * -from resources.libraries.python.SFC.SFCConstants import SFCConstants as sfccon +from resources.libraries.python.SFC.SFCConstants import SFCConstants as SfcCon from resources.libraries.python.TrafficScriptArg import TrafficScriptArg from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from robot.api import logger def main(): """Send VxLAN-GPE+NSH packet from TG to DUT. :raises: If the IP address is invalid. """ - args = TrafficScriptArg( - ['src_mac', 'dst_mac', 'src_ip', 'dst_ip', - 'timeout', 'framesize', 'testtype']) + args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip', + 'timeout', 'framesize', 'testtype']) src_mac = args.get_arg('src_mac') dst_mac = args.get_arg('dst_mac') @@ -44,7 +41,7 @@ def main(): dst_ip = args.get_arg('dst_ip') tx_if = args.get_arg('tx_if') rx_if = args.get_arg('rx_if') - timeout = int(args.get_arg('timeout')) + timeout = max(2, int(args.get_arg('timeout'))) frame_size = int(args.get_arg('framesize')) test_type = args.get_arg('testtype') @@ -53,10 +50,9 @@ def main(): sent_packets = [] protocol = TCP - source_port = sfccon.DEF_SRC_PORT - destination_port = sfccon.DEF_DST_PORT + source_port = SfcCon.DEF_SRC_PORT + destination_port = SfcCon.DEF_DST_PORT - ip_version = None if valid_ipv4(src_ip) and valid_ipv4(dst_ip): ip_version = IP elif valid_ipv6(src_ip) and valid_ipv6(dst_ip): @@ -65,8 +61,8 @@ def main(): raise ValueError("Invalid IP version!") innerpkt = (Ether(src=src_mac, dst=dst_mac) / - ip_version(src=src_ip, dst=dst_ip) / - protocol(sport=int(source_port), dport=int(destination_port))) + ip_version(src=src_ip, dst=dst_ip) / + protocol(sport=int(source_port), dport=int(destination_port))) vxlangpe_nsh = '\x0c\x00\x00\x04\x00\x00\x09\x00\x00\x06' \ '\x01\x03\x00\x00\xb9\xff\xC0\xA8\x32\x4B' \ @@ -75,9 +71,9 @@ def main(): raw_data = vxlangpe_nsh + str(innerpkt) pkt_header = (Ether(src=src_mac, dst=dst_mac) / - ip_version(src=src_ip, dst=dst_ip) / - UDP(sport=int(source_port), dport=4790) / - Raw(load=raw_data)) + ip_version(src=src_ip, dst=dst_ip) / + UDP(sport=int(source_port), dport=4790) / + Raw(load=raw_data)) fsize_no_fcs = frame_size - 4 pad_len = max(0, fsize_no_fcs - len(pkt_header)) @@ -89,10 +85,17 @@ def main(): sent_packets.append(pkt_raw) txq.send(pkt_raw) - ether = rxq.recv(2) - - if ether is None: - raise RuntimeError("No packet is received!") + while True: + ether = rxq.recv(timeout) + if ether is None: + raise RuntimeError('No packet is received!') + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break # let us begin to check the proxy inbound packet VerifyPacket.check_the_nsh_sfc_packet(ether, frame_size, test_type) diff --git a/resources/traffic_scripts/send_vxlangpe_nsh_for_sff_test.py b/resources/traffic_scripts/send_vxlangpe_nsh_for_sff_test.py index 55c06f0691..6879d20f2c 100755 --- a/resources/traffic_scripts/send_vxlangpe_nsh_for_sff_test.py +++ b/resources/traffic_scripts/send_vxlangpe_nsh_for_sff_test.py @@ -16,27 +16,24 @@ from TG to DUT. """ import sys -import time from scapy.layers.inet import IP, UDP, TCP -from scapy.layers.inet6 import IPv6 +from scapy.layers.inet6 import IPv6, ICMPv6ND_NS from scapy.all import Ether, Packet, Raw from resources.libraries.python.SFC.VerifyPacket import * -from resources.libraries.python.SFC.SFCConstants import SFCConstants as sfccon +from resources.libraries.python.SFC.SFCConstants import SFCConstants as SfcCon from resources.libraries.python.TrafficScriptArg import TrafficScriptArg from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from robot.api import logger def main(): """Send VxLAN-GPE+NSH packet from TG to DUT. :raises: If the IP address is invalid. """ - args = TrafficScriptArg( - ['src_mac', 'dst_mac', 'src_ip', 'dst_ip', - 'timeout', 'framesize', 'testtype']) + args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip', + 'timeout', 'framesize', 'testtype']) src_mac = args.get_arg('src_mac') dst_mac = args.get_arg('dst_mac') @@ -44,7 +41,7 @@ def main(): dst_ip = args.get_arg('dst_ip') tx_if = args.get_arg('tx_if') rx_if = args.get_arg('rx_if') - timeout = int(args.get_arg('timeout')) + timeout = max(2, int(args.get_arg('timeout'))) frame_size = int(args.get_arg('framesize')) test_type = args.get_arg('testtype') @@ -53,10 +50,9 @@ def main(): sent_packets = [] protocol = TCP - source_port = sfccon.DEF_SRC_PORT - destination_port = sfccon.DEF_DST_PORT + source_port = SfcCon.DEF_SRC_PORT + destination_port = SfcCon.DEF_DST_PORT - ip_version = None if valid_ipv4(src_ip) and valid_ipv4(dst_ip): ip_version = IP elif valid_ipv6(src_ip) and valid_ipv6(dst_ip): @@ -65,8 +61,8 @@ def main(): raise ValueError("Invalid IP version!") innerpkt = (Ether(src=src_mac, dst=dst_mac) / - ip_version(src=src_ip, dst=dst_ip) / - protocol(sport=int(source_port), dport=int(destination_port))) + ip_version(src=src_ip, dst=dst_ip) / + protocol(sport=int(source_port), dport=int(destination_port))) vxlangpe_nsh = '\x0c\x00\x00\x04\x00\x00\x0a\x00\x00\x06' \ '\x01\x03\x00\x00\xb9\xff\xC0\xA8\x32\x4B' \ @@ -75,9 +71,9 @@ def main(): raw_data = vxlangpe_nsh + str(innerpkt) pkt_header = (Ether(src=src_mac, dst=dst_mac) / - ip_version(src=src_ip, dst=dst_ip) / - UDP(sport=int(source_port), dport=4790) / - Raw(load=raw_data)) + ip_version(src=src_ip, dst=dst_ip) / + UDP(sport=int(source_port), dport=4790) / + Raw(load=raw_data)) fsize_no_fcs = frame_size - 4 pad_len = max(0, fsize_no_fcs - len(pkt_header)) @@ -89,10 +85,17 @@ def main(): sent_packets.append(pkt_raw) txq.send(pkt_raw) - ether = rxq.recv(2) - - if ether is None: - raise RuntimeError("No packet is received!") + while True: + ether = rxq.recv(timeout) + if ether is None: + raise RuntimeError('No packet is received!') + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break # let us begin to check the sfc sff packet VerifyPacket.check_the_nsh_sfc_packet(ether, frame_size, test_type) diff --git a/resources/traffic_scripts/span_check.py b/resources/traffic_scripts/span_check.py index c7ca8a7374..cbf65d3aaf 100755 --- a/resources/traffic_scripts/span_check.py +++ b/resources/traffic_scripts/span_check.py @@ -21,7 +21,7 @@ import sys import ipaddress from scapy.layers.inet import IP, ICMP, ARP -from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply +from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS from scapy.layers.l2 import Ether from resources.libraries.python.PacketVerifier import RxQueue, TxQueue, auto_pad @@ -64,8 +64,8 @@ def main(): """Send a simple L2 or ICMP packet from one TG interface to DUT, then receive a copy of the packet on the second TG interface, and a copy of the ICMP reply.""" - args = TrafficScriptArg( - ['tg_src_mac', 'src_ip', 'dst_ip', 'dut_if1_mac', 'ptype']) + args = TrafficScriptArg(['tg_src_mac', 'src_ip', 'dst_ip', 'dut_if1_mac', + 'ptype']) src_mac = args.get_arg('tg_src_mac') dst_mac = args.get_arg('dut_if1_mac') @@ -104,11 +104,20 @@ def main(): txq.send(pkt_raw) sent.append(auto_pad(pkt_raw)) - ether = rxq_mirrored.recv(2) # Receive copy of Rx packet. - if ether is None: - raise RuntimeError("Rx timeout of mirrored Rx packet") + while True: + ether = rxq_mirrored.recv(2) + if ether is None: + raise RuntimeError("Rx timeout of mirrored Rx packet") + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break + pkt = auto_pad(pkt_raw) if str(ether) != str(pkt): print("Mirrored Rx packet doesn't match the original Rx packet.") @@ -148,15 +157,17 @@ def main(): # Receive reply on TG Tx port. ether_repl = rxq_tx.recv(2, sent) + if ether_repl is None: raise RuntimeError("Reply not received on TG Tx port.") - else: - print("Reply received on TG Tx port.\n") + + print("Reply received on TG Tx port.\n") # Receive copy of Tx packet. ether = rxq_mirrored.recv(2) if ether is None: raise RuntimeError("Rx timeout of mirrored Tx packet") + if str(ether) != str(ether_repl): print("Mirrored Tx packet doesn't match the received Tx packet.") if ether.src != ether_repl.src or ether.dst != ether_repl.dst: |