aboutsummaryrefslogtreecommitdiffstats
path: root/resources/traffic_scripts/send_icmp_check_headers.py
diff options
context:
space:
mode:
Diffstat (limited to 'resources/traffic_scripts/send_icmp_check_headers.py')
-rwxr-xr-xresources/traffic_scripts/send_icmp_check_headers.py45
1 files changed, 26 insertions, 19 deletions
diff --git a/resources/traffic_scripts/send_icmp_check_headers.py b/resources/traffic_scripts/send_icmp_check_headers.py
index c9e3a35df4..2193164b61 100755
--- a/resources/traffic_scripts/send_icmp_check_headers.py
+++ b/resources/traffic_scripts/send_icmp_check_headers.py
@@ -22,8 +22,7 @@ import sys
import ipaddress
from robot.api import logger
from scapy.layers.inet import ICMP, IP
-from scapy.layers.inet6 import ICMPv6EchoRequest
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
from scapy.layers.l2 import Ether, Dot1Q
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
@@ -94,28 +93,36 @@ def main():
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
- if tx_if == rx_if:
- ether = rxq.recv(2, ignore=sent_packets)
- else:
- ether = rxq.recv(2)
- if ether is None:
- raise RuntimeError("ICMP echo Rx timeout")
+ while True:
+ if tx_if == rx_if:
+ ether = rxq.recv(2, ignore=sent_packets)
+ else:
+ ether = rxq.recv(2)
+ if ether is None:
+ raise RuntimeError('ICMP echo Rx timeout')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
logger.trace("MAC matched")
else:
- raise RuntimeError(
- "Matching packet unsuccessful: {0}".format(ether.__repr__()))
+ raise RuntimeError("Matching packet unsuccessful: {0}".
+ format(ether.__repr__()))
if encaps_rx == 'Dot1q':
if ether[Dot1Q].vlan == int(vlan_rx):
logger.trace("VLAN matched")
else:
raise RuntimeError('Ethernet frame with wrong VLAN tag ({}-'
- 'received, {}-expected):\n{}'
- .format(ether[Dot1Q].vlan, vlan_rx,
- ether.__repr__()))
+ 'received, {}-expected):\n{}'.
+ format(ether[Dot1Q].vlan, vlan_rx,
+ ether.__repr__()))
ip = ether[Dot1Q].payload
elif encaps_rx == 'Dot1ad':
raise NotImplementedError()
@@ -123,21 +130,21 @@ def main():
ip = ether.payload
if not isinstance(ip, ip_format):
- raise RuntimeError(
- "Not an IP packet received {0}".format(ip.__repr__()))
+ raise RuntimeError("Not an IP packet received {0}".
+ format(ip.__repr__()))
# Compare data from packets
if src_ip == ip.src:
logger.trace("Src IP matched")
else:
- raise RuntimeError("Matching Src IP unsuccessful: {} != {}"
- .format(src_ip, ip.src))
+ raise RuntimeError("Matching Src IP unsuccessful: {} != {}".
+ format(src_ip, ip.src))
if dst_ip == ip.dst:
logger.trace("Dst IP matched")
else:
- raise RuntimeError("Matching Dst IP unsuccessful: {} != {}"
- .format(dst_ip, ip.dst))
+ raise RuntimeError("Matching Dst IP unsuccessful: {} != {}".
+ format(dst_ip, ip.dst))
sys.exit(0)