diff options
Diffstat (limited to 'resources/traffic_scripts')
-rwxr-xr-x | resources/traffic_scripts/ipv6_nd_proxy_check.py | 194 |
1 files changed, 194 insertions, 0 deletions
diff --git a/resources/traffic_scripts/ipv6_nd_proxy_check.py b/resources/traffic_scripts/ipv6_nd_proxy_check.py new file mode 100755 index 0000000000..b1028028b0 --- /dev/null +++ b/resources/traffic_scripts/ipv6_nd_proxy_check.py @@ -0,0 +1,194 @@ +#!/usr/bin/env python +# Copyright (c) 2016 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Traffic script that sends DHCPv6 proxy packets.""" + +from scapy.layers.inet import Ether +from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply,\ + ICMPv6ND_NS + +from resources.libraries.python.PacketVerifier import RxQueue, TxQueue +from resources.libraries.python.TrafficScriptArg import TrafficScriptArg + + +def imcpv6nd_solicit(tx_if, src_mac, dst_mac, src_ip, dst_ip): + """Send ICMPv6 Neighbor Solicitation packet and expect a response + from the proxy. + + :param tx_if: Interface on TG. + :param src_mac: MAC address of TG interface. + :param dst_mac: MAC address of proxy interface. + :param src_ip: IP address of TG interface. + :param dst_ip: IP address of proxied interface. + :type tx_if: str + :type src_mac: str + :type dst_mac: str + :type src_ip: str + :type dst_ip: str + :raises RuntimeError: If the received packet is not correct. + """ + + rxq = RxQueue(tx_if) + txq = TxQueue(tx_if) + + sent_packets = [] + + icmpv6nd_solicit_pkt =\ + Ether(src=src_mac, dst=dst_mac) / \ + IPv6(src=src_ip) / \ + ICMPv6ND_NS(tgt=dst_ip) + + sent_packets.append(icmpv6nd_solicit_pkt) + txq.send(icmpv6nd_solicit_pkt) + + ether = None + for _ in range(5): + pkt = rxq.recv(3, ignore=sent_packets) + if pkt is not None: + ether = pkt + break + + if ether is None: + raise RuntimeError('ICMPv6ND Proxy response timeout.') + + if ether.src != dst_mac: + raise RuntimeError("Source MAC address error: {} != {}".format( + ether.src, dst_mac)) + print "Source MAC address: OK." + + if ether.dst != src_mac: + raise RuntimeError("Destination MAC address error: {} != {}".format( + ether.dst, src_mac)) + print "Destination MAC address: OK." + + if ether['IPv6'].src != dst_ip: + raise RuntimeError("Source IP address error: {} != {}".format( + ether['IPv6'].src, dst_ip)) + print "Source IP address: OK." + + if ether['IPv6'].dst != src_ip: + raise RuntimeError("Destination IP address error: {} != {}".format( + ether['IPv6'].dst, src_ip)) + print "Destination IP address: OK." + + try: + target_addr = ether['IPv6']\ + ['ICMPv6 Neighbor Discovery - Neighbor Advertisement'].tgt + except (KeyError, AttributeError): + raise RuntimeError("Not an ICMPv6ND Neighbor Advertisement packet.") + + if target_addr != dst_ip: + raise RuntimeError("ICMPv6 field 'Target address' error:" + " {} != {}".format(target_addr, dst_ip)) + print "Target address field: OK." + + +def ipv6_ping(src_if, dst_if, src_mac, dst_mac, + proxy_to_src_mac, proxy_to_dst_mac, src_ip, dst_ip): + """Sends ICMPv6 Echo Request, receive it and send a reply. + + :param src_if: First TG interface on link to DUT. + :param dst_if: Second TG interface on link to DUT. + :param src_mac: MAC address of first interface. + :param dst_mac: MAC address of second interface. + :param proxy_to_src_mac: MAC address of first proxy interface on DUT. + :param proxy_to_dst_mac: MAC address of second proxy interface on DUT. + :param src_ip: IP address of first interface. + :param dst_ip: IP address of second interface. + :type src_if: str + :type dst_if: str + :type src_mac: str + :type dst_mac: str + :type proxy_to_src_mac: str + :type proxy_to_dst_mac: str + :type src_ip: str + :type dst_ip: str + :raises RuntimeError: If a received packet is not correct. + """ + rxq = RxQueue(dst_if) + txq = TxQueue(src_if) + + icmpv6_ping_pkt = \ + Ether(src=src_mac, dst=proxy_to_src_mac) / \ + IPv6(src=src_ip, dst=dst_ip) / \ + ICMPv6EchoRequest() + + txq.send(icmpv6_ping_pkt) + + ether = None + for _ in range(5): + pkt = rxq.recv(3) + if pkt is not None: + ether = pkt + break + + if ether is None: + raise RuntimeError('ICMPv6 Echo Request timeout.') + try: + ether["IPv6"]["ICMPv6 Echo Request"] + except KeyError: + raise RuntimeError("Received packet is not an ICMPv6 Echo Request.") + print "ICMP Echo: OK." + + rxq = RxQueue(src_if) + txq = TxQueue(dst_if) + + icmpv6_ping_pkt = \ + Ether(src=dst_mac, dst=proxy_to_dst_mac) / \ + IPv6(src=dst_ip, dst=src_ip) / \ + ICMPv6EchoReply() + + txq.send(icmpv6_ping_pkt) + + ether = None + for _ in range(5): + pkt = rxq.recv(3) + if pkt is not None: + ether = pkt + break + + if ether is None: + raise RuntimeError('DHCPv6 SOLICIT timeout') + try: + ether["IPv6"]["ICMPv6 Echo Reply"] + except KeyError: + raise RuntimeError("Received packet is not an ICMPv6 Echo Reply.") + + print "ICMP Reply: OK." + + +def main(): + """Send DHCPv6 proxy messages.""" + + args = TrafficScriptArg(['src_ip', 'dst_ip', 'src_mac', 'dst_mac', + 'proxy_to_src_mac', 'proxy_to_dst_mac']) + + src_if = args.get_arg('tx_if') + dst_if = args.get_arg('rx_if') + src_ip = args.get_arg("src_ip") + dst_ip = args.get_arg("dst_ip") + src_mac = args.get_arg('src_mac') + dst_mac = args.get_arg('dst_mac') + proxy_to_src_mac = args.get_arg('proxy_to_src_mac') + proxy_to_dst_mac = args.get_arg('proxy_to_dst_mac') + + # Neighbor solicitation + imcpv6nd_solicit(src_if, src_mac, proxy_to_src_mac, src_ip, dst_ip) + + # Verify route (ICMP echo/reply) + ipv6_ping(src_if, dst_if, src_mac, dst_mac, + proxy_to_src_mac, proxy_to_dst_mac, src_ip, dst_ip) + +if __name__ == "__main__": + main() |