From 2a848f49308868dfe6fa3a9cb78bd085f8c16f40 Mon Sep 17 00:00:00 2001 From: Jan Gelety Date: Fri, 8 Sep 2017 11:38:38 +0200 Subject: Ignore unexpected ICMPv6 Neighbor Discovery - Neighbor Solicitation packets We need to adapt all functional traffic scripts related to functional IPv6 tests to ingore receiving of unexpected ICMPv6ND_NS (ICMPv6 Neighbor Discovery - Neighbor Solicitation) packets that are sent automatically and we cannot avoid to receive them. The reason is to prevent false negative test results in case of csit functional tests that could block creation of new operational branch (csit weekly jobs), usage of new vpp builds (csit semiweekly jobs) and merging patches - csit as well as vpp. Change-Id: I43c90e7c766762fa769a81661338759a11b401a1 Signed-off-by: Jan Gelety --- resources/traffic_scripts/icmpv6_echo_req_resp.py | 119 +++++++++++++--------- 1 file changed, 69 insertions(+), 50 deletions(-) (limited to 'resources/traffic_scripts/icmpv6_echo_req_resp.py') diff --git a/resources/traffic_scripts/icmpv6_echo_req_resp.py b/resources/traffic_scripts/icmpv6_echo_req_resp.py index c2cc4d20a0..ec9cf94a67 100755 --- a/resources/traffic_scripts/icmpv6_echo_req_resp.py +++ b/resources/traffic_scripts/icmpv6_echo_req_resp.py @@ -18,14 +18,20 @@ import sys import logging + +# pylint: disable=no-name-in-module +# pylint: disable=import-error logging.getLogger("scapy.runtime").setLevel(logging.ERROR) -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue,\ - checksum_equal -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg -from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr + +from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS +from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply from scapy.all import Ether +from resources.libraries.python.PacketVerifier import RxQueue, TxQueue +from resources.libraries.python.PacketVerifier import checksum_equal +from resources.libraries.python.TrafficScriptArg import TrafficScriptArg + def main(): args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_nh_mac', 'dst_nh_mac', @@ -52,109 +58,122 @@ def main(): # send ICMPv6 neighbor advertisement message pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') / - IPv6(src=src_ip, dst='ff02::1:ff00:2') / - ICMPv6ND_NA(tgt=src_ip, R=0) / - ICMPv6NDOptDstLLAddr(lladdr=src_mac)) + IPv6(src=src_ip, dst='ff02::1:ff00:2') / + ICMPv6ND_NA(tgt=src_ip, R=0) / + ICMPv6NDOptDstLLAddr(lladdr=src_mac)) src_sent_packets.append(pkt_send) src_txq.send(pkt_send) pkt_send = (Ether(src=dst_mac, dst='ff:ff:ff:ff:ff:ff') / - IPv6(src=dst_ip, dst='ff02::1:ff00:2') / - ICMPv6ND_NA(tgt=dst_ip, R=0) / - ICMPv6NDOptDstLLAddr(lladdr=dst_mac)) + IPv6(src=dst_ip, dst='ff02::1:ff00:2') / + ICMPv6ND_NA(tgt=dst_ip, R=0) / + ICMPv6NDOptDstLLAddr(lladdr=dst_mac)) dst_sent_packets.append(pkt_send) dst_txq.send(pkt_send) # send ICMPv6 echo request from first TG interface pkt_send = (Ether(src=src_mac, dst=src_nh_mac) / - IPv6(src=src_ip, dst=dst_ip, hlim=hop_limit) / - ICMPv6EchoRequest(id=echo_id, seq=echo_seq)) + IPv6(src=src_ip, dst=dst_ip, hlim=hop_limit) / + ICMPv6EchoRequest(id=echo_id, seq=echo_seq)) src_sent_packets.append(pkt_send) src_txq.send(pkt_send) # receive ICMPv6 echo request on second TG interface - ether = dst_rxq.recv(2, dst_sent_packets) - if ether is None: - raise RuntimeError('ICMPv6 echo reply Rx timeout') + while True: + ether = dst_rxq.recv(2, dst_sent_packets) + if ether is None: + raise RuntimeError('ICMPv6 echo reply Rx timeout') + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break if not ether.haslayer(IPv6): - raise RuntimeError('Unexpected packet with no IPv6 received {0}'.format( - ether.__repr__())) + raise RuntimeError('Unexpected packet with no IPv6 received: {0}'. + format(ether.__repr__())) - ipv6 = ether['IPv6'] + ipv6 = ether[IPv6] # verify hop limit processing if ipv6.hlim != (hop_limit - hop_num): - raise RuntimeError( - 'Invalid hop limit {0} should be {1}'.format(ipv6.hlim, - hop_limit - hop_num)) + raise RuntimeError('Invalid hop limit {0} should be {1}'. + format(ipv6.hlim,hop_limit - hop_num)) if not ipv6.haslayer(ICMPv6EchoRequest): - raise RuntimeError( - 'Unexpected packet with no IPv6 ICMP received {0}'.format( - ipv6.__repr__())) + raise RuntimeError('Unexpected packet with no IPv6 ICMP received {0}'. + format(ipv6.__repr__())) - icmpv6 = ipv6['ICMPv6 Echo Request'] + icmpv6 = ipv6[ICMPv6EchoRequest] # check identifier and sequence number if icmpv6.id != echo_id or icmpv6.seq != echo_seq: - raise RuntimeError( - 'Invalid ICMPv6 echo reply received ID {0} seq {1} should be ' \ - 'ID {2} seq {3}'.format(icmpv6.id, icmpv6.seq, echo_id, echo_seq)) + raise RuntimeError('Invalid ICMPv6 echo reply received ID {0} ' + 'seq {1} should be ID {2} seq {3}'. + format(icmpv6.id, icmpv6.seq, echo_id, echo_seq)) # verify checksum cksum = icmpv6.cksum del icmpv6.cksum tmp = ICMPv6EchoRequest(str(icmpv6)) if not checksum_equal(tmp.cksum, cksum): - raise RuntimeError( - 'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum)) + raise RuntimeError('Invalid checksum {0} should be {1}'. + format(cksum, tmp.cksum)) # send ICMPv6 echo reply from second TG interface pkt_send = (Ether(src=dst_mac, dst=dst_nh_mac) / - IPv6(src=dst_ip, dst=src_ip) / - ICMPv6EchoReply(id=echo_id, seq=echo_seq)) + IPv6(src=dst_ip, dst=src_ip) / + ICMPv6EchoReply(id=echo_id, seq=echo_seq)) dst_sent_packets.append(pkt_send) dst_txq.send(pkt_send) # receive ICMPv6 echo reply on first TG interface - ether = src_rxq.recv(2, src_sent_packets) - if ether is None: - raise RuntimeError('ICMPv6 echo reply Rx timeout') + while True: + ether = src_rxq.recv(2, src_sent_packets) + if ether is None: + raise RuntimeError('ICMPv6 echo reply Rx timeout') + + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue if the current one is ICMPv6ND_NS + continue + else: + # otherwise process the current packet + break if not ether.haslayer(IPv6): - raise RuntimeError('Unexpected packet with no IPv6 received {0}'.format( - ether.__repr__())) + raise RuntimeError('Unexpected packet with no IPv6 layer received {0}'. + format(ether.__repr__())) - ipv6 = ether['IPv6'] + ipv6 = ether[IPv6] # verify hop limit processing if ipv6.hlim != (hop_limit - hop_num): - raise RuntimeError( - 'Invalid hop limit {0} should be {1}'.format(ipv6.hlim, - hop_limit - hop_num)) + raise RuntimeError('Invalid hop limit {0} should be {1}'. + format(ipv6.hlim, hop_limit - hop_num)) if not ipv6.haslayer(ICMPv6EchoReply): - raise RuntimeError( - 'Unexpected packet with no IPv6 ICMP received {0}'.format( - ipv6.__repr__())) + raise RuntimeError('Unexpected packet with no IPv6 ICMP received {0}'. + format(ipv6.__repr__())) - icmpv6 = ipv6['ICMPv6 Echo Reply'] + icmpv6 = ipv6[ICMPv6EchoReply] # check identifier and sequence number if icmpv6.id != echo_id or icmpv6.seq != echo_seq: - raise RuntimeError( - 'Invalid ICMPv6 echo reply received ID {0} seq {1} should be ' \ - 'ID {2} seq {3}'.format(icmpv6.id, icmpv6.seq, echo_id, echo_seq)) + raise RuntimeError('Invalid ICMPv6 echo reply received ID {0} ' + 'seq {1} should be ID {2} seq {3}'. + format(icmpv6.id, icmpv6.seq, echo_id, echo_seq)) # verify checksum cksum = icmpv6.cksum del icmpv6.cksum tmp = ICMPv6EchoReply(str(icmpv6)) if not checksum_equal(tmp.cksum, cksum): - raise RuntimeError( - 'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum)) + raise RuntimeError('Invalid checksum {0} should be {1}'. + format(cksum, tmp.cksum)) sys.exit(0) + if __name__ == "__main__": main() -- cgit 1.2.3-korg