diff options
author | Jan Gelety <jgelety@cisco.com> | 2017-09-08 11:38:38 +0200 |
---|---|---|
committer | Tibor Frank <tifrank@cisco.com> | 2017-09-18 12:05:49 +0000 |
commit | 2a848f49308868dfe6fa3a9cb78bd085f8c16f40 (patch) | |
tree | 180c45ea5db2cc095c65d3b698a3e05a6ee819fe /resources/traffic_scripts/ipv6_nd_proxy_check.py | |
parent | 6928a6be42016a6c5edade6369041670fe544f39 (diff) |
Ignore unexpected ICMPv6 Neighbor Discovery - Neighbor Solicitation packets
We need to adapt all functional traffic scripts related to functional
IPv6 tests to ingore receiving of unexpected ICMPv6ND_NS
(ICMPv6 Neighbor Discovery - Neighbor Solicitation) packets that are
sent automatically and we cannot avoid to receive them.
The reason is to prevent false negative test results in case of csit
functional tests that could block creation of new operational branch
(csit weekly jobs), usage of new vpp builds (csit semiweekly jobs)
and merging patches - csit as well as vpp.
Change-Id: I43c90e7c766762fa769a81661338759a11b401a1
Signed-off-by: Jan Gelety <jgelety@cisco.com>
Diffstat (limited to 'resources/traffic_scripts/ipv6_nd_proxy_check.py')
-rwxr-xr-x | resources/traffic_scripts/ipv6_nd_proxy_check.py | 89 |
1 files changed, 54 insertions, 35 deletions
diff --git a/resources/traffic_scripts/ipv6_nd_proxy_check.py b/resources/traffic_scripts/ipv6_nd_proxy_check.py index b1028028b0..c9213999ec 100755 --- a/resources/traffic_scripts/ipv6_nd_proxy_check.py +++ b/resources/traffic_scripts/ipv6_nd_proxy_check.py @@ -15,8 +15,9 @@ """Traffic script that sends DHCPv6 proxy packets.""" from scapy.layers.inet import Ether -from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply,\ - ICMPv6ND_NS +from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS +from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply + from resources.libraries.python.PacketVerifier import RxQueue, TxQueue from resources.libraries.python.TrafficScriptArg import TrafficScriptArg @@ -44,17 +45,23 @@ def imcpv6nd_solicit(tx_if, src_mac, dst_mac, src_ip, dst_ip): sent_packets = [] - icmpv6nd_solicit_pkt =\ - Ether(src=src_mac, dst=dst_mac) / \ - IPv6(src=src_ip) / \ - ICMPv6ND_NS(tgt=dst_ip) + icmpv6nd_solicit_pkt = (Ether(src=src_mac, dst=dst_mac) / + IPv6(src=src_ip) / + ICMPv6ND_NS(tgt=dst_ip)) sent_packets.append(icmpv6nd_solicit_pkt) txq.send(icmpv6nd_solicit_pkt) ether = None for _ in range(5): - pkt = rxq.recv(3, ignore=sent_packets) + while True: + pkt = rxq.recv(3, ignore=sent_packets) + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue in case of ICMPv6ND_NS packet + continue + else: + # otherwise process the current packet + break if pkt is not None: ether = pkt break @@ -63,34 +70,33 @@ def imcpv6nd_solicit(tx_if, src_mac, dst_mac, src_ip, dst_ip): raise RuntimeError('ICMPv6ND Proxy response timeout.') if ether.src != dst_mac: - raise RuntimeError("Source MAC address error: {} != {}".format( - ether.src, dst_mac)) + raise RuntimeError("Source MAC address error: {} != {}". + format(ether.src, dst_mac)) print "Source MAC address: OK." if ether.dst != src_mac: - raise RuntimeError("Destination MAC address error: {} != {}".format( - ether.dst, src_mac)) + raise RuntimeError("Destination MAC address error: {} != {}". + format(ether.dst, src_mac)) print "Destination MAC address: OK." - if ether['IPv6'].src != dst_ip: - raise RuntimeError("Source IP address error: {} != {}".format( - ether['IPv6'].src, dst_ip)) + if ether[IPv6].src != dst_ip: + raise RuntimeError("Source IP address error: {} != {}". + format(ether[IPv6].src, dst_ip)) print "Source IP address: OK." - if ether['IPv6'].dst != src_ip: - raise RuntimeError("Destination IP address error: {} != {}".format( - ether['IPv6'].dst, src_ip)) + if ether[IPv6].dst != src_ip: + raise RuntimeError("Destination IP address error: {} != {}". + format(ether[IPv6].dst, src_ip)) print "Destination IP address: OK." try: - target_addr = ether['IPv6']\ - ['ICMPv6 Neighbor Discovery - Neighbor Advertisement'].tgt + target_addr = ether[IPv6][ICMPv6ND_NA].tgt except (KeyError, AttributeError): raise RuntimeError("Not an ICMPv6ND Neighbor Advertisement packet.") if target_addr != dst_ip: - raise RuntimeError("ICMPv6 field 'Target address' error:" - " {} != {}".format(target_addr, dst_ip)) + raise RuntimeError("ICMPv6 field 'Target address' error: {} != {}". + format(target_addr, dst_ip)) print "Target address field: OK." @@ -119,16 +125,22 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac, rxq = RxQueue(dst_if) txq = TxQueue(src_if) - icmpv6_ping_pkt = \ - Ether(src=src_mac, dst=proxy_to_src_mac) / \ - IPv6(src=src_ip, dst=dst_ip) / \ - ICMPv6EchoRequest() + icmpv6_ping_pkt = (Ether(src=src_mac, dst=proxy_to_src_mac) / + IPv6(src=src_ip, dst=dst_ip) / + ICMPv6EchoRequest()) txq.send(icmpv6_ping_pkt) ether = None for _ in range(5): - pkt = rxq.recv(3) + while True: + pkt = rxq.recv(3) + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue in case of ICMPv6ND_NS packet + continue + else: + # otherwise process the current packet + break if pkt is not None: ether = pkt break @@ -136,7 +148,7 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac, if ether is None: raise RuntimeError('ICMPv6 Echo Request timeout.') try: - ether["IPv6"]["ICMPv6 Echo Request"] + ether[IPv6]["ICMPv6 Echo Request"] except KeyError: raise RuntimeError("Received packet is not an ICMPv6 Echo Request.") print "ICMP Echo: OK." @@ -144,16 +156,22 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac, rxq = RxQueue(src_if) txq = TxQueue(dst_if) - icmpv6_ping_pkt = \ - Ether(src=dst_mac, dst=proxy_to_dst_mac) / \ - IPv6(src=dst_ip, dst=src_ip) / \ - ICMPv6EchoReply() + icmpv6_ping_pkt = (Ether(src=dst_mac, dst=proxy_to_dst_mac) / + IPv6(src=dst_ip, dst=src_ip) / + ICMPv6EchoReply()) txq.send(icmpv6_ping_pkt) ether = None for _ in range(5): - pkt = rxq.recv(3) + while True: + pkt = rxq.recv(3) + if ether.haslayer(ICMPv6ND_NS): + # read another packet in the queue in case of ICMPv6ND_NS packet + continue + else: + # otherwise process the current packet + break if pkt is not None: ether = pkt break @@ -161,7 +179,7 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac, if ether is None: raise RuntimeError('DHCPv6 SOLICIT timeout') try: - ether["IPv6"]["ICMPv6 Echo Reply"] + ether[IPv6]["ICMPv6 Echo Reply"] except KeyError: raise RuntimeError("Received packet is not an ICMPv6 Echo Reply.") @@ -187,8 +205,9 @@ def main(): imcpv6nd_solicit(src_if, src_mac, proxy_to_src_mac, src_ip, dst_ip) # Verify route (ICMP echo/reply) - ipv6_ping(src_if, dst_if, src_mac, dst_mac, - proxy_to_src_mac, proxy_to_dst_mac, src_ip, dst_ip) + ipv6_ping(src_if, dst_if, src_mac, dst_mac, proxy_to_src_mac, + proxy_to_dst_mac, src_ip, dst_ip) + if __name__ == "__main__": main() |