aboutsummaryrefslogtreecommitdiffstats
path: root/resources/traffic_scripts/ipv6_nd_proxy_check.py
diff options
context:
space:
mode:
Diffstat (limited to 'resources/traffic_scripts/ipv6_nd_proxy_check.py')
-rwxr-xr-xresources/traffic_scripts/ipv6_nd_proxy_check.py89
1 files changed, 54 insertions, 35 deletions
diff --git a/resources/traffic_scripts/ipv6_nd_proxy_check.py b/resources/traffic_scripts/ipv6_nd_proxy_check.py
index b1028028b0..c9213999ec 100755
--- a/resources/traffic_scripts/ipv6_nd_proxy_check.py
+++ b/resources/traffic_scripts/ipv6_nd_proxy_check.py
@@ -15,8 +15,9 @@
"""Traffic script that sends DHCPv6 proxy packets."""
from scapy.layers.inet import Ether
-from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply,\
- ICMPv6ND_NS
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS
+from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
+
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
@@ -44,17 +45,23 @@ def imcpv6nd_solicit(tx_if, src_mac, dst_mac, src_ip, dst_ip):
sent_packets = []
- icmpv6nd_solicit_pkt =\
- Ether(src=src_mac, dst=dst_mac) / \
- IPv6(src=src_ip) / \
- ICMPv6ND_NS(tgt=dst_ip)
+ icmpv6nd_solicit_pkt = (Ether(src=src_mac, dst=dst_mac) /
+ IPv6(src=src_ip) /
+ ICMPv6ND_NS(tgt=dst_ip))
sent_packets.append(icmpv6nd_solicit_pkt)
txq.send(icmpv6nd_solicit_pkt)
ether = None
for _ in range(5):
- pkt = rxq.recv(3, ignore=sent_packets)
+ while True:
+ pkt = rxq.recv(3, ignore=sent_packets)
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue in case of ICMPv6ND_NS packet
+ continue
+ else:
+ # otherwise process the current packet
+ break
if pkt is not None:
ether = pkt
break
@@ -63,34 +70,33 @@ def imcpv6nd_solicit(tx_if, src_mac, dst_mac, src_ip, dst_ip):
raise RuntimeError('ICMPv6ND Proxy response timeout.')
if ether.src != dst_mac:
- raise RuntimeError("Source MAC address error: {} != {}".format(
- ether.src, dst_mac))
+ raise RuntimeError("Source MAC address error: {} != {}".
+ format(ether.src, dst_mac))
print "Source MAC address: OK."
if ether.dst != src_mac:
- raise RuntimeError("Destination MAC address error: {} != {}".format(
- ether.dst, src_mac))
+ raise RuntimeError("Destination MAC address error: {} != {}".
+ format(ether.dst, src_mac))
print "Destination MAC address: OK."
- if ether['IPv6'].src != dst_ip:
- raise RuntimeError("Source IP address error: {} != {}".format(
- ether['IPv6'].src, dst_ip))
+ if ether[IPv6].src != dst_ip:
+ raise RuntimeError("Source IP address error: {} != {}".
+ format(ether[IPv6].src, dst_ip))
print "Source IP address: OK."
- if ether['IPv6'].dst != src_ip:
- raise RuntimeError("Destination IP address error: {} != {}".format(
- ether['IPv6'].dst, src_ip))
+ if ether[IPv6].dst != src_ip:
+ raise RuntimeError("Destination IP address error: {} != {}".
+ format(ether[IPv6].dst, src_ip))
print "Destination IP address: OK."
try:
- target_addr = ether['IPv6']\
- ['ICMPv6 Neighbor Discovery - Neighbor Advertisement'].tgt
+ target_addr = ether[IPv6][ICMPv6ND_NA].tgt
except (KeyError, AttributeError):
raise RuntimeError("Not an ICMPv6ND Neighbor Advertisement packet.")
if target_addr != dst_ip:
- raise RuntimeError("ICMPv6 field 'Target address' error:"
- " {} != {}".format(target_addr, dst_ip))
+ raise RuntimeError("ICMPv6 field 'Target address' error: {} != {}".
+ format(target_addr, dst_ip))
print "Target address field: OK."
@@ -119,16 +125,22 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac,
rxq = RxQueue(dst_if)
txq = TxQueue(src_if)
- icmpv6_ping_pkt = \
- Ether(src=src_mac, dst=proxy_to_src_mac) / \
- IPv6(src=src_ip, dst=dst_ip) / \
- ICMPv6EchoRequest()
+ icmpv6_ping_pkt = (Ether(src=src_mac, dst=proxy_to_src_mac) /
+ IPv6(src=src_ip, dst=dst_ip) /
+ ICMPv6EchoRequest())
txq.send(icmpv6_ping_pkt)
ether = None
for _ in range(5):
- pkt = rxq.recv(3)
+ while True:
+ pkt = rxq.recv(3)
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue in case of ICMPv6ND_NS packet
+ continue
+ else:
+ # otherwise process the current packet
+ break
if pkt is not None:
ether = pkt
break
@@ -136,7 +148,7 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac,
if ether is None:
raise RuntimeError('ICMPv6 Echo Request timeout.')
try:
- ether["IPv6"]["ICMPv6 Echo Request"]
+ ether[IPv6]["ICMPv6 Echo Request"]
except KeyError:
raise RuntimeError("Received packet is not an ICMPv6 Echo Request.")
print "ICMP Echo: OK."
@@ -144,16 +156,22 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac,
rxq = RxQueue(src_if)
txq = TxQueue(dst_if)
- icmpv6_ping_pkt = \
- Ether(src=dst_mac, dst=proxy_to_dst_mac) / \
- IPv6(src=dst_ip, dst=src_ip) / \
- ICMPv6EchoReply()
+ icmpv6_ping_pkt = (Ether(src=dst_mac, dst=proxy_to_dst_mac) /
+ IPv6(src=dst_ip, dst=src_ip) /
+ ICMPv6EchoReply())
txq.send(icmpv6_ping_pkt)
ether = None
for _ in range(5):
- pkt = rxq.recv(3)
+ while True:
+ pkt = rxq.recv(3)
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue in case of ICMPv6ND_NS packet
+ continue
+ else:
+ # otherwise process the current packet
+ break
if pkt is not None:
ether = pkt
break
@@ -161,7 +179,7 @@ def ipv6_ping(src_if, dst_if, src_mac, dst_mac,
if ether is None:
raise RuntimeError('DHCPv6 SOLICIT timeout')
try:
- ether["IPv6"]["ICMPv6 Echo Reply"]
+ ether[IPv6]["ICMPv6 Echo Reply"]
except KeyError:
raise RuntimeError("Received packet is not an ICMPv6 Echo Reply.")
@@ -187,8 +205,9 @@ def main():
imcpv6nd_solicit(src_if, src_mac, proxy_to_src_mac, src_ip, dst_ip)
# Verify route (ICMP echo/reply)
- ipv6_ping(src_if, dst_if, src_mac, dst_mac,
- proxy_to_src_mac, proxy_to_dst_mac, src_ip, dst_ip)
+ ipv6_ping(src_if, dst_if, src_mac, dst_mac, proxy_to_src_mac,
+ proxy_to_dst_mac, src_ip, dst_ip)
+
if __name__ == "__main__":
main()