aboutsummaryrefslogtreecommitdiffstats
path: root/resources/traffic_scripts
diff options
context:
space:
mode:
authorselias <samelias@cisco.com>2017-03-16 15:28:59 +0100
committerTibor Frank <tifrank@cisco.com>2017-03-30 11:13:12 +0000
commitd919da731b92e02ccc9cee207f9d138876b7b08e (patch)
tree90a82ef2a43e0ec91c77d9476a874c9620288c51 /resources/traffic_scripts
parentbd680a762d945b7970f7e62b7ee47776f0301f8c (diff)
CSIT-532 HC Test: IPv6 Neighbor Discovery proxy
Change-Id: I6495726a814e116191f7a183be71c661466e7053 Signed-off-by: selias <samelias@cisco.com>
Diffstat (limited to 'resources/traffic_scripts')
-rwxr-xr-xresources/traffic_scripts/ipv6_nd_proxy_check.py194
1 files changed, 194 insertions, 0 deletions
diff --git a/resources/traffic_scripts/ipv6_nd_proxy_check.py b/resources/traffic_scripts/ipv6_nd_proxy_check.py
new file mode 100755
index 0000000000..b1028028b0
--- /dev/null
+++ b/resources/traffic_scripts/ipv6_nd_proxy_check.py
@@ -0,0 +1,194 @@
+#!/usr/bin/env python
+# Copyright (c) 2016 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Traffic script that sends DHCPv6 proxy packets."""
+
+from scapy.layers.inet import Ether
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply,\
+ ICMPv6ND_NS
+
+from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
+from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
+
+
+def imcpv6nd_solicit(tx_if, src_mac, dst_mac, src_ip, dst_ip):
+ """Send ICMPv6 Neighbor Solicitation packet and expect a response
+ from the proxy.
+
+ :param tx_if: Interface on TG.
+ :param src_mac: MAC address of TG interface.
+ :param dst_mac: MAC address of proxy interface.
+ :param src_ip: IP address of TG interface.
+ :param dst_ip: IP address of proxied interface.
+ :type tx_if: str
+ :type src_mac: str
+ :type dst_mac: str
+ :type src_ip: str
+ :type dst_ip: str
+ :raises RuntimeError: If the received packet is not correct.
+ """
+
+ rxq = RxQueue(tx_if)
+ txq = TxQueue(tx_if)
+
+ sent_packets = []
+
+ icmpv6nd_solicit_pkt =\
+ Ether(src=src_mac, dst=dst_mac) / \
+ IPv6(src=src_ip) / \
+ ICMPv6ND_NS(tgt=dst_ip)
+
+ sent_packets.append(icmpv6nd_solicit_pkt)
+ txq.send(icmpv6nd_solicit_pkt)
+
+ ether = None
+ for _ in range(5):
+ pkt = rxq.recv(3, ignore=sent_packets)
+ if pkt is not None:
+ ether = pkt
+ break
+
+ if ether is None:
+ raise RuntimeError('ICMPv6ND Proxy response timeout.')
+
+ if ether.src != dst_mac:
+ raise RuntimeError("Source MAC address error: {} != {}".format(
+ ether.src, dst_mac))
+ print "Source MAC address: OK."
+
+ if ether.dst != src_mac:
+ raise RuntimeError("Destination MAC address error: {} != {}".format(
+ ether.dst, src_mac))
+ print "Destination MAC address: OK."
+
+ if ether['IPv6'].src != dst_ip:
+ raise RuntimeError("Source IP address error: {} != {}".format(
+ ether['IPv6'].src, dst_ip))
+ print "Source IP address: OK."
+
+ if ether['IPv6'].dst != src_ip:
+ raise RuntimeError("Destination IP address error: {} != {}".format(
+ ether['IPv6'].dst, src_ip))
+ print "Destination IP address: OK."
+
+ try:
+ target_addr = ether['IPv6']\
+ ['ICMPv6 Neighbor Discovery - Neighbor Advertisement'].tgt
+ except (KeyError, AttributeError):
+ raise RuntimeError("Not an ICMPv6ND Neighbor Advertisement packet.")
+
+ if target_addr != dst_ip:
+ raise RuntimeError("ICMPv6 field 'Target address' error:"
+ " {} != {}".format(target_addr, dst_ip))
+ print "Target address field: OK."
+
+
+def ipv6_ping(src_if, dst_if, src_mac, dst_mac,
+ proxy_to_src_mac, proxy_to_dst_mac, src_ip, dst_ip):
+ """Sends ICMPv6 Echo Request, receive it and send a reply.
+
+ :param src_if: First TG interface on link to DUT.
+ :param dst_if: Second TG interface on link to DUT.
+ :param src_mac: MAC address of first interface.
+ :param dst_mac: MAC address of second interface.
+ :param proxy_to_src_mac: MAC address of first proxy interface on DUT.
+ :param proxy_to_dst_mac: MAC address of second proxy interface on DUT.
+ :param src_ip: IP address of first interface.
+ :param dst_ip: IP address of second interface.
+ :type src_if: str
+ :type dst_if: str
+ :type src_mac: str
+ :type dst_mac: str
+ :type proxy_to_src_mac: str
+ :type proxy_to_dst_mac: str
+ :type src_ip: str
+ :type dst_ip: str
+ :raises RuntimeError: If a received packet is not correct.
+ """
+ rxq = RxQueue(dst_if)
+ txq = TxQueue(src_if)
+
+ icmpv6_ping_pkt = \
+ Ether(src=src_mac, dst=proxy_to_src_mac) / \
+ IPv6(src=src_ip, dst=dst_ip) / \
+ ICMPv6EchoRequest()
+
+ txq.send(icmpv6_ping_pkt)
+
+ ether = None
+ for _ in range(5):
+ pkt = rxq.recv(3)
+ if pkt is not None:
+ ether = pkt
+ break
+
+ if ether is None:
+ raise RuntimeError('ICMPv6 Echo Request timeout.')
+ try:
+ ether["IPv6"]["ICMPv6 Echo Request"]
+ except KeyError:
+ raise RuntimeError("Received packet is not an ICMPv6 Echo Request.")
+ print "ICMP Echo: OK."
+
+ rxq = RxQueue(src_if)
+ txq = TxQueue(dst_if)
+
+ icmpv6_ping_pkt = \
+ Ether(src=dst_mac, dst=proxy_to_dst_mac) / \
+ IPv6(src=dst_ip, dst=src_ip) / \
+ ICMPv6EchoReply()
+
+ txq.send(icmpv6_ping_pkt)
+
+ ether = None
+ for _ in range(5):
+ pkt = rxq.recv(3)
+ if pkt is not None:
+ ether = pkt
+ break
+
+ if ether is None:
+ raise RuntimeError('DHCPv6 SOLICIT timeout')
+ try:
+ ether["IPv6"]["ICMPv6 Echo Reply"]
+ except KeyError:
+ raise RuntimeError("Received packet is not an ICMPv6 Echo Reply.")
+
+ print "ICMP Reply: OK."
+
+
+def main():
+ """Send DHCPv6 proxy messages."""
+
+ args = TrafficScriptArg(['src_ip', 'dst_ip', 'src_mac', 'dst_mac',
+ 'proxy_to_src_mac', 'proxy_to_dst_mac'])
+
+ src_if = args.get_arg('tx_if')
+ dst_if = args.get_arg('rx_if')
+ src_ip = args.get_arg("src_ip")
+ dst_ip = args.get_arg("dst_ip")
+ src_mac = args.get_arg('src_mac')
+ dst_mac = args.get_arg('dst_mac')
+ proxy_to_src_mac = args.get_arg('proxy_to_src_mac')
+ proxy_to_dst_mac = args.get_arg('proxy_to_dst_mac')
+
+ # Neighbor solicitation
+ imcpv6nd_solicit(src_if, src_mac, proxy_to_src_mac, src_ip, dst_ip)
+
+ # Verify route (ICMP echo/reply)
+ ipv6_ping(src_if, dst_if, src_mac, dst_mac,
+ proxy_to_src_mac, proxy_to_dst_mac, src_ip, dst_ip)
+
+if __name__ == "__main__":
+ main()