aboutsummaryrefslogtreecommitdiffstats
path: root/resources/traffic_scripts
diff options
context:
space:
mode:
Diffstat (limited to 'resources/traffic_scripts')
-rwxr-xr-xresources/traffic_scripts/arp_request.py120
-rwxr-xr-xresources/traffic_scripts/check_ra_packet.py103
-rwxr-xr-xresources/traffic_scripts/dhcp/check_dhcp_discover.py150
-rwxr-xr-xresources/traffic_scripts/dhcp/check_dhcp_request.py218
-rwxr-xr-xresources/traffic_scripts/dhcp/check_dhcp_request_ack.py132
-rwxr-xr-xresources/traffic_scripts/dhcp/send_and_check_proxy_discover.py79
-rwxr-xr-xresources/traffic_scripts/dhcp/send_and_check_proxy_messages.py299
-rwxr-xr-xresources/traffic_scripts/dhcp/send_dhcp_v6_messages.py372
-rwxr-xr-xresources/traffic_scripts/icmpv6_echo.py108
-rwxr-xr-xresources/traffic_scripts/icmpv6_echo_req_resp.py190
-rwxr-xr-xresources/traffic_scripts/ipsec.py158
-rwxr-xr-xresources/traffic_scripts/ipv4_ping_ttl_check.py145
-rwxr-xr-xresources/traffic_scripts/ipv4_sweep_ping.py112
-rwxr-xr-xresources/traffic_scripts/ipv6_nd_proxy_check.py207
-rwxr-xr-xresources/traffic_scripts/ipv6_ns.py112
-rwxr-xr-xresources/traffic_scripts/ipv6_sweep_ping.py120
-rwxr-xr-xresources/traffic_scripts/lisp/lisp_check.py123
-rwxr-xr-xresources/traffic_scripts/lisp/lispgpe_check.py128
-rwxr-xr-xresources/traffic_scripts/policer.py72
-rwxr-xr-xresources/traffic_scripts/send_gre_check_gre_headers.py124
-rwxr-xr-xresources/traffic_scripts/send_gre_check_icmp_headers.py86
-rwxr-xr-xresources/traffic_scripts/send_icmp_check_arp.py135
-rwxr-xr-xresources/traffic_scripts/send_icmp_check_gre_headers.py92
-rwxr-xr-xresources/traffic_scripts/send_icmp_check_headers.py153
-rwxr-xr-xresources/traffic_scripts/send_icmp_check_multipath.py142
-rwxr-xr-xresources/traffic_scripts/send_icmp_type_code.py132
-rwxr-xr-xresources/traffic_scripts/send_icmp_wait_for_reply.py116
-rwxr-xr-xresources/traffic_scripts/send_ip_check_headers.py108
-rwxr-xr-xresources/traffic_scripts/send_ip_icmp.py186
-rwxr-xr-xresources/traffic_scripts/send_ipv4_icmp_check_lw_4o6.py139
-rwxr-xr-xresources/traffic_scripts/send_ipv4_udp_check_lw_4o6.py136
-rwxr-xr-xresources/traffic_scripts/send_ipv4_udp_check_map_t.py132
-rwxr-xr-xresources/traffic_scripts/send_ipv6_udp_check_map_t.py135
-rwxr-xr-xresources/traffic_scripts/send_lw_4o6_check_hairpinning_udp.py145
-rwxr-xr-xresources/traffic_scripts/send_lw_4o6_check_ipv4_udp.py126
-rwxr-xr-xresources/traffic_scripts/send_rs_check_ra.py127
-rwxr-xr-xresources/traffic_scripts/send_tcp_udp.py127
-rwxr-xr-xresources/traffic_scripts/send_vxlan_check_vxlan.py91
-rw-r--r--resources/traffic_scripts/vxlan.py16
39 files changed, 415 insertions, 4981 deletions
diff --git a/resources/traffic_scripts/arp_request.py b/resources/traffic_scripts/arp_request.py
deleted file mode 100755
index 8c5b9c7c47..0000000000
--- a/resources/traffic_scripts/arp_request.py
+++ /dev/null
@@ -1,120 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2016 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Send an ARP request and verify the reply"""
-
-import sys
-
-from scapy.all import Ether, ARP
-
-from resources.libraries.python.PacketVerifier import Interface
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-
-
-def parse_arguments():
- """Parse arguments of the script passed through command line
-
- :return: tuple of parsed arguments
- """
- args = TrafficScriptArg(['src_if', 'src_mac', 'dst_mac',
- 'src_ip', 'dst_ip'])
-
- # check for mandatory parameters
- params = (args.get_arg('tx_if'),
- args.get_arg('src_mac'),
- args.get_arg('dst_mac'),
- args.get_arg('src_ip'),
- args.get_arg('dst_ip'))
- if None in params:
- raise Exception('Missing mandatory parameter(s)!')
-
- return params
-
-
-def arp_request_test():
- """Send ARP request, expect a reply and verify its fields.
-
- returns: test status
- :raises RuntimeError: ARP reply timeout.
- """
- test_passed = False
- (src_if, src_mac, dst_mac, src_ip, dst_ip) = parse_arguments()
-
- interface = Interface(src_if)
-
- # build an ARP request
- arp_request = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') /
- ARP(psrc=src_ip, hwsrc=src_mac, pdst=dst_ip,
- hwdst='ff:ff:ff:ff:ff:ff'))
-
- # send the request
- interface.send_pkt(arp_request)
-
- try:
- # wait for APR reply
- ether = interface.recv_pkt()
-
- if not ether:
- raise RuntimeError("ARP reply timeout")
-
- # verify received packet
-
- if not ether.haslayer(ARP):
- raise RuntimeError('Unexpected packet: does not contain ARP ' +
- 'header "{}"'.format(ether.__repr__()))
-
- arp = ether['ARP']
- arp_reply = 2
-
- if arp.op != arp_reply:
- raise RuntimeError('expected op={}, received {}'.format(arp_reply,
- arp.op))
- if arp.ptype != 0x800:
- raise RuntimeError('expected ptype=0x800, received {}'.
- format(arp.ptype))
- if arp.hwlen != 6:
- raise RuntimeError('expected hwlen=6, received {}'.
- format(arp.hwlen))
- if arp.plen != 4:
- raise RuntimeError('expected plen=4, received {}'.format(arp.plen))
- if arp.hwsrc != dst_mac:
- raise RuntimeError('expected hwsrc={}, received {}'.
- format(dst_mac, arp.hwsrc))
- if arp.psrc != dst_ip:
- raise RuntimeError('expected psrc={}, received {}'.
- format(dst_ip, arp.psrc))
- if arp.hwdst != src_mac:
- raise RuntimeError('expected hwdst={}, received {}'.
- format(src_mac, arp.hwdst))
- if arp.pdst != src_ip:
- raise RuntimeError('expected pdst={}, received {}'.
- format(src_ip, arp.pdst))
- test_passed = True
-
- except RuntimeError as ex:
- print 'Error occurred: {}'.format(ex)
-
- return test_passed
-
-
-def main():
- """Run the test and collect result"""
- if arp_request_test():
- sys.exit(0)
- else:
- sys.exit(1)
-
-if __name__ == '__main__':
- main()
diff --git a/resources/traffic_scripts/check_ra_packet.py b/resources/traffic_scripts/check_ra_packet.py
deleted file mode 100755
index 9717c7db95..0000000000
--- a/resources/traffic_scripts/check_ra_packet.py
+++ /dev/null
@@ -1,103 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2016 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Router advertisement check script."""
-
-import sys
-import ipaddress
-
-from scapy.layers.inet6 import IPv6, ICMPv6ND_RA, ICMPv6ND_NS
-
-from resources.libraries.python.PacketVerifier import RxQueue
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-
-
-def mac_to_ipv6_linklocal(mac):
- """Transfer MAC address into specific link-local IPv6 address.
-
- :param mac: MAC address to be transferred.
- :type mac: str
- :return: IPv6 link-local address.
- :rtype: str
- """
- # Remove the most common delimiters: dots, dashes, etc.
- mac_value = int(mac.translate(None, ' .:-'), 16)
-
- # Split out the bytes that slot into the IPv6 address
- # XOR the most significant byte with 0x02, inverting the
- # Universal / Local bit
- high2 = mac_value >> 32 & 0xffff ^ 0x0200
- high1 = mac_value >> 24 & 0xff
- low1 = mac_value >> 16 & 0xff
- low2 = mac_value & 0xffff
-
- return 'fe80::{:04x}:{:02x}ff:fe{:02x}:{:04x}'.format(
- high2, high1, low1, low2)
-
-
-def main():
- """Check packets on specific port and look for the Router Advertisement
- part.
- """
-
- args = TrafficScriptArg(['src_mac', 'interval'])
-
- rx_if = args.get_arg('rx_if')
- src_mac = args.get_arg('src_mac')
- interval = int(args.get_arg('interval'))
- rxq = RxQueue(rx_if)
-
- # receive ICMPv6ND_RA packet
- while True:
- ether = rxq.recv(max(5, interval))
- if ether is None:
- raise RuntimeError('ICMP echo Rx timeout')
-
- if ether.haslayer(ICMPv6ND_NS):
- # read another packet in the queue if the current one is ICMPv6ND_NS
- continue
- else:
- # otherwise process the current packet
- break
-
- # Check if received packet contains layer RA and check other values
- if not ether.haslayer(ICMPv6ND_RA):
- raise RuntimeError('Not an RA packet received {0}'.
- format(ether.__repr__()))
-
- src_address = ipaddress.IPv6Address(unicode(ether['IPv6'].src))
- dst_address = ipaddress.IPv6Address(unicode(ether['IPv6'].dst))
- link_local = ipaddress.IPv6Address(unicode(mac_to_ipv6_linklocal(src_mac)))
- all_nodes_multicast = ipaddress.IPv6Address(u'ff02::1')
-
- if src_address != link_local:
- raise RuntimeError('Source address ({0}) not matching link local '
- 'address ({1})'.format(src_address, link_local))
- if dst_address != all_nodes_multicast:
- raise RuntimeError('Packet destination address ({0}) is not the all'
- ' nodes multicast address ({1}).'.
- format(dst_address, all_nodes_multicast))
- if ether[IPv6].hlim != 255:
- raise RuntimeError('Hop limit not correct: {0}!=255'.
- format(ether[IPv6].hlim))
-
- ra_code = ether[ICMPv6ND_RA].code
- if ra_code != 0:
- raise RuntimeError('ICMP code: {0} not correct. '.format(ra_code))
-
- sys.exit(0)
-
-
-if __name__ == "__main__":
- main()
diff --git a/resources/traffic_scripts/dhcp/check_dhcp_discover.py b/resources/traffic_scripts/dhcp/check_dhcp_discover.py
deleted file mode 100755
index 2fdc5b7fbf..0000000000
--- a/resources/traffic_scripts/dhcp/check_dhcp_discover.py
+++ /dev/null
@@ -1,150 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2016 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Traffic script that receives an DHCP packet on given interface and check if
-is correct DHCP DISCOVER message.
-"""
-
-import sys
-
-from scapy.layers.inet import UDP_SERVICES
-
-from resources.libraries.python.PacketVerifier import RxQueue
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-
-
-def main():
- args = TrafficScriptArg(['rx_src_mac'], ['hostname'])
-
- rx_if = args.get_arg('rx_if')
- rx_src_mac = args.get_arg('rx_src_mac')
- hostname = args.get_arg('hostname')
-
- rx_dst_mac = 'ff:ff:ff:ff:ff:ff'
- rx_src_ip = '0.0.0.0'
- rx_dst_ip = '255.255.255.255'
- boot_request = 1
- dhcp_magic = 'c\x82Sc'
-
- rxq = RxQueue(rx_if)
-
- ether = rxq.recv(10)
-
- if ether is None:
- raise RuntimeError("DHCP DISCOVER Rx timeout.")
-
- if ether.dst != rx_dst_mac:
- raise RuntimeError("Destination MAC address error.")
- print "Destination MAC address: OK."
-
- if ether.src != rx_src_mac:
- raise RuntimeError("Source MAC address error.")
- print "Source MAC address: OK."
-
- if ether['IP'].dst != rx_dst_ip:
- raise RuntimeError("Destination IP address error.")
- print "Destination IP address: OK."
-
- if ether['IP'].src != rx_src_ip:
- raise RuntimeError("Source IP address error.")
- print "Source IP address: OK."
-
- if ether['IP']['UDP'].dport != UDP_SERVICES.bootps:
- raise RuntimeError("UDP destination port error.")
- print "UDP destination port: OK."
-
- if ether['IP']['UDP'].sport != UDP_SERVICES.bootpc:
- raise RuntimeError("UDP source port error.")
- print "UDP source port: OK."
-
- bootp = ether['BOOTP']
-
- if bootp.op != boot_request:
- raise RuntimeError("BOOTP message type error.")
- print "BOOTP message type: OK"
-
- if bootp.ciaddr != '0.0.0.0':
- raise RuntimeError("BOOTP client IP address error.")
- print "BOOTP client IP address: OK"
-
- if bootp.yiaddr != '0.0.0.0':
- raise RuntimeError("BOOTP 'your' (client) IP address error.")
- print "BOOTP 'your' (client) IP address: OK"
-
- if bootp.siaddr != '0.0.0.0':
- raise RuntimeError("BOOTP next server IP address error.")
- print "BOOTP next server IP address: OK"
-
- if bootp.giaddr != '0.0.0.0':
- raise RuntimeError("BOOTP relay agent IP address error.")
- print "BOOTP relay agent IP address: OK"
-
- chaddr = bootp.chaddr[:bootp.hlen].encode('hex')
- if chaddr != rx_src_mac.replace(':', ''):
- raise RuntimeError("BOOTP client hardware address error.")
- print "BOOTP client hardware address: OK"
-
- # Check hostname
- if bootp.sname != 64*'\x00':
- raise RuntimeError("BOOTP server name error.")
- print "BOOTP server name: OK"
-
- # Check boot file
- if bootp.file != 128*'\x00':
- raise RuntimeError("BOOTP boot file name error.")
- print "BOOTP boot file name: OK"
-
- # Check bootp magic
- if bootp.options != dhcp_magic:
- raise RuntimeError("DHCP magic error.")
- print "DHCP magic: OK"
-
- # Check options
- dhcp_options = ether['DHCP options'].options
-
- # Option 12
- hn = filter(lambda x: x[0] == 'hostname', dhcp_options)
- if hostname:
- try:
- if hn[0][1] != hostname:
- raise RuntimeError("Client's hostname doesn't match.")
- except IndexError:
- raise RuntimeError("Option list doesn't contain hostname option.")
- else:
- if len(hn) != 0:
- raise RuntimeError("Option list contains hostname option.")
- print "Option 12 hostname: OK"
-
- # Option 53
- mt = filter(lambda x: x[0] == 'message-type', dhcp_options)[0][1]
- if mt != 1:
- raise RuntimeError("Option 53 message-type error.")
- print "Option 53 message-type: OK"
-
- # Option 55
- prl = filter(lambda x: x[0] == 'param_req_list', dhcp_options)[0][1]
- if prl != '\x01\x1c\x02\x03\x0f\x06w\x0c,/\x1ay*':
- raise RuntimeError("Option 55 param_req_list error.")
- print "Option 55 param_req_list: OK"
-
- # Option 255
- if 'end' not in dhcp_options:
- raise RuntimeError("end option error.")
- print "end option: OK"
-
- sys.exit(0)
-
-
-if __name__ == "__main__":
- main()
diff --git a/resources/traffic_scripts/dhcp/check_dhcp_request.py b/resources/traffic_scripts/dhcp/check_dhcp_request.py
deleted file mode 100755
index 522f2f507c..0000000000
--- a/resources/traffic_scripts/dhcp/check_dhcp_request.py
+++ /dev/null
@@ -1,218 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2016 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Traffic script that sends an DHCP OFFER message and checks if the DHCP
-REQUEST contains all required fields."""
-
-import sys
-
-from scapy.layers.l2 import Ether
-from scapy.layers.inet import IP, UDP, UDP_SERVICES
-from scapy.layers.dhcp import BOOTP, DHCP
-
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-
-
-def is_discover(pkt):
- """If DHCP message type option is set to dhcp discover return True,
- else return False. False is returned also if exception occurs."""
- dhcp_discover = 1
- try:
- dhcp_options = pkt['BOOTP']['DHCP options'].options
- message_type = filter(lambda x: x[0] == 'message-type',
- dhcp_options)
- message_type = message_type[0][1]
- return message_type == dhcp_discover
- except:
- return False
-
-
-def is_request(pkt):
- """If DHCP message type option is DHCP REQUEST return True,
- else return False. False is returned also if exception occurs."""
- dhcp_request = 3
- try:
- dhcp_options = pkt['BOOTP']['DHCP options'].options
- message_type = filter(lambda x: x[0] == 'message-type',
- dhcp_options)
- message_type = message_type[0][1]
- return message_type == dhcp_request
- except:
- return False
-
-
-def main():
- """Main function of the script file."""
- args = TrafficScriptArg(['client_mac', 'server_mac', 'server_ip',
- 'client_ip', 'client_mask'],
- ['hostname', 'offer_xid'])
-
- server_if = args.get_arg('rx_if')
- server_mac = args.get_arg('server_mac')
- server_ip = args.get_arg('server_ip')
-
- client_mac = args.get_arg('client_mac')
- client_ip = args.get_arg('client_ip')
- client_mask = args.get_arg('client_mask')
-
- hostname = args.get_arg('hostname')
- offer_xid = args.get_arg('offer_xid')
-
- rx_src_ip = '0.0.0.0'
- rx_dst_ip = '255.255.255.255'
-
- rxq = RxQueue(server_if)
- txq = TxQueue(server_if)
- sent_packets = []
-
- for _ in range(10):
- dhcp_discover = rxq.recv(10)
- if is_discover(dhcp_discover):
- break
- else:
- raise RuntimeError("DHCP DISCOVER Rx error.")
-
- dhcp_offer = Ether(src=server_mac, dst=dhcp_discover.src)
- dhcp_offer /= IP(src=server_ip, dst="255.255.255.255")
- dhcp_offer /= UDP(sport=67, dport=68)
- dhcp_offer /= BOOTP(op=2,
- # if offer_xid differs from xid value in DHCP DISCOVER
- # the DHCP OFFER has to be discarded
- xid=int(offer_xid) if offer_xid
- else dhcp_discover['BOOTP'].xid,
- yiaddr=client_ip,
- siaddr=server_ip,
- chaddr=dhcp_discover['BOOTP'].chaddr)
- dhcp_offer_options = [("message-type", "offer"), # Option 53
- ("subnet_mask", client_mask), # Option 1
- ("server_id", server_ip), # Option 54, dhcp server
- ("lease_time", 43200), # Option 51
- "end"]
- dhcp_offer /= DHCP(options=dhcp_offer_options)
-
- txq.send(dhcp_offer)
- sent_packets.append(dhcp_offer)
-
- max_other_pkts = 10
- for _ in range(0, max_other_pkts):
- dhcp_request = rxq.recv(5, sent_packets)
- if not dhcp_request:
- raise RuntimeError("DHCP REQUEST Rx timeout.")
- if is_request(dhcp_request):
- break
- else:
- raise RuntimeError("Max RX packet limit reached.")
-
- if offer_xid:
- # if offer_xid differs from xid value in DHCP DISCOVER the DHCP OFFER
- # has to be discarded
- raise RuntimeError("DHCP REQUEST received. DHCP OFFER with wrong XID "
- "has not been discarded.")
-
- # CHECK ETHER, IP, UDP
- if dhcp_request.dst != dhcp_discover.dst:
- raise RuntimeError("Destination MAC error.")
- print "Destination MAC: OK."
-
- if dhcp_request.src != dhcp_discover.src:
- raise RuntimeError("Source MAC error.")
- print "Source MAC: OK."
-
- if dhcp_request['IP'].dst != rx_dst_ip:
- raise RuntimeError("Destination IP error.")
- print "Destination IP: OK."
-
- if dhcp_request['IP'].src != rx_src_ip:
- raise RuntimeError("Source IP error.")
- print "Source IP: OK."
-
- if dhcp_request['IP']['UDP'].dport != UDP_SERVICES.bootps:
- raise RuntimeError("BOOTPs error.")
- print "BOOTPs: OK."
-
- if dhcp_request['IP']['UDP'].sport != UDP_SERVICES.bootpc:
- raise RuntimeError("BOOTPc error.")
- print "BOOTPc: OK."
-
- # CHECK BOOTP
- if dhcp_request['BOOTP'].op != dhcp_discover['BOOTP'].op:
- raise RuntimeError("BOOTP operation error.")
- print "BOOTP operation: OK"
-
- if dhcp_request['BOOTP'].xid != dhcp_discover['BOOTP'].xid:
- raise RuntimeError("BOOTP XID error.")
- print "BOOTP XID: OK"
-
- if dhcp_request['BOOTP'].ciaddr != '0.0.0.0':
- raise RuntimeError("BOOTP ciaddr error.")
- print "BOOTP ciaddr: OK"
-
- ca = dhcp_request['BOOTP'].chaddr[:dhcp_request['BOOTP'].hlen].encode('hex')
- if ca != client_mac.replace(':', ''):
- raise RuntimeError("BOOTP client hardware address error.")
- print "BOOTP client hardware address: OK"
-
- if dhcp_request['BOOTP'].options != dhcp_discover['BOOTP'].options:
- raise RuntimeError("DHCP options error.")
- print "DHCP options: OK"
-
- # CHECK DHCP OPTIONS
- dhcp_options = dhcp_request['DHCP options'].options
-
- hn = filter(lambda x: x[0] == 'hostname', dhcp_options)
- if hostname:
- try:
- if hn[0][1] != hostname:
- raise RuntimeError("Client's hostname doesn't match.")
- except IndexError:
- raise RuntimeError("Option list doesn't contain hostname option.")
- else:
- if len(hn) != 0:
- raise RuntimeError("Option list contains hostname option.")
- print "Option 12 hostname: OK"
-
- # Option 50
- ra = filter(lambda x: x[0] == 'requested_addr', dhcp_options)[0][1]
- if ra != client_ip:
- raise RuntimeError("Option 50 requested_addr error.")
- print "Option 50 requested_addr: OK"
-
- # Option 53
- mt = filter(lambda x: x[0] == 'message-type', dhcp_options)[0][1]
- if mt != 3: # request
- raise RuntimeError("Option 53 message-type error.")
- print "Option 53 message-type: OK"
-
- # Option 54
- sid = filter(lambda x: x[0] == 'server_id', dhcp_options)[0][1]
- if sid != server_ip:
- raise RuntimeError("Option 54 server_id error.")
- print "Option 54 server_id: OK"
-
- # Option 55
- prl = filter(lambda x: x[0] == 'param_req_list', dhcp_options)[0][1]
- if prl != '\x01\x1c\x02\x03\x0f\x06w\x0c,/\x1ay*':
- raise RuntimeError("Option 55 param_req_list error.")
- print "Option 55 param_req_list: OK"
-
- # Option 255
- if 'end' not in dhcp_options:
- raise RuntimeError("end option error.")
- print "end option: OK"
-
- sys.exit(0)
-
-if __name__ == "__main__":
- main()
diff --git a/resources/traffic_scripts/dhcp/check_dhcp_request_ack.py b/resources/traffic_scripts/dhcp/check_dhcp_request_ack.py
deleted file mode 100755
index 8a3839cda7..0000000000
--- a/resources/traffic_scripts/dhcp/check_dhcp_request_ack.py
+++ /dev/null
@@ -1,132 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2016 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Traffic script that sends a DHCP ACK message when DHCP REQUEST message is
-received."""
-
-import sys
-
-from scapy.layers.l2 import Ether
-from scapy.layers.inet import IP, UDP
-from scapy.layers.dhcp import BOOTP, DHCP
-
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-
-
-def is_discover(pkt):
- """If DHCP message type option is set to dhcp discover return True,
- else return False. False is returned also if exception occurs."""
- dhcp_discover = 1
- try:
- dhcp_options = pkt['BOOTP']['DHCP options'].options
- message_type = filter(lambda x: x[0] == 'message-type',
- dhcp_options)
- message_type = message_type[0][1]
- return message_type == dhcp_discover
- except:
- return False
-
-
-def is_request(pkt):
- """If DHCP message type option is DHCP REQUEST return True,
- else return False. False is returned also if exception occurs."""
- dhcp_request = 3
- try:
- dhcp_options = pkt['BOOTP']['DHCP options'].options
- message_type = filter(lambda x: x[0] == 'message-type',
- dhcp_options)
- message_type = message_type[0][1]
- return message_type == dhcp_request
- except:
- return False
-
-
-def main():
- """Main function of the script file."""
- args = TrafficScriptArg(['server_mac', 'server_ip', 'client_ip',
- 'client_mask', 'lease_time'])
-
- server_if = args.get_arg('rx_if')
- server_mac = args.get_arg('server_mac')
- server_ip = args.get_arg('server_ip')
-
- client_ip = args.get_arg('client_ip')
- client_mask = args.get_arg('client_mask')
-
- lease_time = int(args.get_arg('lease_time'))
-
- rxq = RxQueue(server_if)
- txq = TxQueue(server_if)
- sent_packets = []
-
- for _ in range(10):
- dhcp_discover = rxq.recv(10)
- if is_discover(dhcp_discover):
- break
- else:
- raise RuntimeError("DHCP DISCOVER Rx error.")
-
- dhcp_offer = Ether(src=server_mac, dst=dhcp_discover.src)
- dhcp_offer /= IP(src=server_ip, dst="255.255.255.255")
- dhcp_offer /= UDP(sport=67, dport=68)
- dhcp_offer /= BOOTP(op=2,
- xid=dhcp_discover['BOOTP'].xid,
- yiaddr=client_ip,
- siaddr=server_ip,
- chaddr=dhcp_discover['BOOTP'].chaddr)
- dhcp_offer_options = [("message-type", "offer"), # Option 53
- ("subnet_mask", client_mask), # Option 1
- ("server_id", server_ip), # Option 54, dhcp server
- ("lease_time", lease_time), # Option 51
- "end"]
- dhcp_offer /= DHCP(options=dhcp_offer_options)
-
- txq.send(dhcp_offer)
- sent_packets.append(dhcp_offer)
-
- max_other_pkts = 10
- for _ in range(0, max_other_pkts):
- dhcp_request = rxq.recv(5, sent_packets)
- if not dhcp_request:
- raise RuntimeError("DHCP REQUEST Rx timeout.")
- if is_request(dhcp_request):
- break
- else:
- raise RuntimeError("Max RX packet limit reached.")
-
- # Send dhcp ack
- dhcp_ack = Ether(src=server_mac, dst=dhcp_request.src)
- dhcp_ack /= IP(src=server_ip, dst="255.255.255.255")
- dhcp_ack /= UDP(sport=67, dport=68)
- dhcp_ack /= BOOTP(op=2,
- xid=dhcp_request['BOOTP'].xid,
- yiaddr=client_ip,
- siaddr=server_ip,
- flags=dhcp_request['BOOTP'].flags,
- chaddr=dhcp_request['BOOTP'].chaddr)
- dhcp_ack_options = [("message-type", "ack"), # Option 53. 5: ACK, 6: NAK
- ("subnet_mask", client_mask), # Option 1
- ("server_id", server_ip), # Option 54, dhcp server
- ("lease_time", lease_time), # Option 51,
- "end"]
- dhcp_ack /= DHCP(options=dhcp_ack_options)
-
- txq.send(dhcp_ack)
- sent_packets.append(dhcp_ack)
-
- sys.exit(0)
-
-if __name__ == "__main__":
- main()
diff --git a/resources/traffic_scripts/dhcp/send_and_check_proxy_discover.py b/resources/traffic_scripts/dhcp/send_and_check_proxy_discover.py
deleted file mode 100755
index d8089713fd..0000000000
--- a/resources/traffic_scripts/dhcp/send_and_check_proxy_discover.py
+++ /dev/null
@@ -1,79 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2016 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Traffic script that sends DHCP DISCOVER packet
- and check if is received on interface."""
-
-import sys
-
-from scapy.all import Ether
-from scapy.layers.inet import IP, UDP
-from scapy.layers.inet import UDP_SERVICES
-from scapy.layers.dhcp import DHCP, BOOTP
-
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-
-
-def is_discover(pkt):
- """If DHCP message type option is set to dhcp discover return True,
- else return False. False is returned also if exception occurs."""
- dhcp_discover = 1
- try:
- dhcp_options = pkt['BOOTP']['DHCP options'].options
- message_type = filter(lambda x: x[0] == 'message-type',
- dhcp_options)
- message_type = message_type[0][1]
- return message_type == dhcp_discover
- except:
- return False
-
-
-def main():
- """Send DHCP DISCOVER packet."""
-
- args = TrafficScriptArg(['tx_src_ip', 'tx_dst_ip'])
-
- tx_if = args.get_arg('tx_if')
- rx_if = args.get_arg('rx_if')
-
- rxq = RxQueue(rx_if)
- txq = TxQueue(tx_if)
-
- tx_src_ip = args.get_arg('tx_src_ip')
- tx_dst_ip = args.get_arg('tx_dst_ip')
-
- sent_packets = []
-
- dhcp_discover = Ether(dst="ff:ff:ff:ff:ff:ff") / \
- IP(src=tx_src_ip, dst=tx_dst_ip) / \
- UDP(sport=UDP_SERVICES.bootpc, dport=UDP_SERVICES.bootps) / \
- BOOTP(op=1,) / \
- DHCP(options=[("message-type", "discover"),
- "end"])
-
- sent_packets.append(dhcp_discover)
- txq.send(dhcp_discover)
-
- for _ in range(10):
- dhcp_discover = rxq.recv(2)
- if is_discover(dhcp_discover):
- break
- else:
- raise RuntimeError("DHCP DISCOVER Rx timeout")
-
- sys.exit(0)
-
-if __name__ == "__main__":
- main()
diff --git a/resources/traffic_scripts/dhcp/send_and_check_proxy_messages.py b/resources/traffic_scripts/dhcp/send_and_check_proxy_messages.py
deleted file mode 100755
index 27f148c900..0000000000
--- a/resources/traffic_scripts/dhcp/send_and_check_proxy_messages.py
+++ /dev/null
@@ -1,299 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2016 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Traffic script that sends DHCP packets."""
-
-import sys
-
-from scapy.all import Ether
-from scapy.layers.inet import IP, UDP
-from scapy.layers.inet import UDP_SERVICES
-from scapy.layers.dhcp import DHCP, BOOTP
-
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-
-
-def dhcp_discover(tx_if, rx_if, tx_src_ip, tx_dst_ip, server_ip, proxy_ip,
- client_mac):
- """Send and check DHCP DISCOVER proxy packet."""
-
- rxq = RxQueue(rx_if)
- txq = TxQueue(tx_if)
-
- sent_packets = []
-
- dhcp_discover = Ether(src=client_mac, dst="ff:ff:ff:ff:ff:ff") / \
- IP(src=tx_src_ip, dst=tx_dst_ip) / \
- UDP(sport=UDP_SERVICES.bootpc, dport=UDP_SERVICES.bootps) / \
- BOOTP(op=1,) / \
- DHCP(options=[("message-type", "discover"),
- "end"])
-
- sent_packets.append(dhcp_discover)
- txq.send(dhcp_discover)
-
- ether = rxq.recv(2)
-
- if ether is None:
- raise RuntimeError('DHCP DISCOVER timeout')
-
- if ether[IP].src != proxy_ip:
- raise RuntimeError("Source IP address error.")
- print "Source IP address: OK."
-
- if ether[IP].dst != server_ip:
- raise RuntimeError("Destination IP address error.")
- print "Destination IP address: OK."
-
- if ether[UDP].dport != UDP_SERVICES.bootps:
- raise RuntimeError("UDP destination port error.")
- print "UDP destination port: OK."
-
- if ether[UDP].sport != UDP_SERVICES.bootpc:
- raise RuntimeError("UDP source port error.")
- print "UDP source port: OK."
-
- if ether[DHCP].options[1][0] != 'relay_agent_Information': # option 82
- raise RuntimeError("Relay agent information error.")
- option_82 = ether[DHCP].options[1][1]
-
- if ether[DHCP].options[0][1] != 1: # 1 - DISCOVER message
- raise RuntimeError("DHCP DISCOVER message error.")
- print "DHCP DISCOVER message OK."
-
- return option_82
-
-
-def dhcp_offer(rx_if, tx_if, tx_dst_ip, server_ip, proxy_ip, client_ip,
- server_mac, option_82):
- """Send and check DHCP OFFER proxy packet."""
-
- rxq = RxQueue(rx_if)
- txq = TxQueue(tx_if)
-
- sent_packets = []
-
- dhcp_offer = Ether(src=server_mac, dst="ff:ff:ff:ff:ff:ff") / \
- IP(src=server_ip, dst=tx_dst_ip) / \
- UDP(sport=UDP_SERVICES.bootps, dport=UDP_SERVICES.bootpc) / \
- BOOTP(op=2,
- yiaddr=client_ip,
- siaddr=server_ip) / \
- DHCP(options=
- [("message-type", "offer"),
- ("server_id", server_ip),
- ("relay_agent_Information", option_82),
- "end"])
-
- txq.send(dhcp_offer)
- sent_packets.append(dhcp_offer)
-
- ether = rxq.recv(2)
-
- if ether is None:
- raise RuntimeError('DHCP OFFER timeout')
-
- if ether[IP].dst != tx_dst_ip:
- raise RuntimeError("Destination IP address error.")
- print "Destination IP address: OK."
-
- if ether[IP].src != proxy_ip:
- raise RuntimeError("Source IP address error.")
- print "Source IP address: OK."
-
- if ether[UDP].dport != UDP_SERVICES.bootpc:
- raise RuntimeError("UDP destination port error.")
- print "UDP destination port: OK."
-
- if ether[UDP].sport != UDP_SERVICES.bootps:
- raise RuntimeError("UDP source port error.")
- print "UDP source port: OK."
-
- if ether[BOOTP].yiaddr != client_ip:
- raise RuntimeError("Client IP address error.")
- print "Client IP address: OK."
-
- if ether[BOOTP].siaddr != server_ip:
- raise RuntimeError("DHCP server IP address error.")
- print "DHCP server IP address: OK."
-
- if ether[DHCP].options[0][1] != 2: # 2 - OFFER message
- raise RuntimeError("DHCP OFFER message error.")
- print "DHCP OFFER message OK."
-
-
-def dhcp_request(tx_if, rx_if, tx_src_ip, tx_dst_ip, server_ip, proxy_ip,
- client_ip, client_mac):
- """Send and check DHCP REQUEST proxy packet."""
-
- rxq = RxQueue(rx_if)
- txq = TxQueue(tx_if)
-
- sent_packets = []
-
- dhcp_request = Ether(src=client_mac, dst="ff:ff:ff:ff:ff:ff") / \
- IP(src=tx_src_ip, dst=tx_dst_ip) / \
- UDP(sport=UDP_SERVICES.bootpc, dport=UDP_SERVICES.bootps) / \
- BOOTP(op=1,
- giaddr=proxy_ip,
- siaddr=server_ip) / \
- DHCP(options=[("message-type", "request"),
- ("server_id", server_ip),
- ("requested_addr", client_ip),
- "end"])
-
- sent_packets.append(dhcp_request)
- txq.send(dhcp_request)
-
- ether = rxq.recv(2)
-
- if ether is None:
- raise RuntimeError('DHCP REQUEST timeout')
-
- if ether[IP].dst != server_ip:
- raise RuntimeError("Destination IP address error.")
- print "Destination IP address: OK."
-
- if ether[IP].src != proxy_ip:
- raise RuntimeError("Source IP address error.")
- print "Source IP address: OK."
-
- if ether[UDP].dport != UDP_SERVICES.bootps:
- raise RuntimeError("UDP destination port error.")
- print "UDP destination port: OK."
-
- if ether[UDP].sport != UDP_SERVICES.bootpc:
- raise RuntimeError("UDP source port error.")
- print "UDP source port: OK."
-
- if ether[BOOTP].siaddr != server_ip:
- raise RuntimeError("DHCP server IP address error.")
- print "DHCP server IP address: OK."
-
- if ether[DHCP].options[2][1] != client_ip:
- raise RuntimeError("Requested IP address error.")
- print "Requested IP address: OK."
-
- if ether[DHCP].options[3][0] != 'relay_agent_Information': # option 82
- raise RuntimeError("Relay agent information error.")
-
- if ether[DHCP].options[0][1] != 3: # 2 - REQUEST message
- raise RuntimeError("DHCP REQUEST message error.")
- print "DHCP REQUEST message: OK."
-
-
-def dhcp_ack(rx_if, tx_if, tx_dst_ip, server_ip, proxy_ip, client_ip,
- server_mac, option_82):
- """Send and check DHCP ACK proxy packet."""
-
- rxq = RxQueue(rx_if)
- txq = TxQueue(tx_if)
-
- lease_time = 43200 # 12 hours
-
- sent_packets = []
-
- dhcp_ack = Ether(src=server_mac, dst="ff:ff:ff:ff:ff:ff") / \
- IP(src=server_ip, dst=tx_dst_ip) / \
- UDP(sport=UDP_SERVICES.bootps, dport=UDP_SERVICES.bootpc) / \
- BOOTP(op=2,
- yiaddr=client_ip,
- siaddr=server_ip) / \
- DHCP(options=
- [("message-type", "ack"),
- ("server_id", server_ip),
- ("lease_time", lease_time),
- ("relay_agent_Information", option_82),
- "end"])
-
- txq.send(dhcp_ack)
- sent_packets.append(dhcp_ack)
-
- ether = rxq.recv(2)
-
- if ether is None:
- raise RuntimeError('DHCP ACK timeout')
-
- if ether[IP].dst != tx_dst_ip:
- raise RuntimeError("Destination IP address error.")
- print "Destination IP address: OK."
-
- if ether[IP].src != proxy_ip:
- raise RuntimeError("Source IP address error.")
- print "Source IP address: OK."
-
- if ether[UDP].dport != UDP_SERVICES.bootpc:
- raise RuntimeError("UDP destination port error.")
- print "UDP destination port: OK."
-
- if ether[UDP].sport != UDP_SERVICES.bootps:
- raise RuntimeError("UDP source port error.")
- print "UDP source port: OK."
-
- if ether[BOOTP].yiaddr != client_ip:
- raise RuntimeError("Client IP address error.")
- print "Client IP address: OK."
-
- if ether[BOOTP].siaddr != server_ip:
- raise RuntimeError("DHCP server IP address error.")
- print "DHCP server IP address: OK."
-
- if ether[DHCP].options[2][1] != lease_time:
- raise RuntimeError("DHCP lease time error.")
- print "DHCP lease time OK."
-
- if ether[DHCP].options[0][1] != 5: # 5 - ACK message
- raise RuntimeError("DHCP ACK message error.")
- print "DHCP ACK message OK."
-
-
-def main():
- """Send DHCP proxy messages."""
-
- args = TrafficScriptArg(['server_ip', 'server_mac', 'client_ip',
- 'client_mac', 'proxy_ip'])
-
- tx_if = args.get_arg('tx_if')
- rx_if = args.get_arg('rx_if')
-
- tx_src_ip = "0.0.0.0"
- tx_dst_ip = "255.255.255.255"
-
- server_ip = args.get_arg('server_ip')
- client_ip = args.get_arg('client_ip')
- proxy_ip = args.get_arg('proxy_ip')
- client_mac = args.get_arg('client_mac')
- server_mac = args.get_arg('server_mac')
-
- # DHCP DISCOVER
- option_82 = dhcp_discover(tx_if, rx_if, tx_src_ip, tx_dst_ip, server_ip,
- proxy_ip, client_mac)
-
- # DHCP OFFER
- dhcp_offer(tx_if, rx_if, tx_dst_ip, server_ip, proxy_ip, client_ip,
- server_mac, option_82)
-
- # DHCP REQUEST
- dhcp_request(tx_if, rx_if, tx_src_ip, tx_dst_ip, server_ip, proxy_ip,
- client_ip, client_mac)
-
- # DHCP ACK
- dhcp_ack(tx_if, rx_if, tx_dst_ip, server_ip, proxy_ip, client_ip,
- server_mac, option_82)
-
- sys.exit(0)
-
-if __name__ == "__main__":
- main()
diff --git a/resources/traffic_scripts/dhcp/send_dhcp_v6_messages.py b/resources/traffic_scripts/dhcp/send_dhcp_v6_messages.py
deleted file mode 100755
index 09742565ee..0000000000
--- a/resources/traffic_scripts/dhcp/send_dhcp_v6_messages.py
+++ /dev/null
@@ -1,372 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2016 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Traffic script that sends DHCPv6 proxy packets."""
-
-from scapy.layers.dhcp6 import *
-from scapy.layers.inet6 import IPv6, UDP, UDP_SERVICES
-
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-
-
-def _check_udp_checksum(pkt):
- """Check udp checksum in ip packet.
- Return true if checksum is correct."""
- new = pkt.__class__(str(pkt))
- del new['UDP'].chksum
- new = new.__class__(str(new))
- return new['UDP'].chksum == pkt['UDP'].chksum
-
-
-def _get_dhcpv6_msgtype(msg_index):
- """Return DHCPv6 message type string.
-
- :param msg_index: Index of message type.
- :return: Message type.
- :type msg_index: int
- :rtype msg_str: str
- """
- dhcp6_messages = {
- 1: "SOLICIT",
- 2: "ADVERTISE",
- 3: "REQUEST",
- 4: "CONFIRM",
- 5: "RENEW",
- 6: "REBIND",
- 7: "REPLY",
- 8: "RELEASE",
- 9: "DECLINE",
- 10: "RECONFIGURE",
- 11: "INFORMATION-REQUEST",
- 12: "RELAY-FORW",
- 13: "RELAY-REPL"
- }
- return dhcp6_messages[msg_index]
-
-
-def dhcpv6_solicit(tx_if, rx_if, dhcp_multicast_ip, link_local_ip, proxy_ip,
- server_ip, server_mac, client_duid, client_mac):
- """Send and check DHCPv6 SOLICIT proxy packet.
-
- :param tx_if: Client interface.
- :param rx_if: DHCPv6 server interface.
- :param dhcp_multicast_ip: Servers and relay agents multicast address.
- :param link_local_ip: Client link-local address.
- :param proxy_ip: IP address of DHCPv6 proxy server.
- :param server_ip: IP address of DHCPv6 server.
- :param server_mac: MAC address of DHCPv6 server.
- :param client_duid: Client DHCP Unique Identifier.
- :param client_mac: Client MAC address.
- :type tx_if: str
- :type rx_if: str
- :type dhcp_multicast_ip: str
- :type link_local_ip: str
- :type proxy_ip: str
- :type server_ip: str
- :type server_mac: str
- :type client_duid: str
- :type client_mac: str
- :return interface_id: ID of proxy interface.
- :rtype interface_id: str
- """
-
- rxq = RxQueue(rx_if)
- txq = TxQueue(tx_if)
-
- sent_packets = []
-
- dhcp6_solicit_pkt = Ether(src=client_mac, dst="33:33:00:01:00:02") / \
- IPv6(src=link_local_ip, dst=dhcp_multicast_ip) / \
- UDP(sport=UDP_SERVICES.dhcpv6_client,
- dport=UDP_SERVICES.dhcpv6_server) / \
- DHCP6_Solicit() / \
- DHCP6OptClientId(duid=client_duid)
-
- sent_packets.append(dhcp6_solicit_pkt)
- txq.send(dhcp6_solicit_pkt)
-
- ether = rxq.recv(2)
-
- if ether is None:
- raise RuntimeError('DHCPv6 SOLICIT timeout')
-
- if ether.dst != server_mac:
- raise RuntimeError("Destination MAC address error: {} != {}".format(
- ether.dst, server_mac))
- print "Destination MAC address: OK."
-
- if ether['IPv6'].src != proxy_ip:
- raise RuntimeError("Source IP address error: {} != {}".format(
- ether['IPv6'].src, proxy_ip))
- print "Source IP address: OK."
-
- if ether['IPv6'].dst != server_ip:
- raise RuntimeError("Destination IP address error: {} != {}".format(
- ether['IPv6'].dst, server_ip))
- print "Destination IP address: OK."
-
- msgtype = _get_dhcpv6_msgtype(ether['IPv6']['UDP']
- ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)'].msgtype)
- if msgtype != 'RELAY-FORW':
- raise RuntimeError("Message type error: {} != RELAY-FORW".format(
- msgtype))
- print "Message type: OK."
-
- linkaddr = ether['IPv6']['UDP']\
- ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)'].linkaddr
- if linkaddr != proxy_ip:
- raise RuntimeError("Proxy IP address error: {} != {}".format(
- linkaddr, proxy_ip))
- print "Proxy IP address: OK."
-
- try:
- interface_id = ether['IPv6']['UDP']\
- ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)']\
- ['Unknown DHCPv6 OPtion']['DHCP6 Interface-Id Option'].ifaceid
- except Exception:
- raise RuntimeError("DHCP6 Interface-Id error!")
-
- return interface_id
-
-
-def dhcpv6_advertise(rx_if, tx_if, link_local_ip, proxy_ip,
- server_ip, server_mac, proxy_to_server_mac, interface_id):
- """Send and check DHCPv6 ADVERTISE proxy packet.
-
- :param rx_if: DHCPv6 server interface.
- :param tx_if: Client interface.
- :param link_local_ip: Client link-local address.
- :param proxy_ip: IP address of DHCPv6 proxy server.
- :param server_ip: IP address of DHCPv6 server.
- :param server_mac: MAC address of DHCPv6 server.
- :param proxy_to_server_mac: MAC address of DHCPv6 proxy interface.
- :param interface_id: ID of proxy interface.
- :type rx_if: str
- :type tx_if: str
- :type link_local_ip: str
- :type proxy_ip: str
- :type server_ip: str
- :type server_mac: str
- :type proxy_to_server_mac: str
- :type interface_id: str
- """
-
- rxq = RxQueue(rx_if)
- txq = TxQueue(tx_if)
-
- sent_packets = []
-
- dhcp6_advertise_pkt = Ether(src=server_mac, dst=proxy_to_server_mac) / \
- IPv6(src=server_ip, dst=proxy_ip) / \
- UDP(sport=UDP_SERVICES.dhcpv6_server,
- dport=UDP_SERVICES.dhcpv6_client) / \
- DHCP6_RelayReply(peeraddr=link_local_ip,
- linkaddr=proxy_ip) / \
- DHCP6OptIfaceId(ifaceid=interface_id) / \
- DHCP6OptRelayMsg() / \
- DHCP6_Advertise()
-
- sent_packets.append(dhcp6_advertise_pkt)
- txq.send(dhcp6_advertise_pkt)
-
- ether = rxq.recv(2)
-
- if ether is None:
- raise RuntimeError('DHCPv6 ADVERTISE timeout')
-
- if ether['IPv6'].src != proxy_ip:
- raise RuntimeError("Source IP address error: {} != {}".format(
- ether['IPv6'].src, proxy_ip))
- print "Source IP address: OK."
-
- if not _check_udp_checksum(ether['IPv6']):
- raise RuntimeError("Checksum error!")
- print "Checksum: OK."
-
- msgtype = _get_dhcpv6_msgtype(ether['IPv6']['UDP']
- ['DHCPv6 Advertise Message'].msgtype)
- if msgtype != 'ADVERTISE':
- raise RuntimeError("Message type error: {} != ADVERTISE".format(
- msgtype))
- print "Message type: OK."
-
-
-def dhcpv6_request(tx_if, rx_if, dhcp_multicast_ip, link_local_ip, proxy_ip,
- server_ip, client_duid, client_mac):
- """Send and check DHCPv6 REQUEST proxy packet.
-
- :param tx_if: Client interface.
- :param rx_if: DHCPv6 server interface.
- :param dhcp_multicast_ip: Servers and relay agents multicast address.
- :param link_local_ip: Client link-local address.
- :param proxy_ip: IP address of DHCPv6 proxy server.
- :param server_ip: IP address of DHCPv6 server.
- :param client_duid: Client DHCP Unique Identifier.
- :param client_mac: Client MAC address.
- :type tx_if: str
- :type rx_if: str
- :type dhcp_multicast_ip: str
- :type link_local_ip: str
- :type proxy_ip: str
- :type server_ip: str
- :type client_duid: str
- :type client_mac: str
- """
-
- rxq = RxQueue(rx_if)
- txq = TxQueue(tx_if)
-
- sent_packets = []
-
- dhcp6_request_pkt = Ether(src=client_mac, dst="33:33:00:01:00:02") / \
- IPv6(src=link_local_ip, dst=dhcp_multicast_ip) / \
- UDP(sport=UDP_SERVICES.dhcpv6_client,
- dport=UDP_SERVICES.dhcpv6_server) / \
- DHCP6_Request() / \
- DHCP6OptClientId(duid=client_duid)
-
- sent_packets.append(dhcp6_request_pkt)
- txq.send(dhcp6_request_pkt)
-
- ether = rxq.recv(2)
-
- if ether is None:
- raise RuntimeError('DHCPv6 REQUEST timeout')
-
- if ether['IPv6'].src != proxy_ip:
- raise RuntimeError("Source IP address error: {} != {}".format(
- ether['IPv6'].src, proxy_ip))
- print "Source IP address: OK."
-
- if ether['IPv6'].dst != server_ip:
- raise RuntimeError("Destination IP address error: {} != {}".format(
- ether['IPv6'].dst, server_ip))
- print "Destination IP address: OK."
-
- msgtype = _get_dhcpv6_msgtype(ether['IPv6']['UDP']
- ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)'].msgtype)
- if msgtype != 'RELAY-FORW':
- raise RuntimeError("Message type error: {} != RELAY-FORW".format(
- msgtype))
- print "Message type: OK."
-
- linkaddr = ether['IPv6']['UDP']\
- ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)'].linkaddr
- if linkaddr != proxy_ip:
- raise RuntimeError("Proxy IP address error: {} != {}".format(
- linkaddr, proxy_ip))
- print "Proxy IP address: OK."
-
-
-def dhcpv6_reply(rx_if, tx_if, link_local_ip, proxy_ip, server_ip, server_mac,
- interface_id):
- """Send and check DHCPv6 REPLY proxy packet.
-
- :param rx_if: DHCPv6 server interface.
- :param tx_if: Client interface.
- :param link_local_ip: Client link-local address.
- :param proxy_ip: IP address of DHCPv6 proxy server.
- :param server_ip: IP address of DHCPv6 server.
- :param server_mac: MAC address of DHCPv6 server.
- :param interface_id: ID of proxy interface.
- :type rx_if: str
- :type tx_if: str
- :type link_local_ip: str
- :type proxy_ip: str
- :type server_ip: str
- :type server_mac: str
- :type interface_id: str
- """
-
- rxq = RxQueue(rx_if)
- txq = TxQueue(tx_if)
-
- sent_packets = []
-
- dhcp_reply_pkt = Ether(src=server_mac) / \
- IPv6(src=server_ip, dst=proxy_ip) / \
- UDP(sport=UDP_SERVICES.dhcpv6_server,
- dport=UDP_SERVICES.dhcpv6_client) / \
- DHCP6_RelayReply(peeraddr=link_local_ip,
- linkaddr=proxy_ip) / \
- DHCP6OptIfaceId(ifaceid=interface_id) / \
- DHCP6OptRelayMsg() / \
- DHCP6_Reply()
-
- sent_packets.append(dhcp_reply_pkt)
- txq.send(dhcp_reply_pkt)
-
- ether = rxq.recv(2)
-
- if ether is None:
- raise RuntimeError('DHCPv6 REPLY timeout')
-
- if ether['IPv6'].src != proxy_ip:
- raise RuntimeError("Source IP address error: {} != {}".format(
- ether['IPv6'].src, proxy_ip))
- print "Source IP address: OK."
-
- if not _check_udp_checksum(ether['IPv6']):
- raise RuntimeError("Checksum error!")
- print "Checksum: OK."
-
- msgtype = _get_dhcpv6_msgtype(ether['IPv6']['UDP']
- ['DHCPv6 Reply Message'].msgtype)
- if msgtype != 'REPLY':
- raise RuntimeError("Message type error: {} != REPLY".format(msgtype))
- print "Message type: OK."
-
-
-def main():
- """Send DHCPv6 proxy messages."""
-
- args = TrafficScriptArg(['tx_src_ip', 'tx_dst_ip', 'proxy_ip', 'proxy_mac',
- 'server_ip', 'client_mac', 'server_mac',
- 'proxy_to_server_mac'])
-
- client_if = args.get_arg('tx_if')
- server_if = args.get_arg('rx_if')
- proxy_ip = args.get_arg('proxy_ip')
- proxy_mac = args.get_arg('proxy_mac')
- proxy_to_server_mac = args.get_arg('proxy_to_server_mac')
- server_ip = args.get_arg('server_ip')
- client_mac = args.get_arg('client_mac')
- server_mac = args.get_arg('server_mac')
-
- link_local_ip = "fe80::1"
- dhcp_multicast_ip = "ff02::1:2"
- client_duid = str(random.randint(0, 9999))
-
- # SOLICIT
- interface_id = dhcpv6_solicit(client_if, server_if, dhcp_multicast_ip,
- link_local_ip, proxy_ip, server_ip,
- server_mac, client_duid, client_mac)
-
- # ADVERTISE
- dhcpv6_advertise(client_if, server_if, link_local_ip, proxy_ip,
- server_ip, server_mac, proxy_to_server_mac, interface_id)
-
- # REQUEST
- dhcpv6_request(client_if, server_if, dhcp_multicast_ip, link_local_ip,
- proxy_ip, server_ip, client_duid, client_mac)
-
- # REPLY
- dhcpv6_reply(client_if, server_if, link_local_ip, proxy_ip, server_ip,
- server_mac, interface_id)
-
- sys.exit(0)
-
-if __name__ == "__main__":
- main()
diff --git a/resources/traffic_scripts/icmpv6_echo.py b/resources/traffic_scripts/icmpv6_echo.py
deleted file mode 100755
index a18896b07f..0000000000
--- a/resources/traffic_scripts/icmpv6_echo.py
+++ /dev/null
@@ -1,108 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2016 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Traffic script for ICMPv6 echo test."""
-
-import sys
-import logging
-
-# pylint: disable=no-name-in-module
-# pylint: disable=import-error
-logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
-
-from scapy.all import Ether
-from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS
-from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr
-from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
-
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from resources.libraries.python.PacketVerifier import checksum_equal
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-
-
-def main():
- args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip'])
-
- rxq = RxQueue(args.get_arg('rx_if'))
- txq = TxQueue(args.get_arg('tx_if'))
-
- src_mac = args.get_arg('src_mac')
- dst_mac = args.get_arg('dst_mac')
- src_ip = args.get_arg('src_ip')
- dst_ip = args.get_arg('dst_ip')
- echo_id = 0xa
- echo_seq = 0x1
-
- sent_packets = []
-
- # send ICMPv6 neighbor advertisement message
- pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') /
- IPv6(src=src_ip, dst='ff02::1:ff00:2') /
- ICMPv6ND_NA(tgt=src_ip, R=0) /
- ICMPv6NDOptDstLLAddr(lladdr=src_mac))
- sent_packets.append(pkt_send)
- txq.send(pkt_send)
-
- # send ICMPv6 echo request
- pkt_send = (Ether(src=src_mac, dst=dst_mac) /
- IPv6(src=src_ip, dst=dst_ip) /
- ICMPv6EchoRequest(id=echo_id, seq=echo_seq))
- sent_packets.append(pkt_send)
- txq.send(pkt_send)
-
- # receive ICMPv6 echo reply
- while True:
- ether = rxq.recv(2, sent_packets)
- if ether is None:
- raise RuntimeError('ICMPv6 echo reply Rx timeout')
-
- if ether.haslayer(ICMPv6ND_NS):
- # read another packet in the queue if the current one is ICMPv6ND_NS
- continue
- else:
- # otherwise process the current packet
- break
-
- if not ether.haslayer(IPv6):
- raise RuntimeError('Unexpected packet with no IPv6 received {0}'.
- format(ether.__repr__()))
-
- ipv6 = ether[IPv6]
-
- if not ipv6.haslayer(ICMPv6EchoReply):
- raise RuntimeError('Unexpected packet with no ICMPv6 echo reply '
- 'received {0}'.format(ipv6.__repr__()))
-
- icmpv6 = ipv6[ICMPv6EchoReply]
-
- # check identifier and sequence number
- if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
- raise RuntimeError('Invalid ICMPv6 echo reply received ID {0} seq {1} '
- 'should be ID {2} seq {3}'.
- format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
-
- # verify checksum
- cksum = icmpv6.cksum
- del icmpv6.cksum
- tmp = ICMPv6EchoReply(str(icmpv6))
- if not checksum_equal(tmp.cksum, cksum):
- raise RuntimeError('Invalid checksum {0} should be {1}'.
- format(cksum, tmp.cksum))
-
- sys.exit(0)
-
-
-if __name__ == "__main__":
- main()
diff --git a/resources/traffic_scripts/icmpv6_echo_req_resp.py b/resources/traffic_scripts/icmpv6_echo_req_resp.py
deleted file mode 100755
index 195f666b38..0000000000
--- a/resources/traffic_scripts/icmpv6_echo_req_resp.py
+++ /dev/null
@@ -1,190 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2018 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Send ICMPv6 echo request from one TG port to DUT port or to another TG port
- through DUT node(s) and send reply back. Also verify hop limit processing."""
-
-import sys
-import logging
-
-# pylint: disable=no-name-in-module
-# pylint: disable=import-error
-logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
-
-from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS
-from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr
-from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
-from scapy.all import Ether
-
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from resources.libraries.python.PacketVerifier import checksum_equal
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-
-
-def main():
- args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_nh_mac', 'dst_nh_mac',
- 'src_ip', 'dst_ip', 'h_num'], ['is_dst_tg'])
-
- src_rxq = RxQueue(args.get_arg('tx_if'))
- src_txq = TxQueue(args.get_arg('tx_if'))
-
- src_mac = args.get_arg('src_mac')
- dst_mac = args.get_arg('dst_mac')
- src_nh_mac = args.get_arg('src_nh_mac')
- dst_nh_mac = args.get_arg('dst_nh_mac')
- src_ip = args.get_arg('src_ip')
- dst_ip = args.get_arg('dst_ip')
- hop_num = int(args.get_arg('h_num'))
-
- is_dst_tg = True if args.get_arg('is_dst_tg') in ['True', ''] else False
- dst_rxq = RxQueue(args.get_arg('rx_if')) if is_dst_tg else None
- dst_txq = TxQueue(args.get_arg('rx_if')) if is_dst_tg else None
-
- hop_limit = 64
- echo_id = 0xa
- echo_seq = 0x1
-
- src_sent_packets = []
- dst_sent_packets = []
-
- # send ICMPv6 neighbor advertisement message
- pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') /
- IPv6(src=src_ip, dst='ff02::1:ff00:2') /
- ICMPv6ND_NA(tgt=src_ip, R=0) /
- ICMPv6NDOptDstLLAddr(lladdr=src_mac))
- src_sent_packets.append(pkt_send)
- src_txq.send(pkt_send)
-
- if is_dst_tg:
- # send ICMPv6 neighbor advertisement message
- pkt_send = (Ether(src=dst_mac, dst='ff:ff:ff:ff:ff:ff') /
- IPv6(src=dst_ip, dst='ff02::1:ff00:2') /
- ICMPv6ND_NA(tgt=dst_ip, R=0) /
- ICMPv6NDOptDstLLAddr(lladdr=dst_mac))
- dst_sent_packets.append(pkt_send)
- dst_txq.send(pkt_send)
-
- # send ICMPv6 echo request from first TG interface
- pkt_send = (Ether(src=src_mac, dst=src_nh_mac) /
- IPv6(src=src_ip, dst=dst_ip, hlim=hop_limit) /
- ICMPv6EchoRequest(id=echo_id, seq=echo_seq))
- src_sent_packets.append(pkt_send)
- src_txq.send(pkt_send)
-
- if is_dst_tg:
- # receive ICMPv6 echo request on second TG interface
- while True:
- ether = dst_rxq.recv(2, dst_sent_packets)
- if ether is None:
- raise RuntimeError('ICMPv6 echo reply Rx timeout')
-
- if ether.haslayer(ICMPv6ND_NS):
- # read another packet in the queue if the current one is
- # ICMPv6ND_NS
- continue
- else:
- # otherwise process the current packet
- break
-
- if not ether.haslayer(IPv6):
- raise RuntimeError('Unexpected packet with no IPv6 received: {0}'.
- format(ether.__repr__()))
-
- ipv6 = ether[IPv6]
-
- # verify hop limit processing
- if ipv6.hlim != (hop_limit - hop_num):
- raise RuntimeError('Invalid hop limit {0} should be {1}'.
- format(ipv6.hlim, hop_limit - hop_num))
-
- if not ipv6.haslayer(ICMPv6EchoRequest):
- raise RuntimeError('Unexpected packet with no IPv6 ICMP received '
- '{0}'.format(ipv6.__repr__()))
-
- icmpv6 = ipv6[ICMPv6EchoRequest]
-
- # check identifier and sequence number
- if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
- raise RuntimeError('Invalid ICMPv6 echo reply received ID {0} '
- 'seq {1} should be ID {2} seq {3}'.
- format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
-
- # verify checksum
- cksum = icmpv6.cksum
- del icmpv6.cksum
- tmp = ICMPv6EchoRequest(str(icmpv6))
- if not checksum_equal(tmp.cksum, cksum):
- raise RuntimeError('Invalid checksum {0} should be {1}'.
- format(cksum, tmp.cksum))
-
- # send ICMPv6 echo reply from second TG interface
- pkt_send = (Ether(src=dst_mac, dst=dst_nh_mac) /
- IPv6(src=dst_ip, dst=src_ip, hlim=(ipv6.hlim - 1)) /
- ICMPv6EchoReply(id=echo_id, seq=echo_seq))
- dst_sent_packets.append(pkt_send)
- dst_txq.send(pkt_send)
-
- # receive ICMPv6 echo reply on first TG interface
- while True:
- ether = src_rxq.recv(2, src_sent_packets)
- if ether is None:
- raise RuntimeError('ICMPv6 echo reply Rx timeout')
-
- if ether.haslayer(ICMPv6ND_NS):
- # read another packet in the queue if the current one is ICMPv6ND_NS
- continue
- else:
- # otherwise process the current packet
- break
-
- if not ether.haslayer(IPv6):
- raise RuntimeError('Unexpected packet with no IPv6 layer received {0}'.
- format(ether.__repr__()))
-
- ipv6 = ether[IPv6]
-
- # verify hop limit processing; destination node decrements hlim by one in
- # outgoing ICMPv6 Echo Reply
- directions = 2 if is_dst_tg else 1
- hop_limit_reply = hop_limit - directions * hop_num - 1
- if ipv6.hlim != hop_limit_reply:
- raise RuntimeError('Invalid hop limit {0} should be {1}'.
- format(ipv6.hlim, hop_limit_reply))
-
- if not ipv6.haslayer(ICMPv6EchoReply):
- raise RuntimeError('Unexpected packet with no IPv6 ICMP received {0}'.
- format(ipv6.__repr__()))
-
- icmpv6 = ipv6[ICMPv6EchoReply]
-
- # check identifier and sequence number
- if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
- raise RuntimeError('Invalid ICMPv6 echo reply received ID {0} '
- 'seq {1} should be ID {2} seq {3}'.
- format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
-
- # verify checksum
- cksum = icmpv6.cksum
- del icmpv6.cksum
- tmp = ICMPv6EchoReply(str(icmpv6))
- if not checksum_equal(tmp.cksum, cksum):
- raise RuntimeError('Invalid checksum {0} should be {1}'.
- format(cksum, tmp.cksum))
-
- sys.exit(0)
-
-
-if __name__ == "__main__":
- main()
diff --git a/resources/traffic_scripts/ipsec.py b/resources/traffic_scripts/ipsec.py
index b853b6c09f..320303d1fb 100755
--- a/resources/traffic_scripts/ipsec.py
+++ b/resources/traffic_scripts/ipsec.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
# Copyright (c) 2019 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -18,18 +18,18 @@
import sys
import logging
+from ipaddress import ip_address
# pylint: disable=no-name-in-module
# pylint: disable=import-error
-logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
-
-from scapy.all import Ether
+logging.getLogger(u"scapy.runtime").setLevel(logging.ERROR)
from scapy.layers.inet import IP
from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
from scapy.layers.ipsec import SecurityAssociation, ESP
-from ipaddress import ip_address
+from scapy.layers.l2 import Ether
+from scapy.packet import Raw
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
+from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
def check_ipsec(pkt_recv, ip_layer, dst_tun, src_ip, dst_ip, sa_in):
@@ -50,37 +50,39 @@ def check_ipsec(pkt_recv, ip_layer, dst_tun, src_ip, dst_ip, sa_in):
:raises RuntimeError: If received packet is invalid.
"""
if not pkt_recv.haslayer(ip_layer):
- raise RuntimeError('Not an {ip} packet received: {pkt}'.format(
- ip=ip_layer.name, pkt=pkt_recv.__repr__()))
+ raise RuntimeError(
+ f"Not an {ip_layer.name} packet received: {pkt_recv!r}"
+ )
- if pkt_recv[ip_layer.name].dst != dst_tun:
+ if pkt_recv[ip_layer].dst != dst_tun:
raise RuntimeError(
- 'Received packet has invalid destination address: {rec_ip} '
- 'should be: {exp_ip}'.format(
- rec_ip=pkt_recv[ip_layer.name].dst, exp_ip=dst_tun))
+ f"Received packet has invalid destination address: "
+ f"{pkt_recv[ip_layer].dst} should be: {dst_tun}"
+ )
if not pkt_recv.haslayer(ESP):
- raise RuntimeError(
- 'Not an ESP packet received: {pkt}'.format(pkt=pkt_recv.__repr__()))
+ raise RuntimeError(f"Not an ESP packet received: {pkt_recv!r}")
ip_pkt = pkt_recv[ip_layer]
d_pkt = sa_in.decrypt(ip_pkt)
if d_pkt[ip_layer].dst != dst_ip:
raise RuntimeError(
- 'Decrypted packet has invalid destination address: {rec_ip} '
- 'should be: {exp_ip}'.format(
- rec_ip=d_pkt[ip_layer].dst, exp_ip=dst_ip))
+ f"Decrypted packet has invalid destination address: "
+ f"{d_pkt[ip_layer].dst} should be: {dst_ip}"
+ )
if d_pkt[ip_layer].src != src_ip:
raise RuntimeError(
- 'Decrypted packet has invalid source address: {rec_ip} should be: '
- '{exp_ip}'.format(rec_ip=d_pkt[ip_layer].src, exp_ip=src_ip))
+ f"Decrypted packet has invalid source address: "
+ f"{d_pkt[ip_layer].src} should be: {src_ip}"
+ )
- if ip_layer == IP and d_pkt[ip_layer.name].proto != 61:
+ if ip_layer == IP and d_pkt[ip_layer].proto != 61:
raise RuntimeError(
- 'Decrypted packet has invalid IP protocol: {rec_proto} '
- 'should be: 61'.format(rec_proto=d_pkt[ip_layer.name].proto))
+ f"Decrypted packet has invalid IP protocol: "
+ f"{d_pkt[ip_layer].proto} should be: 61"
+ )
def check_ip(pkt_recv, ip_layer, src_ip, dst_ip):
@@ -97,25 +99,27 @@ def check_ip(pkt_recv, ip_layer, src_ip, dst_ip):
:raises RuntimeError: If received packet is invalid.
"""
if not pkt_recv.haslayer(ip_layer):
- raise RuntimeError('Not an {ip} packet received: {pkt}'.format(
- ip=ip_layer.name, pkt=pkt_recv.__repr__()))
+ raise RuntimeError(
+ f"Not an {ip_layer.name} packet received: {pkt_recv!r}"
+ )
- if pkt_recv[ip_layer.name].dst != dst_ip:
+ if pkt_recv[ip_layer].dst != dst_ip:
raise RuntimeError(
- 'Received packet has invalid destination address: {rec_ip} '
- 'should be: {exp_ip}'.format(
- rec_ip=pkt_recv[ip_layer.name].dst, exp_ip=dst_ip))
+ f"Received packet has invalid destination address: "
+ f"{pkt_recv[ip_layer.name].dst} should be: {dst_ip}"
+ )
- if pkt_recv[ip_layer.name].src != src_ip:
+ if pkt_recv[ip_layer].src != src_ip:
raise RuntimeError(
- 'Received packet has invalid destination address: {rec_ip} '
- 'should be: {exp_ip}'.format(
- rec_ip=pkt_recv[ip_layer.name].dst, exp_ip=src_ip))
+ f"Received packet has invalid destination address: "
+ f"{pkt_recv[ip_layer.name].dst} should be: {src_ip}"
+ )
- if ip_layer == IP and pkt_recv[ip_layer.name].proto != 61:
+ if ip_layer == IP and pkt_recv[ip_layer].proto != 61:
raise RuntimeError(
- 'Received packet has invalid IP protocol: {rec_proto} '
- 'should be: 61'.format(rec_proto=pkt_recv[ip_layer.name].proto))
+ f"Received packet has invalid IP protocol: "
+ f"{pkt_recv[ip_layer].proto} should be: 61"
+ )
# pylint: disable=too-many-locals
@@ -124,36 +128,35 @@ def main():
"""Send and receive IPsec packet."""
args = TrafficScriptArg(
- ['tx_src_mac', 'tx_dst_mac', 'rx_src_mac', 'rx_dst_mac', 'src_ip',
- 'dst_ip','crypto_alg', 'crypto_key', 'integ_alg', 'integ_key',
- 'l_spi', 'r_spi'],
- ['src_tun', 'dst_tun']
+ [
+ u"tx_src_mac", u"tx_dst_mac", u"rx_src_mac", u"rx_dst_mac",
+ u"src_ip", u"dst_ip", u"crypto_alg", u"crypto_key", u"integ_alg",
+ u"integ_key", u"l_spi", u"r_spi"
+ ],
+ [u"src_tun", u"dst_tun"]
)
- tx_txq = TxQueue(args.get_arg('tx_if'))
- tx_rxq = RxQueue(args.get_arg('tx_if'))
- rx_txq = TxQueue(args.get_arg('rx_if'))
- rx_rxq = RxQueue(args.get_arg('rx_if'))
-
- tx_src_mac = args.get_arg('tx_src_mac')
- tx_dst_mac = args.get_arg('tx_dst_mac')
- rx_src_mac = args.get_arg('rx_src_mac')
- rx_dst_mac = args.get_arg('rx_dst_mac')
- src_ip = args.get_arg('src_ip')
- dst_ip = args.get_arg('dst_ip')
- crypto_alg = args.get_arg('crypto_alg')
- crypto_key = args.get_arg('crypto_key')
- integ_alg = args.get_arg('integ_alg')
- integ_key = args.get_arg('integ_key')
- l_spi = int(args.get_arg('l_spi'))
- r_spi = int(args.get_arg('r_spi'))
- src_tun = args.get_arg('src_tun')
- dst_tun = args.get_arg('dst_tun')
-
- if ip_address(unicode(src_ip)).version == 6:
- ip_layer = IPv6
- else:
- ip_layer = IP
+ tx_txq = TxQueue(args.get_arg(u"tx_if"))
+ tx_rxq = RxQueue(args.get_arg(u"tx_if"))
+ rx_txq = TxQueue(args.get_arg(u"rx_if"))
+ rx_rxq = RxQueue(args.get_arg(u"rx_if"))
+
+ tx_src_mac = args.get_arg(u"tx_src_mac")
+ tx_dst_mac = args.get_arg(u"tx_dst_mac")
+ rx_src_mac = args.get_arg(u"rx_src_mac")
+ rx_dst_mac = args.get_arg(u"rx_dst_mac")
+ src_ip = args.get_arg(u"src_ip")
+ dst_ip = args.get_arg(u"dst_ip")
+ crypto_alg = args.get_arg(u"crypto_alg")
+ crypto_key = args.get_arg(u"crypto_key")
+ integ_alg = args.get_arg(u"integ_alg")
+ integ_key = args.get_arg(u"integ_key")
+ l_spi = int(args.get_arg(u"l_spi"))
+ r_spi = int(args.get_arg(u"r_spi"))
+ src_tun = args.get_arg(u"src_tun")
+ dst_tun = args.get_arg(u"dst_tun")
+
+ ip_layer = IP if ip_address(src_ip).version == 4 else IPv6
tunnel_out = ip_layer(src=src_tun, dst=dst_tun) if src_tun and dst_tun \
else None
@@ -163,23 +166,28 @@ def main():
if not (src_tun and dst_tun):
src_tun = src_ip
- sa_in = SecurityAssociation(ESP, spi=r_spi, crypt_algo=crypto_alg,
- crypt_key=crypto_key, auth_algo=integ_alg,
- auth_key=integ_key, tunnel_header=tunnel_in)
+ sa_in = SecurityAssociation(
+ ESP, spi=r_spi, crypt_algo=crypto_alg,
+ crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg,
+ auth_key=integ_key.encode(encoding=u"utf-8"), tunnel_header=tunnel_in
+ )
- sa_out = SecurityAssociation(ESP, spi=l_spi, crypt_algo=crypto_alg,
- crypt_key=crypto_key, auth_algo=integ_alg,
- auth_key=integ_key, tunnel_header=tunnel_out)
+ sa_out = SecurityAssociation(
+ ESP, spi=l_spi, crypt_algo=crypto_alg,
+ crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg,
+ auth_key=integ_key.encode(encoding=u"utf-8"), tunnel_header=tunnel_out
+ )
ip_pkt = ip_layer(src=src_ip, dst=dst_ip, proto=61) if ip_layer == IP \
else ip_layer(src=src_ip, dst=dst_ip)
- ip_pkt = ip_layer(str(ip_pkt))
+ ip_pkt = ip_layer(ip_pkt)
e_pkt = sa_out.encrypt(ip_pkt)
tx_pkt_send = (Ether(src=tx_src_mac, dst=tx_dst_mac) /
e_pkt)
sent_packets = list()
+ tx_pkt_send /= Raw()
sent_packets.append(tx_pkt_send)
tx_txq.send(tx_pkt_send)
@@ -187,8 +195,7 @@ def main():
rx_pkt_recv = rx_rxq.recv(2)
if rx_pkt_recv is None:
- raise RuntimeError(
- '{ip} packet Rx timeout'.format(ip=ip_layer.name))
+ raise RuntimeError(f"{ip_layer.name} packet Rx timeout")
if rx_pkt_recv.haslayer(ICMPv6ND_NS):
# read another packet in the queue if the current one is ICMPv6ND_NS
@@ -204,15 +211,16 @@ def main():
rx_pkt_send = (Ether(src=rx_dst_mac, dst=rx_src_mac) /
rx_ip_pkt)
+ rx_pkt_send /= Raw()
rx_txq.send(rx_pkt_send)
while True:
tx_pkt_recv = tx_rxq.recv(2, sent_packets)
if tx_pkt_recv is None:
- raise RuntimeError('ESP packet Rx timeout')
+ raise RuntimeError(u"ESP packet Rx timeout")
- if rx_pkt_recv.haslayer(ICMPv6ND_NS):
+ if tx_pkt_recv.haslayer(ICMPv6ND_NS):
# read another packet in the queue if the current one is ICMPv6ND_NS
continue
else:
@@ -224,5 +232,5 @@ def main():
sys.exit(0)
-if __name__ == "__main__":
+if __name__ == u"__main__":
main()
diff --git a/resources/traffic_scripts/ipv4_ping_ttl_check.py b/resources/traffic_scripts/ipv4_ping_ttl_check.py
deleted file mode 100755
index 99a7b0a5f0..0000000000
--- a/resources/traffic_scripts/ipv4_ping_ttl_check.py
+++ /dev/null
@@ -1,145 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2016 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from scapy.all import Ether, IP, ICMP
-from resources.libraries.python.PacketVerifier \
- import Interface, create_gratuitous_arp_request, auto_pad, checksum_equal
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-
-
-def check_ttl(ttl_begin, ttl_end, ttl_diff):
- if ttl_begin != ttl_end + ttl_diff:
- raise RuntimeError(
- "TTL changed from {} to {} but decrease by {} expected"
- .format(ttl_begin, ttl_end, ttl_diff))
-
-
-def ckeck_packets_equal(pkt_send, pkt_recv):
- pkt_send_raw = auto_pad(pkt_send)
- pkt_recv_raw = auto_pad(pkt_recv)
- if pkt_send_raw != pkt_recv_raw:
- print "Sent: {}".format(pkt_send_raw.encode('hex'))
- print "Received: {}".format(pkt_recv_raw.encode('hex'))
- print "Sent:"
- pkt_send.show2()
- print "Received:"
- pkt_recv.show2()
- raise RuntimeError("Sent packet doesn't match received packet")
-
-
-def main():
- args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip',
- 'hops', 'first_hop_mac', 'is_dst_tg'])
-
- src_if_name = args.get_arg('tx_if')
- dst_if_name = args.get_arg('rx_if')
- is_dst_tg = True if args.get_arg('is_dst_tg') == 'True' else False
-
- src_mac = args.get_arg('src_mac')
- first_hop_mac = args.get_arg('first_hop_mac')
- dst_mac = args.get_arg('dst_mac')
- src_ip = args.get_arg('src_ip')
- dst_ip = args.get_arg('dst_ip')
- hops = int(args.get_arg('hops'))
-
- if is_dst_tg and (src_if_name == dst_if_name):
- raise RuntimeError(
- "Source interface name equals destination interface name")
-
- src_if = Interface(src_if_name)
- src_if.send_pkt(str(create_gratuitous_arp_request(src_mac, src_ip)))
- if is_dst_tg:
- dst_if = Interface(dst_if_name)
- dst_if.send_pkt(str(create_gratuitous_arp_request(dst_mac, dst_ip)))
-
- pkt_req_send = (Ether(src=src_mac, dst=first_hop_mac) /
- IP(src=src_ip, dst=dst_ip) /
- ICMP())
- src_if.send_pkt(pkt_req_send)
-
- if is_dst_tg:
- pkt_req_recv = dst_if.recv_pkt()
- if pkt_req_recv is None:
- raise RuntimeError('Timeout waiting for packet')
-
- check_ttl(pkt_req_send[IP].ttl, pkt_req_recv[IP].ttl, hops)
- pkt_req_send_mod = pkt_req_send.copy()
- pkt_req_send_mod[IP].ttl = pkt_req_recv[IP].ttl
- del pkt_req_send_mod[IP].chksum # update checksum
- ckeck_packets_equal(pkt_req_send_mod[IP], pkt_req_recv[IP])
-
- pkt_resp_send = (Ether(src=dst_mac, dst=pkt_req_recv.src) /
- IP(src=dst_ip, dst=src_ip) /
- ICMP(type=0)) # echo-reply
- dst_if.send_pkt(pkt_resp_send)
-
- pkt_resp_recv = src_if.recv_pkt()
- if pkt_resp_recv is None:
- raise RuntimeError('Timeout waiting for packet')
-
- if is_dst_tg:
- check_ttl(pkt_resp_send[IP].ttl, pkt_resp_recv[IP].ttl, hops)
- pkt_resp_send_mod = pkt_resp_send.copy()
- pkt_resp_send_mod[IP].ttl = pkt_resp_recv[IP].ttl
- del pkt_resp_send_mod[IP].chksum # update checksum
- ckeck_packets_equal(pkt_resp_send_mod[IP], pkt_resp_recv[IP])
-
- if not pkt_resp_recv.haslayer(IP):
- raise RuntimeError('Received packet does not contain IPv4 header: {}'.
- format(pkt_resp_recv.__repr__()))
-
- if pkt_resp_recv[IP].src != pkt_req_send[IP].dst:
- raise RuntimeError(
- 'Received IPv4 packet contains wrong src IP address, '
- '{} instead of {}'.format(pkt_resp_recv[IP].src,
- pkt_req_send[IP].dst))
-
- if pkt_resp_recv[IP].dst != pkt_req_send[IP].src:
- raise RuntimeError(
- 'Received IPv4 packet contains wrong dst IP address, '
- '{} instead of {}'.format(pkt_resp_recv[IP].dst,
- pkt_req_send[IP].src))
-
- # verify IPv4 checksum
- copy = pkt_resp_recv.copy()
- chksum = copy[IP].chksum
- del copy[IP].chksum
- tmp = IP(str(copy[IP]))
- if not checksum_equal(tmp.chksum, chksum):
- raise RuntimeError('Received IPv4 packet contains invalid checksum, '
- '{} instead of {}'.format(chksum, tmp.chksum))
-
- if not pkt_resp_recv[IP].haslayer(ICMP):
- raise RuntimeError(
- 'Received IPv4 packet does not contain ICMP header: {}'.
- format(pkt_resp_recv[IP].__repr__()))
-
- # verify ICMP checksum
- copy = pkt_resp_recv.copy()
- chksum = copy[IP][ICMP].chksum
- del copy[IP][ICMP].chksum
- tmp = ICMP(str(copy[IP][ICMP]))
- if not checksum_equal(tmp.chksum, chksum):
- raise RuntimeError('Received ICMP packet contains invalid checksum, '
- '{} instead of {}'.format(chksum, tmp.chksum))
-
- pkt_req_send_mod = pkt_req_send.copy()
- pkt_req_send_mod[IP][ICMP].type = pkt_resp_recv[IP][ICMP].type
- del pkt_req_send_mod[IP][ICMP].chksum # update checksum
- ckeck_packets_equal(pkt_req_send_mod[IP][ICMP], pkt_resp_recv[IP][ICMP])
-
-
-if __name__ == "__main__":
- main()
diff --git a/resources/traffic_scripts/ipv4_sweep_ping.py b/resources/traffic_scripts/ipv4_sweep_ping.py
deleted file mode 100755
index e258d45213..0000000000
--- a/resources/traffic_scripts/ipv4_sweep_ping.py
+++ /dev/null
@@ -1,112 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2016 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Traffic script for IPv4 sweep ping."""
-
-import logging
-import os
-import sys
-
-logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue, \
- create_gratuitous_arp_request, checksum_equal
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-from scapy.layers.inet import IP, ICMP
-from scapy.all import Ether, Raw
-
-
-def main():
- # start_size - start size of the ICMPv4 echo data
- # end_size - end size of the ICMPv4 echo data
- # step - increment step
- args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip',
- 'start_size', 'end_size', 'step'])
-
- rxq = RxQueue(args.get_arg('rx_if'))
- txq = TxQueue(args.get_arg('tx_if'))
-
- src_mac = args.get_arg('src_mac')
- dst_mac = args.get_arg('dst_mac')
- src_ip = args.get_arg('src_ip')
- dst_ip = args.get_arg('dst_ip')
- start_size = int(args.get_arg('start_size'))
- end_size = int(args.get_arg('end_size'))
- step = int(args.get_arg('step'))
- echo_id = 0xa
- # generate some random data buffer
- data = bytearray(os.urandom(end_size))
-
- sent_packets = []
- pkt_send = create_gratuitous_arp_request(src_mac, src_ip)
- sent_packets.append(pkt_send)
- txq.send(pkt_send)
-
- # send ICMP echo request with incremented data length and receive ICMP
- # echo reply
- for echo_seq in range(start_size, end_size + 1, step):
- pkt_send = (Ether(src=src_mac, dst=dst_mac) /
- IP(src=src_ip, dst=dst_ip) /
- ICMP(id=echo_id, seq=echo_seq) /
- Raw(load=data[0:echo_seq]))
- sent_packets.append(pkt_send)
- txq.send(pkt_send)
-
- ether = rxq.recv(ignore=sent_packets)
- if ether is None:
- raise RuntimeError(
- 'ICMP echo reply seq {0} Rx timeout'.format(echo_seq))
-
- if not ether.haslayer(IP):
- raise RuntimeError(
- 'Unexpected packet with no IPv4 received {0}'.format(
- ether.__repr__()))
-
- ipv4 = ether['IP']
-
- if not ipv4.haslayer(ICMP):
- raise RuntimeError(
- 'Unexpected packet with no ICMP received {0}'.format(
- ipv4.__repr__()))
-
- icmpv4 = ipv4['ICMP']
-
- if icmpv4.id != echo_id or icmpv4.seq != echo_seq:
- raise RuntimeError(
- 'Invalid ICMP echo reply received ID {0} seq {1} should be '
- 'ID {2} seq {3}, {0}'.format(icmpv4.id, icmpv4.seq, echo_id,
- echo_seq))
-
- chksum = icmpv4.chksum
- del icmpv4.chksum
- tmp = ICMP(str(icmpv4))
- if not checksum_equal(tmp.chksum, chksum):
- raise RuntimeError(
- 'Invalid checksum {0} should be {1}'.format(chksum, tmp.chksum))
-
- if 'Raw' in icmpv4:
- load = icmpv4['Raw'].load
- else:
- load = ""
- if load != data[0:echo_seq]:
- raise RuntimeError(
- 'Received ICMP payload does not match sent payload')
-
- sent_packets.remove(pkt_send)
-
- sys.exit(0)
-
-
-if __name__ == "__main__":
- main()
diff --git a/resources/traffic_scripts/ipv6_nd_proxy_check.py b/resources/traffic_scripts/ipv6_nd_proxy_check.py
deleted file mode 100755
index 1d96050cf4..0000000000
--- a/resources/traffic_scripts/ipv6_nd_proxy_check.py
+++ /dev/null
@@ -1,207 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2016 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Traffic script that sends DHCPv6 proxy packets."""
-
-from scapy.layers.inet import Ether
-from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS
-from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
-
-
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-
-
-def imcpv6nd_solicit(tx_if, src_mac, dst_mac, src_ip, dst_ip):
- """Send ICMPv6 Neighbor Solicitation packet and expect a response
- from the proxy.
-
- :param tx_if: Interface on TG.
- :param src_mac: MAC address of TG interface.
- :param dst_mac: MAC address of proxy interface.
- :param src_ip: IP address of TG interface.
- :param dst_ip: IP address of proxied interface.
- :type tx_if: str
- :type src_mac: str
- :type dst_mac: str
- :type src_ip: str
- :type dst_ip: str
- :raises RuntimeError: If the received packet is not correct.
- """
-
- rxq = RxQueue(tx_if)
- txq = TxQueue(tx_if)
-
- sent_packets = []
-
- icmpv6nd_solicit_pkt = (Ether(src=src_mac, dst=dst_mac) /
- IPv6(src=src_ip) /
- ICMPv6ND_NS(tgt=dst_ip))
-
- sent_packets.append(icmpv6nd_solicit_pkt)
- txq.send(icmpv6nd_solicit_pkt)
-
- ether = None
- for _ in range(5):
- ether = rxq.recv(3, ignore=sent_packets)
- if not ether:
- continue
- if ether.haslayer(ICMPv6ND_NS):
- # read another packet in the queue in case of ICMPv6ND_NS packet
- continue
- else:
- # otherwise process the current packet
- break
-
- if ether is None:
- raise RuntimeError('ICMPv6ND Proxy response timeout.')
-
- if ether.src != dst_mac:
- raise RuntimeError("Source MAC address error: {} != {}".
- format(ether.src, dst_mac))
- print "Source MAC address: OK."
-
- if ether.dst != src_mac:
- raise RuntimeError("Destination MAC address error: {} != {}".
- format(ether.dst, src_mac))
- print "Destination MAC address: OK."
-
- if ether[IPv6].src != dst_ip:
- raise RuntimeError("Source IP address error: {} != {}".
- format(ether[IPv6].src, dst_ip))
- print "Source IP address: OK."
-
- if ether[IPv6].dst != src_ip:
- raise RuntimeError("Destination IP address error: {} != {}".
- format(ether[IPv6].dst, src_ip))
- print "Destination IP address: OK."
-
- try:
- target_addr = ether[IPv6][ICMPv6ND_NA].tgt
- except (KeyError, AttributeError):
- raise RuntimeError("Not an ICMPv6ND Neighbor Advertisement packet.")
-
- if target_addr != dst_ip:
- raise RuntimeError("ICMPv6 field 'Target address' error: {} != {}".
- format(target_addr, dst_ip))
- print "Target address field: OK."
-
-
-def ipv6_ping(src_if, dst_if, src_mac, dst_mac,
- proxy_to_src_mac, proxy_to_dst_mac, src_ip, dst_ip):
- """Sends ICMPv6 Echo Request, receive it and send a reply.
-
- :param src_if: First TG interface on link to DUT.
- :param dst_if: Second TG interface on link to DUT.
- :param src_mac: MAC address of first interface.
- :param dst_mac: MAC address of second interface.
- :param proxy_to_src_mac: MAC address of first proxy interface on DUT.
- :param proxy_to_dst_mac: MAC address of second proxy interface on DUT.
- :param src_ip: IP address of first interface.
- :param dst_ip: IP address of second interface.
- :type src_if: str
- :type dst_if: str
- :type src_mac: str
- :type dst_mac: str
- :type proxy_to_src_mac: str
- :type proxy_to_dst_mac: str
- :type src_ip: str
- :type dst_ip: str
- :raises RuntimeError: If a received packet is not correct.
- """
- rxq = RxQueue(dst_if)
- txq = TxQueue(src_if)
-
- icmpv6_ping_pkt = (Ether(src=src_mac, dst=proxy_to_src_mac) /
- IPv6(src=src_ip, dst=dst_ip) /
- ICMPv6EchoRequest())
-
- txq.send(icmpv6_ping_pkt)
-
- ether = None
- while True:
- ether = rxq.recv(3)
- if not ether:
- continue
- if ether.haslayer(ICMPv6ND_NS):
- # read another packet in the queue in case of ICMPv6ND_NS packet
- continue
- else:
- # otherwise process the current packet
- break
-
- if ether is None:
- raise RuntimeError('ICMPv6 Echo Request timeout.')
- try:
- ether[IPv6]["ICMPv6 Echo Request"]
- except KeyError:
- raise RuntimeError("Received packet is not an ICMPv6 Echo Request.")
- print "ICMP Echo: OK."
-
- rxq = RxQueue(src_if)
- txq = TxQueue(dst_if)
-
- icmpv6_ping_pkt = (Ether(src=dst_mac, dst=proxy_to_dst_mac) /
- IPv6(src=dst_ip, dst=src_ip) /
- ICMPv6EchoReply())
-
- txq.send(icmpv6_ping_pkt)
-
- ether = None
- while True:
- ether = rxq.recv(3)
- if not ether:
- continue
- if ether.haslayer(ICMPv6ND_NS):
- # read another packet in the queue in case of ICMPv6ND_NS packet
- continue
- else:
- # otherwise process the current packet
- break
-
- if ether is None:
- raise RuntimeError('DHCPv6 SOLICIT timeout')
- try:
- ether[IPv6]["ICMPv6 Echo Reply"]
- except KeyError:
- raise RuntimeError("Received packet is not an ICMPv6 Echo Reply.")
-
- print "ICMP Reply: OK."
-
-
-def main():
- """Send DHCPv6 proxy messages."""
-
- args = TrafficScriptArg(['src_ip', 'dst_ip', 'src_mac', 'dst_mac',
- 'proxy_to_src_mac', 'proxy_to_dst_mac'])
-
- src_if = args.get_arg('tx_if')
- dst_if = args.get_arg('rx_if')
- src_ip = args.get_arg("src_ip")
- dst_ip = args.get_arg("dst_ip")
- src_mac = args.get_arg('src_mac')
- dst_mac = args.get_arg('dst_mac')
- proxy_to_src_mac = args.get_arg('proxy_to_src_mac')
- proxy_to_dst_mac = args.get_arg('proxy_to_dst_mac')
-
- # Neighbor solicitation
- imcpv6nd_solicit(src_if, src_mac, proxy_to_src_mac, src_ip, dst_ip)
-
- # Verify route (ICMP echo/reply)
- ipv6_ping(src_if, dst_if, src_mac, dst_mac, proxy_to_src_mac,
- proxy_to_dst_mac, src_ip, dst_ip)
-
-
-if __name__ == "__main__":
- main()
diff --git a/resources/traffic_scripts/ipv6_ns.py b/resources/traffic_scripts/ipv6_ns.py
deleted file mode 100755
index 5852e6317f..0000000000
--- a/resources/traffic_scripts/ipv6_ns.py
+++ /dev/null
@@ -1,112 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2016 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Traffic script for IPv6 Neighbor Solicitation test."""
-
-import sys
-import logging
-
-# pylint: disable=no-name-in-module
-# pylint: disable=import-error
-logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
-
-from scapy.all import Ether
-from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS
-from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr, ICMPv6NDOptSrcLLAddr
-
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from resources.libraries.python.PacketVerifier import checksum_equal
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-
-
-def main():
- args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip'])
-
- rxq = RxQueue(args.get_arg('rx_if'))
- txq = TxQueue(args.get_arg('tx_if'))
-
- src_mac = args.get_arg('src_mac')
- dst_mac = args.get_arg('dst_mac')
- src_ip = args.get_arg('src_ip')
- dst_ip = args.get_arg('dst_ip')
-
- sent_packets = []
-
- # send ICMPv6 neighbor solicitation message
- pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') /
- IPv6(src=src_ip, dst='ff02::1:ff00:2') /
- ICMPv6ND_NS(tgt=dst_ip) /
- ICMPv6NDOptSrcLLAddr(lladdr=src_mac))
- sent_packets.append(pkt_send)
- txq.send(pkt_send)
-
- # receive ICMPv6 neighbor advertisement message
- while True:
- ether = rxq.recv(2, sent_packets)
- if ether is None:
- raise RuntimeError('ICMPv6 echo reply Rx timeout')
-
- if ether.haslayer(ICMPv6ND_NS):
- # read another packet in the queue if the current one is ICMPv6ND_NS
- continue
- else:
- # otherwise process the current packet
- break
-
- if ether is None:
- raise RuntimeError('ICMPv6 echo reply Rx timeout')
-
- if not ether.haslayer(IPv6):
- raise RuntimeError('Unexpected packet with no IPv6 received {0}'.
- format(ether.__repr__()))
-
- ipv6 = ether[IPv6]
-
- if not ipv6.haslayer(ICMPv6ND_NA):
- raise RuntimeError('Unexpected packet with no ICMPv6 ND-NA received '
- '{0}'.format(ipv6.__repr__()))
-
- icmpv6_na = ipv6[ICMPv6ND_NA]
-
- # verify target address
- if icmpv6_na.tgt != dst_ip:
- raise RuntimeError('Invalid target address {0} should be {1}'.
- format(icmpv6_na.tgt, dst_ip))
-
- if not icmpv6_na.haslayer(ICMPv6NDOptDstLLAddr):
- raise RuntimeError('Missing Destination Link-Layer Address option in '
- 'ICMPv6 Neighbor Advertisement {0}'.
- format(icmpv6_na.__repr__()))
-
- dst_ll_addr = icmpv6_na[ICMPv6NDOptDstLLAddr]
-
- # verify destination link-layer address field
- if dst_ll_addr.lladdr != dst_mac:
- raise RuntimeError('Invalid lladdr {0} should be {1}'.
- format(dst_ll_addr.lladdr, dst_mac))
-
- # verify checksum
- cksum = icmpv6_na.cksum
- del icmpv6_na.cksum
- tmp = ICMPv6ND_NA(str(icmpv6_na))
- if not checksum_equal(tmp.cksum, cksum):
- raise RuntimeError('Invalid checksum {0} should be {1}'.
- format(cksum, tmp.cksum))
-
- sys.exit(0)
-
-
-if __name__ == "__main__":
- main()
diff --git a/resources/traffic_scripts/ipv6_sweep_ping.py b/resources/traffic_scripts/ipv6_sweep_ping.py
deleted file mode 100755
index 28d23a16ef..0000000000
--- a/resources/traffic_scripts/ipv6_sweep_ping.py
+++ /dev/null
@@ -1,120 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2016 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Traffic script for IPv6 sweep ping."""
-
-import logging
-import os
-import sys
-
-# pylint: disable=no-name-in-module
-# pylint: disable=import-error
-logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
-
-from scapy.all import Ether
-from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS
-from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr
-from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
-
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from resources.libraries.python.PacketVerifier import checksum_equal
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-
-
-def main():
- # start_size - start size of the ICMPv6 echo data
- # end_size - end size of the ICMPv6 echo data
- # step - increment step
- args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip',
- 'start_size', 'end_size', 'step'])
-
- rxq = RxQueue(args.get_arg('rx_if'))
- txq = TxQueue(args.get_arg('tx_if'))
-
- src_mac = args.get_arg('src_mac')
- dst_mac = args.get_arg('dst_mac')
- src_ip = args.get_arg('src_ip')
- dst_ip = args.get_arg('dst_ip')
- start_size = int(args.get_arg('start_size'))
- end_size = int(args.get_arg('end_size'))
- step = int(args.get_arg('step'))
- echo_id = 0xa
- # generate some random data buffer
- data = bytearray(os.urandom(end_size))
-
- # send ICMPv6 neighbor advertisement message
- sent_packets = []
- pkt_send = (Ether(src=src_mac, dst=dst_mac) /
- IPv6(src=src_ip, dst=dst_ip) /
- ICMPv6ND_NA(tgt=src_ip, R=0) /
- ICMPv6NDOptDstLLAddr(lladdr=src_mac))
- sent_packets.append(pkt_send)
- txq.send(pkt_send)
-
- # send ICMPv6 echo request with incremented data length and receive ICMPv6
- # echo reply
- for echo_seq in range(start_size, end_size + 1, step):
- pkt_send = (Ether(src=src_mac, dst=dst_mac) /
- IPv6(src=src_ip, dst=dst_ip) /
- ICMPv6EchoRequest(id=echo_id, seq=echo_seq,
- data=data[0:echo_seq]))
- sent_packets.append(pkt_send)
- txq.send(pkt_send)
-
- while True:
- ether = rxq.recv(ignore=sent_packets)
- if ether is None:
- raise RuntimeError('ICMPv6 echo reply seq {0} '
- 'Rx timeout'.format(echo_seq))
-
- if ether.haslayer(ICMPv6ND_NS):
- # read another packet in the queue in case of ICMPv6ND_NS packet
- continue
- else:
- # otherwise process the current packet
- break
-
- if not ether.haslayer(IPv6):
- raise RuntimeError('Unexpected packet with no IPv6 layer '
- 'received: {0}'.format(ether.__repr__()))
-
- ipv6 = ether[IPv6]
-
- if not ipv6.haslayer(ICMPv6EchoReply):
- raise RuntimeError('Unexpected packet with no ICMPv6 echo reply '
- 'layer received: {0}'.format(ipv6.__repr__()))
-
- icmpv6 = ipv6[ICMPv6EchoReply]
-
- if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
- raise RuntimeError('ICMPv6 echo reply with invalid data received: '
- 'ID {0} seq {1} should be ID {2} seq {3}, {0}'.
- format(icmpv6.id, icmpv6.seq, echo_id,
- echo_seq))
-
- cksum = icmpv6.cksum
- del icmpv6.cksum
- tmp = ICMPv6EchoReply(str(icmpv6))
- if not checksum_equal(tmp.cksum, cksum):
- raise RuntimeError('Invalid checksum: {0} should be {1}'.
- format(cksum, tmp.cksum))
-
- sent_packets.remove(pkt_send)
-
- sys.exit(0)
-
-
-if __name__ == "__main__":
- main()
diff --git a/resources/traffic_scripts/lisp/lisp_check.py b/resources/traffic_scripts/lisp/lisp_check.py
index 9937de8077..ebd769fa9d 100755
--- a/resources/traffic_scripts/lisp/lisp_check.py
+++ b/resources/traffic_scripts/lisp/lisp_check.py
@@ -1,5 +1,6 @@
-#!/usr/bin/env python
-# Copyright (c) 2017 Cisco and/or its affiliates.
+#!/usr/bin/env python3
+
+# Copyright (c) 2019 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
@@ -19,12 +20,13 @@ a LISP-encapsulated packet on the other interface and verifies received packet.
import sys
import ipaddress
+from scapy.all import bind_layers, Packet
+from scapy.fields import FlagsField, BitField, IntField
from scapy.layers.inet import ICMP, IP, UDP
from scapy.layers.inet6 import ICMPv6EchoRequest
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether
-from scapy.all import bind_layers, Packet
-from scapy.fields import FlagsField, BitField, IntField
+from scapy.packet import Raw
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
@@ -32,26 +34,31 @@ from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
class LispHeader(Packet):
"""Scapy header for the LISP Layer."""
- name = "Lisp Header"
+
+ name = u"Lisp Header"
fields_desc = [
- FlagsField("flags", None, 8, ["N", "L", "E", "V", "I", "", "", ""]),
- BitField("nonce/map_version", 0, size=24),
- IntField("instance_id/locator_status_bits", 0)]
+ FlagsField(
+ u"flags", None, 8, [u"N", u"L", u"E", u"V", u"I", u"", u"", u""]
+ ),
+ BitField(u"nonce/map_version", 0, size=24),
+ IntField(u"instance_id/locator_status_bits", 0)]
class LispInnerIP(IP):
"""Scapy inner LISP layer for IPv4-in-IPv4."""
- name = "Lisp Inner Layer - IPv4"
+
+ name = u"Lisp Inner Layer - IPv4"
class LispInnerIPv6(IPv6):
"""Scapy inner LISP layer for IPv6-in-IPv6."""
- name = "Lisp Inner Layer - IPv6"
+
+ name = u"Lisp Inner Layer - IPv6"
def valid_ipv4(ip):
try:
- ipaddress.IPv4Address(unicode(ip))
+ ipaddress.IPv4Address(ip)
return True
except (AttributeError, ipaddress.AddressValueError):
return False
@@ -59,7 +66,7 @@ def valid_ipv4(ip):
def valid_ipv6(ip):
try:
- ipaddress.IPv6Address(unicode(ip))
+ ipaddress.IPv6Address(ip)
return True
except (AttributeError, ipaddress.AddressValueError):
return False
@@ -71,25 +78,28 @@ def main():
:raises RuntimeError: If the received packet is not correct."""
args = TrafficScriptArg(
- ['tg_src_mac', 'tg_dst_mac', 'src_ip', 'dst_ip', 'dut_if1_mac',
- 'dut_if2_mac', 'src_rloc', 'dst_rloc'],
- ['ot_mode'])
-
- tx_src_mac = args.get_arg('tg_src_mac')
- tx_dst_mac = args.get_arg('dut_if1_mac')
- rx_dst_mac = args.get_arg('tg_dst_mac')
- rx_src_mac = args.get_arg('dut_if2_mac')
- src_ip = args.get_arg('src_ip')
- dst_ip = args.get_arg('dst_ip')
- src_rloc = args.get_arg("src_rloc")
- dst_rloc = args.get_arg("dst_rloc")
- tx_if = args.get_arg('tx_if')
- rx_if = args.get_arg('rx_if')
- ot_mode = args.get_arg('ot_mode')
+ [
+ u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac",
+ u"dut_if2_mac", u"src_rloc", u"dst_rloc"
+ ],
+ [u"ot_mode"]
+ )
+
+ tx_src_mac = args.get_arg(u"tg_src_mac")
+ tx_dst_mac = args.get_arg(u"dut_if1_mac")
+ rx_dst_mac = args.get_arg(u"tg_dst_mac")
+ rx_src_mac = args.get_arg(u"dut_if2_mac")
+ src_ip = args.get_arg(u"src_ip")
+ dst_ip = args.get_arg(u"dst_ip")
+ src_rloc = args.get_arg(u"src_rloc")
+ dst_rloc = args.get_arg(u"dst_rloc")
+ tx_if = args.get_arg(u"tx_if")
+ rx_if = args.get_arg(u"rx_if")
+ ot_mode = args.get_arg(u"ot_mode")
rxq = RxQueue(rx_if)
txq = TxQueue(tx_if)
- sent_packets = []
+
pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
@@ -103,10 +113,13 @@ def main():
ip_format = IPv6
lisp_layer = LispInnerIPv6
else:
- raise ValueError("IP not in correct format")
+ raise ValueError(u"IP not in correct format")
bind_layers(UDP, LispHeader, dport=4341)
bind_layers(LispHeader, lisp_layer)
+
+ pkt_raw /= Raw()
+ sent_packets = list()
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
@@ -116,27 +129,23 @@ def main():
ether = rxq.recv(2)
if ether is None:
- raise RuntimeError("ICMP echo Rx timeout")
+ raise RuntimeError(u"ICMP echo Rx timeout")
if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
- print("MAC addresses match.")
+ print(u"MAC addresses match.")
else:
- raise RuntimeError(
- "Matching packet unsuccessful: {0}".format(ether.__repr__()))
+ raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
ip = ether.payload
- if ot_mode == '6to4':
+ if ot_mode == u"6to4":
if not isinstance(ip, IP):
- raise RuntimeError(
- "Not an IP packet received {0}".format(ip.__repr__()))
- elif ot_mode == '4to6':
- if not isinstance(ip, IP6):
- raise RuntimeError(
- "Not an IP packet received {0}".format(ip.__repr__()))
+ raise RuntimeError(f"Not an IP packet received {ip!r}")
+ elif ot_mode == u"4to6":
+ if not isinstance(ip, IPv6):
+ raise RuntimeError(f"Not an IP packet received {ip!r}")
elif not isinstance(ip, ip_format):
- raise RuntimeError(
- "Not an IP packet received {0}".format(ip.__repr__()))
+ raise RuntimeError(f"Not an IP packet received {ip!r}")
lisp = ether.getlayer(lisp_layer)
if not lisp:
@@ -144,31 +153,35 @@ def main():
# Compare data from packets
if src_ip == lisp.src:
- print("Source IP matches source EID.")
+ print(u"Source IP matches source EID.")
else:
- raise RuntimeError("Matching Src IP unsuccessful: {} != {}"
- .format(src_ip, lisp.src))
+ raise RuntimeError(
+ f"Matching Src IP unsuccessful: {src_ip} != {lisp.src}"
+ )
if dst_ip == lisp.dst:
- print("Destination IP matches destination EID.")
+ print(u"Destination IP matches destination EID.")
else:
- raise RuntimeError("Matching Dst IP unsuccessful: {} != {}"
- .format(dst_ip, lisp.dst))
+ raise RuntimeError(
+ f"Matching Dst IP unsuccessful: {dst_ip} != {lisp.dst}"
+ )
if src_rloc == ip.src:
- print("Source RLOC matches configuration.")
+ print(u"Source RLOC matches configuration.")
else:
- raise RuntimeError("Matching Src RLOC unsuccessful: {} != {}"
- .format(src_rloc, ip.src))
+ raise RuntimeError(
+ f"Matching Src RLOC unsuccessful: {src_rloc} != {ip.src}"
+ )
if dst_rloc == ip.dst:
- print("Destination RLOC matches configuration.")
+ print(u"Destination RLOC matches configuration.")
else:
- raise RuntimeError("Matching dst RLOC unsuccessful: {} != {}"
- .format(dst_rloc, ip.dst))
+ raise RuntimeError(
+ f"Matching dst RLOC unsuccessful: {dst_rloc} != {ip.dst}"
+ )
sys.exit(0)
-if __name__ == "__main__":
+if __name__ == u"__main__":
main()
diff --git a/resources/traffic_scripts/lisp/lispgpe_check.py b/resources/traffic_scripts/lisp/lispgpe_check.py
index d4de8635d7..50857c4fe8 100755
--- a/resources/traffic_scripts/lisp/lispgpe_check.py
+++ b/resources/traffic_scripts/lisp/lispgpe_check.py
@@ -1,5 +1,6 @@
-#!/usr/bin/env python
-# Copyright (c) 2017 Cisco and/or its affiliates.
+#!/usr/bin/env python3
+
+# Copyright (c) 2019 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
@@ -20,12 +21,13 @@ packet.
import sys
import ipaddress
+from scapy.all import bind_layers, Packet
+from scapy.fields import FlagsField, BitField, XBitField, IntField
from scapy.layers.inet import ICMP, IP, UDP
from scapy.layers.inet6 import ICMPv6EchoRequest
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether
-from scapy.all import bind_layers, Packet
-from scapy.fields import FlagsField, BitField, XBitField, IntField
+from scapy.packet import Raw
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
@@ -33,13 +35,17 @@ from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
class LispGPEHeader(Packet):
"""Scapy header for the Lisp GPE Layer."""
+
name = "Lisp GPE Header"
fields_desc = [
- FlagsField("flags", None, 8, ["N", "L", "E", "V", "I", "P", "R", "O"]),
- BitField("version", 0, size=2),
- BitField("reserved", 0, size=14),
- XBitField("next_protocol", 0, size=8),
- IntField("instance_id/locator_status_bits", 0)]
+ FlagsField(
+ u"flags", None, 8, [u"N", u"L", u"E", u"V", u"I", u"P", u"R", u"O"]
+ ),
+ BitField(u"version", 0, size=2),
+ BitField(u"reserved", 0, size=14),
+ XBitField(u"next_protocol", 0, size=8),
+ IntField(u"instance_id/locator_status_bits", 0)
+ ]
def guess_payload_class(self, payload):
protocol = {
@@ -53,17 +59,20 @@ class LispGPEHeader(Packet):
class LispGPEInnerIP(IP):
"""Scapy inner LISP GPE layer for IPv4-in-IPv4."""
- name = "Lisp GPE Inner Layer - IPv4"
+
+ name = u"Lisp GPE Inner Layer - IPv4"
class LispGPEInnerIPv6(IPv6):
"""Scapy inner LISP GPE layer for IPv6-in-IPv6."""
- name = "Lisp GPE Inner Layer - IPv6"
+
+ name = u"Lisp GPE Inner Layer - IPv6"
class LispGPEInnerEther(Ether):
"""Scapy inner LISP GPE layer for Lisp-L2."""
- name = "Lisp GPE Inner Layer - Ethernet"
+
+ name = u"Lisp GPE Inner Layer - Ethernet"
class LispGPEInnerNSH(Packet):
@@ -75,7 +84,7 @@ class LispGPEInnerNSH(Packet):
def valid_ipv4(ip):
try:
- ipaddress.IPv4Address(unicode(ip))
+ ipaddress.IPv4Address(ip)
return True
except (AttributeError, ipaddress.AddressValueError):
return False
@@ -83,7 +92,7 @@ def valid_ipv4(ip):
def valid_ipv6(ip):
try:
- ipaddress.IPv6Address(unicode(ip))
+ ipaddress.IPv6Address(ip)
return True
except (AttributeError, ipaddress.AddressValueError):
return False
@@ -95,25 +104,28 @@ def main():
:raises RuntimeError: If the received packet is not correct."""
args = TrafficScriptArg(
- ['tg_src_mac', 'tg_dst_mac', 'src_ip', 'dst_ip', 'dut_if1_mac',
- 'dut_if2_mac', 'src_rloc', 'dst_rloc'],
- ['ot_mode'])
-
- tx_src_mac = args.get_arg('tg_src_mac')
- tx_dst_mac = args.get_arg('dut_if1_mac')
- rx_dst_mac = args.get_arg('tg_dst_mac')
- rx_src_mac = args.get_arg('dut_if2_mac')
- src_ip = args.get_arg('src_ip')
- dst_ip = args.get_arg('dst_ip')
- src_rloc = args.get_arg("src_rloc")
- dst_rloc = args.get_arg("dst_rloc")
- tx_if = args.get_arg('tx_if')
- rx_if = args.get_arg('rx_if')
- ot_mode = args.get_arg('ot_mode')
+ [
+ u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac",
+ u"dut_if2_mac", u"src_rloc", u"dst_rloc"
+ ],
+ [u"ot_mode"]
+ )
+
+ tx_src_mac = args.get_arg(u"tg_src_mac")
+ tx_dst_mac = args.get_arg(u"dut_if1_mac")
+ rx_dst_mac = args.get_arg(u"tg_dst_mac")
+ rx_src_mac = args.get_arg(u"dut_if2_mac")
+ src_ip = args.get_arg(u"src_ip")
+ dst_ip = args.get_arg(u"dst_ip")
+ src_rloc = args.get_arg(u"src_rloc")
+ dst_rloc = args.get_arg(u"dst_rloc")
+ tx_if = args.get_arg(u"tx_if")
+ rx_if = args.get_arg(u"rx_if")
+ ot_mode = args.get_arg(u"ot_mode")
rxq = RxQueue(rx_if)
txq = TxQueue(tx_if)
- sent_packets = []
+
pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
@@ -125,10 +137,12 @@ def main():
pkt_raw /= ICMPv6EchoRequest()
ip_format = IPv6
else:
- raise ValueError("IP not in correct format")
+ raise ValueError(u"IP not in correct format")
bind_layers(UDP, LispGPEHeader, dport=4341)
+ pkt_raw /= Raw()
+ sent_packets = list()
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
@@ -138,59 +152,59 @@ def main():
ether = rxq.recv(2)
if ether is None:
- raise RuntimeError("ICMP echo Rx timeout")
+ raise RuntimeError(u"ICMP echo Rx timeout")
if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
- print("MAC addresses match.")
+ print(u"MAC addresses match.")
else:
- raise RuntimeError(
- "Matching packet unsuccessful: {0}".format(ether.__repr__()))
+ raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
ip = ether.payload
- if ot_mode == '6to4':
+ if ot_mode == u"6to4":
if not isinstance(ip, IP):
- raise RuntimeError(
- "Not an IP packet received {0}".format(ip.__repr__()))
- elif ot_mode == '4to6':
+ raise RuntimeError(f"Not an IP packet received {ip!r}")
+ elif ot_mode == u"4to6":
if not isinstance(ip, IPv6):
- raise RuntimeError(
- "Not an IP packet received {0}".format(ip.__repr__()))
+ raise RuntimeError(f"Not an IP packet received {ip!r}")
elif not isinstance(ip, ip_format):
- raise RuntimeError(
- "Not an IP packet received {0}".format(ip.__repr__()))
+ raise RuntimeError(f"Not an IP packet received {ip!r}")
lisp = ether.getlayer(LispGPEHeader).underlayer
if not lisp:
- raise RuntimeError("Lisp layer not present or parsing failed.")
+ raise RuntimeError(u"Lisp layer not present or parsing failed.")
# Compare data from packets
if src_ip == lisp.src:
- print("Source IP matches source EID.")
+ print(u"Source IP matches source EID.")
else:
- raise RuntimeError("Matching Src IP unsuccessful: {} != {}"
- .format(src_ip, lisp.src))
+ raise RuntimeError(
+ f"Matching Src IP unsuccessful: {src_ip} != {lisp.src}"
+ )
if dst_ip == lisp.dst:
- print("Destination IP matches destination EID.")
+ print(u"Destination IP matches destination EID.")
else:
- raise RuntimeError("Matching Dst IP unsuccessful: {} != {}"
- .format(dst_ip, lisp.dst))
+ raise RuntimeError(
+ f"Matching Dst IP unsuccessful: {dst_ip} != {lisp.dst}"
+ )
if src_rloc == ip.src:
- print("Source RLOC matches configuration.")
+ print(u"Source RLOC matches configuration.")
else:
- raise RuntimeError("Matching Src RLOC unsuccessful: {} != {}"
- .format(src_rloc, ip.src))
+ raise RuntimeError(
+ f"Matching Src RLOC unsuccessful: {src_rloc} != {ip.src}"
+ )
if dst_rloc == ip.dst:
- print("Destination RLOC matches configuration.")
+ print(u"Destination RLOC matches configuration.")
else:
- raise RuntimeError("Matching dst RLOC unsuccessful: {} != {}"
- .format(dst_rloc, ip.dst))
+ raise RuntimeError(
+ f"Matching dst RLOC unsuccessful: {dst_rloc} != {ip.dst}"
+ )
sys.exit(0)
-if __name__ == "__main__":
+if __name__ == u"__main__":
main()
diff --git a/resources/traffic_scripts/policer.py b/resources/traffic_scripts/policer.py
index 62c397d496..c222c76152 100755
--- a/resources/traffic_scripts/policer.py
+++ b/resources/traffic_scripts/policer.py
@@ -1,6 +1,6 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
-# Copyright (c) 2016 Cisco and/or its affiliates.
+# Copyright (c) 2019 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
@@ -18,14 +18,15 @@
import sys
import logging
+from ipaddress import ip_address
# pylint: disable=no-name-in-module
# pylint: disable=import-error
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
-from scapy.all import Ether
+from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, TCP
from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
-from ipaddress import ip_address
+from scapy.packet import Raw
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
@@ -41,17 +42,14 @@ def check_ipv4(pkt_recv, dscp):
:raises RuntimeError: If received packet is invalid.
"""
if not pkt_recv.haslayer(IP):
- raise RuntimeError('Not an IPv4 packet received: {0}'.
- format(pkt_recv.__repr__()))
+ raise RuntimeError(f"Not an IPv4 packet received: {pkt_recv!r}")
rx_dscp = pkt_recv[IP].tos >> 2
if rx_dscp != dscp:
- raise RuntimeError('Invalid DSCP {0} should be {1}'.
- format(rx_dscp, dscp))
+ raise RuntimeError(f"Invalid DSCP {rx_dscp} should be {dscp}")
if not pkt_recv.haslayer(TCP):
- raise RuntimeError('Not a TCP packet received: {0}'.
- format(pkt_recv.__repr__()))
+ raise RuntimeError(f"Not a TCP packet received: {pkt_recv!r}")
def check_ipv6(pkt_recv, dscp):
@@ -64,58 +62,48 @@ def check_ipv6(pkt_recv, dscp):
:raises RuntimeError: If received packet is invalid.
"""
if not pkt_recv.haslayer(IPv6):
- raise RuntimeError('Not an IPv6 packet received: {0}'.
- format(pkt_recv.__repr__()))
+ raise RuntimeError(f"Not an IPv6 packet received: {pkt_recv!r}")
rx_dscp = pkt_recv[IPv6].tc >> 2
if rx_dscp != dscp:
- raise RuntimeError('Invalid DSCP {0} should be {1}'.
- format(rx_dscp, dscp))
+ raise RuntimeError(f"Invalid DSCP {rx_dscp} should be {dscp}")
if not pkt_recv.haslayer(TCP):
- raise RuntimeError('Not a TCP packet received: {0}'.
- format(pkt_recv.__repr__()))
+ raise RuntimeError(f"Not a TCP packet received: {pkt_recv!r}")
# pylint: disable=too-many-locals
# pylint: disable=too-many-statements
def main():
"""Send and receive TCP packet."""
- args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip', 'dscp'])
-
- rxq = RxQueue(args.get_arg('rx_if'))
- txq = TxQueue(args.get_arg('tx_if'))
+ args = TrafficScriptArg(
+ [u"src_mac", u"dst_mac", u"src_ip", u"dst_ip", u"dscp"]
+ )
- src_mac = args.get_arg('src_mac')
- dst_mac = args.get_arg('dst_mac')
- src_ip = args.get_arg('src_ip')
- dst_ip = args.get_arg('dst_ip')
- dscp = int(args.get_arg('dscp'))
+ rxq = RxQueue(args.get_arg(u"rx_if"))
+ txq = TxQueue(args.get_arg(u"tx_if"))
- if 6 == ip_address(unicode(src_ip)).version:
- is_ipv4 = False
- else:
- is_ipv4 = True
-
- sent_packets = []
+ src_mac = args.get_arg(u"src_mac")
+ dst_mac = args.get_arg(u"dst_mac")
+ src_ip = args.get_arg(u"src_ip")
+ dst_ip = args.get_arg(u"dst_ip")
+ dscp = int(args.get_arg(u"dscp"))
- if is_ipv4:
- ip_pkt = (IP(src=src_ip, dst=dst_ip) /
- TCP())
- else:
- ip_pkt = (IPv6(src=src_ip, dst=dst_ip) /
- TCP())
+ ip_layer = IPv6 if ip_address(src_ip).version == 6 else IP
+ sent_packets = list()
pkt_send = (Ether(src=src_mac, dst=dst_mac) /
- ip_pkt)
+ ip_layer(src=src_ip, dst=dst_ip) /
+ TCP())
+ pkt_send /= Raw()
sent_packets.append(pkt_send)
txq.send(pkt_send)
while True:
pkt_recv = rxq.recv(2, sent_packets)
if pkt_recv is None:
- raise RuntimeError('ICMPv6 echo reply Rx timeout')
+ raise RuntimeError(u"ICMPv6 echo reply Rx timeout")
if pkt_recv.haslayer(ICMPv6ND_NS):
# read another packet in the queue if the current one is ICMPv6ND_NS
@@ -125,9 +113,9 @@ def main():
break
if pkt_recv is None:
- raise RuntimeError('Rx timeout')
+ raise RuntimeError(u"Rx timeout")
- if is_ipv4:
+ if ip_layer == IP:
check_ipv4(pkt_recv, dscp)
else:
check_ipv6(pkt_recv, dscp)
@@ -135,5 +123,5 @@ def main():
sys.exit(0)
-if __name__ == "__main__":
+if __name__ == u"__main__":
main()
diff --git a/resources/traffic_scripts/send_gre_check_gre_headers.py b/resources/traffic_scripts/send_gre_check_gre_headers.py
deleted file mode 100755
index 9e2e2abe06..0000000000
--- a/resources/traffic_scripts/send_gre_check_gre_headers.py
+++ /dev/null
@@ -1,124 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2016 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Traffic script that sends a UDP encapsulated into GRE packet from one
-interface to the other, where GRE encapsulated packet is expected.
-"""
-
-import sys
-
-from robot.api import logger
-from scapy.all import Ether
-from scapy.layers.inet import IP_PROTOS
-from scapy.layers.inet import IP
-from scapy.layers.inet import UDP
-from scapy.layers.l2 import GRE
-
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-
-
-def main():
- args = TrafficScriptArg(
- ['tx_dst_mac', 'tx_src_mac', 'tx_outer_dst_ip', 'tx_outer_src_ip',
- 'tx_inner_dst_ip', 'tx_inner_src_ip', 'rx_dst_mac', 'rx_src_mac',
- 'rx_outer_dst_ip', 'rx_outer_src_ip'])
-
- tx_if = args.get_arg('tx_if')
- rx_if = args.get_arg('rx_if')
- tx_dst_mac = args.get_arg('tx_dst_mac')
- tx_src_mac = args.get_arg('tx_src_mac')
- tx_outer_dst_ip = args.get_arg('tx_outer_dst_ip')
- tx_outer_src_ip = args.get_arg('tx_outer_src_ip')
- tx_inner_dst_ip = args.get_arg('tx_inner_dst_ip')
- tx_inner_src_ip = args.get_arg('tx_inner_src_ip')
- rx_dst_mac = args.get_arg('rx_dst_mac')
- rx_src_mac = args.get_arg('rx_src_mac')
- rx_outer_dst_ip = args.get_arg('rx_outer_dst_ip')
- rx_outer_src_ip = args.get_arg('rx_outer_src_ip')
- udp_src = 1234
- udp_dst = 2345
- udp_payload = 'udp_payload'
-
- rxq = RxQueue(rx_if)
- txq = TxQueue(tx_if)
- sent_packets = []
-
- tx_pkt_raw = Ether(dst=tx_dst_mac, src=tx_src_mac) / \
- IP(src=tx_outer_src_ip, dst=tx_outer_dst_ip) / \
- GRE() / \
- IP(src=tx_inner_src_ip, dst=tx_inner_dst_ip) / \
- UDP(dport=udp_dst, sport=udp_src) / \
- udp_payload
-
- sent_packets.append(tx_pkt_raw)
- txq.send(tx_pkt_raw)
- ether = rxq.recv(2, ignore=sent_packets)
-
- if ether is None:
- raise RuntimeError("ICMP echo Rx timeout")
-
- # Check RX headers
- if ether.dst != rx_dst_mac:
- raise RuntimeError("Matching of received destination MAC unsuccessful.")
- logger.debug("Comparison of received destination MAC: OK.")
-
- if ether.src != rx_src_mac:
- raise RuntimeError("Matching of received source MAC unsuccessful.")
- logger.debug("Comparison of received source MAC: OK.")
-
- if ether['IP'].src != rx_outer_src_ip:
- raise RuntimeError("Matching of received outer source IP unsuccessful.")
- logger.debug("Comparison of received outer source IP: OK.")
-
- if ether['IP'].dst != rx_outer_dst_ip:
- raise RuntimeError(
- "Matching of received outer destination IP unsuccessful.")
- logger.debug("Comparison of received outer destination IP: OK.")
-
- if ether['IP'].proto != IP_PROTOS.gre:
- raise RuntimeError("IP protocol is not GRE.")
- logger.debug("Comparison of received GRE protocol: OK.")
-
- if ether['IP']['GRE']['IP'].src != tx_inner_src_ip:
- raise RuntimeError("Matching of received inner source IP unsuccessful.")
- logger.debug("Comparison of received inner source IP: OK.")
-
- if ether['IP']['GRE']['IP'].dst != tx_inner_dst_ip:
- raise RuntimeError(
- "Matching of received inner destination IP unsuccessful.")
- logger.debug("Comparison of received inner destination IP: OK.")
-
- # check udp
- udp = ether['IP']['GRE']['IP']['UDP']
- if udp.dport != udp_dst:
- raise RuntimeError("UDP dport error {} != {}.".
- format(udp.dport, udp_dst))
- print "UDP dport: OK."
-
- if udp.sport != udp_src:
- raise RuntimeError("UDP sport error {} != {}.".
- format(udp.sport, udp_src))
- print "UDP sport: OK."
-
- if str(udp.payload) != udp_payload:
- raise RuntimeError("UDP payload check unsuccessful {} != {}.".
- format(udp.payload, udp_payload))
- print "UDP payload: OK."
-
- sys.exit(0)
-
-
-if __name__ == "__main__":
- main()
diff --git a/resources/traffic_scripts/send_gre_check_icmp_headers.py b/resources/traffic_scripts/send_gre_check_icmp_headers.py
deleted file mode 100755
index 27eac5d786..0000000000
--- a/resources/traffic_scripts/send_gre_check_icmp_headers.py
+++ /dev/null
@@ -1,86 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2016 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Traffic script that sends a GRE encapsulated ICMPv4 packet from one
-interface to the other, where is expected ICMPv4 without GRE header.
-"""
-
-import sys
-
-from robot.api import logger
-from scapy.all import Ether
-from scapy.layers.inet import IP_PROTOS
-from scapy.layers.inet import IP
-from scapy.layers.inet import ICMP
-from scapy.layers.l2 import GRE
-
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-
-
-def main():
- args = TrafficScriptArg(
- ['tx_dst_mac', 'rx_dst_mac',
- 'inner_src_ip', 'inner_dst_ip',
- 'outer_src_ip', 'outer_dst_ip'])
-
- tx_if = args.get_arg('tx_if')
- rx_if = args.get_arg('rx_if')
- tx_dst_mac = args.get_arg('tx_dst_mac')
- rx_dst_mac = args.get_arg('rx_dst_mac')
- inner_src_ip = args.get_arg('inner_src_ip')
- inner_dst_ip = args.get_arg('inner_dst_ip')
- outer_src_ip = args.get_arg('outer_src_ip')
- outer_dst_ip = args.get_arg('outer_dst_ip')
-
- rxq = RxQueue(rx_if)
- txq = TxQueue(tx_if)
- sent_packets = []
-
- tx_pkt_raw = Ether(dst=tx_dst_mac) / \
- IP(src=outer_src_ip, dst=outer_dst_ip) / \
- GRE() / \
- IP(src=inner_src_ip, dst=inner_dst_ip) / \
- ICMP()
-
- sent_packets.append(tx_pkt_raw)
- txq.send(tx_pkt_raw)
- ether = rxq.recv(2)
-
- if ether is None:
- raise RuntimeError("ICMP echo Rx timeout")
-
- # Check RX headers
- if ether.dst != rx_dst_mac:
- raise RuntimeError("Matching of received destination MAC unsuccessful.")
- logger.debug("Comparison of received destination MAC: OK.")
-
- if ether['IP'].src != inner_src_ip:
- raise RuntimeError("Matching of received inner source IP unsuccessful.")
- logger.debug("Comparison of received outer source IP: OK.")
-
- if ether['IP'].dst != inner_dst_ip:
- raise RuntimeError(
- "Matching of received inner destination IP unsuccessful.")
- logger.debug("Comparison of received outer destination IP: OK.")
-
- if ether['IP'].proto != IP_PROTOS.icmp:
- raise RuntimeError("IP protocol is other than ICMP.")
- logger.debug("Comparison of received ICMP protocol: OK.")
-
- sys.exit(0)
-
-
-if __name__ == "__main__":
- main()
diff --git a/resources/traffic_scripts/send_icmp_check_arp.py b/resources/traffic_scripts/send_icmp_check_arp.py
deleted file mode 100755
index 49d4710ba4..0000000000
--- a/resources/traffic_scripts/send_icmp_check_arp.py
+++ /dev/null
@@ -1,135 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2016 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Traffic script that sends an IP ICMP packet
-from one interface and expects ARP on the other one.
-"""
-
-import sys
-import ipaddress
-
-from scapy.all import Ether
-from scapy.layers.inet import ICMP, IP
-
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-
-
-def valid_ipv4(ip):
- """Check if IP address has the correct IPv4 address format.
-
- :param ip: IP address.
- :type ip: str
- :return: True in case of correct IPv4 address format,
- otherwise return False.
- :rtype: bool
- """
- try:
- ipaddress.IPv4Address(unicode(ip))
- return True
- except (AttributeError, ipaddress.AddressValueError):
- return False
-
-
-def main():
- """Send IP ICMP packet from one traffic generator interface and expects
- ARP on the other."""
- args = TrafficScriptArg(
- ['tx_dst_mac', 'rx_src_mac', 'tx_src_ip', 'tx_dst_ip', 'rx_arp_src_ip',
- 'rx_arp_dst_ip'])
-
- tx_dst_mac = args.get_arg('tx_dst_mac')
- rx_src_mac = args.get_arg('rx_src_mac')
- src_ip = args.get_arg('tx_src_ip')
- dst_ip = args.get_arg('tx_dst_ip')
- tx_if = args.get_arg('tx_if')
- rx_if = args.get_arg('rx_if')
- rx_dst_mac = 'ff:ff:ff:ff:ff:ff'
- rx_arp_src_ip = args.get_arg('rx_arp_src_ip')
- rx_arp_dst_ip = args.get_arg('rx_arp_dst_ip')
-
- rxq = RxQueue(rx_if)
- txq = TxQueue(tx_if)
-
- # Create empty IP ICMP packet
- if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
- pkt_raw = Ether(dst=tx_dst_mac) / IP(src=src_ip, dst=dst_ip) / ICMP()
-
- # Send created packet on one interface and receive on the other
- txq.send(pkt_raw)
-
- ether = rxq.recv(2)
-
- if ether is None:
- raise RuntimeError("Ethernet frame Rx timeout")
-
- if ether.dst == rx_dst_mac:
- print("Ethernet destination address matched.")
- else:
- raise RuntimeError(
- "Matching ethernet destination address unsuccessful: {0} != {1}".
- format(ether.dst, rx_dst_mac))
-
- if ether.src == rx_src_mac:
- print("Ethernet source address matched.")
- else:
- raise RuntimeError(
- "Matching ethernet source address unsuccessful: {0} != {1}"
- .format(ether.src, rx_src_mac))
-
- # ARP check
- if ether['ARP'] is not None:
- print("ARP packet received.")
- else:
- raise RuntimeError("Not an ARP packet received {0}"
- .format(ether.__repr__()))
-
- # Compare data from packets
- if ether['ARP'].op == 1: # 1 - who-has request
- print("ARP request matched.")
- else:
- raise RuntimeError("Matching ARP request unsuccessful: {0} != {1}"
- .format(ether['ARP'].op, 1))
-
- if ether['ARP'].hwsrc == rx_src_mac:
- print("Source MAC matched.")
- else:
- raise RuntimeError("Matching Source MAC unsuccessful: {0} != {1}"
- .format(ether['ARP'].hwsrc, rx_src_mac))
-
- if ether['ARP'].hwdst == "00:00:00:00:00:00":
- print("Destination MAC matched.")
- else:
- raise RuntimeError("Matching Destination MAC unsuccessful: {0} != {1}"
- .format(ether['ARP'].hwdst, "00:00:00:00:00:00"))
-
- if ether['ARP'].psrc == rx_arp_src_ip:
- print("Source ARP IP address matched.")
- else:
- raise RuntimeError(
- "Matching Source ARP IP address unsuccessful: {0} != {1}"
- .format(ether['ARP'].psrc, rx_arp_src_ip))
-
- if ether['ARP'].pdst == rx_arp_dst_ip:
- print("Destination ARP IP address matched.")
- else:
- raise RuntimeError(
- "Matching Destination ARP IP address unsuccessful: {0} != {1}"
- .format(ether['ARP'].pdst, rx_arp_dst_ip))
-
- sys.exit(0)
-
-
-if __name__ == "__main__":
- main()
diff --git a/resources/traffic_scripts/send_icmp_check_gre_headers.py b/resources/traffic_scripts/send_icmp_check_gre_headers.py
deleted file mode 100755
index ccea9f5d9b..0000000000
--- a/resources/traffic_scripts/send_icmp_check_gre_headers.py
+++ /dev/null
@@ -1,92 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2016 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Traffic script that sends an ICMPv4 packet from one interface to the other,
-where GRE encapsulated packet is expected.
-"""
-
-import sys
-
-from robot.api import logger
-from scapy.all import Ether
-from scapy.layers.inet import IP_PROTOS
-from scapy.layers.inet import IP
-from scapy.layers.inet import ICMP
-
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-
-
-def main():
- args = TrafficScriptArg(
- ['tx_dst_mac', 'rx_dst_mac',
- 'inner_src_ip', 'inner_dst_ip',
- 'outer_src_ip', 'outer_dst_ip'])
-
- tx_if = args.get_arg('tx_if')
- rx_if = args.get_arg('rx_if')
- tx_dst_mac = args.get_arg('tx_dst_mac')
- rx_dst_mac = args.get_arg('rx_dst_mac')
- inner_src_ip = args.get_arg('inner_src_ip')
- inner_dst_ip = args.get_arg('inner_dst_ip')
- outer_src_ip = args.get_arg('outer_src_ip')
- outer_dst_ip = args.get_arg('outer_dst_ip')
-
- rxq = RxQueue(rx_if)
- txq = TxQueue(tx_if)
- sent_packets = []
-
- tx_pkt_raw = Ether(dst=tx_dst_mac) / \
- IP(src=inner_src_ip, dst=inner_dst_ip) / \
- ICMP()
-
- sent_packets.append(tx_pkt_raw)
- txq.send(tx_pkt_raw)
- ether = rxq.recv(2)
-
- if ether is None:
- raise RuntimeError("ICMP echo Rx timeout")
-
- # Check RX headers
- if ether.dst != rx_dst_mac:
- raise RuntimeError("Matching of received destination MAC unsuccessful.")
- logger.debug("Comparison of received destination MAC: OK.")
-
- if ether['IP'].src != outer_src_ip:
- raise RuntimeError("Matching of received outer source IP unsuccessful.")
- logger.debug("Comparison of received outer source IP: OK.")
-
- if ether['IP'].dst != outer_dst_ip:
- raise RuntimeError(
- "Matching of received outer destination IP unsuccessful.")
- logger.debug("Comparison of received outer destination IP: OK.")
-
- if ether['IP'].proto != IP_PROTOS.gre:
- raise RuntimeError("IP protocol is no GRE.")
- logger.debug("Comparison of received GRE protocol: OK.")
-
- if ether['IP']['GRE']['IP'].src != inner_src_ip:
- raise RuntimeError("Matching of received inner source IP unsuccessful.")
- logger.debug("Comparison of received inner source IP: OK.")
-
- if ether['IP']['GRE']['IP'].dst != inner_dst_ip:
- raise RuntimeError(
- "Matching of received inner destination IP unsuccessful.")
- logger.debug("Comparison of received inner destination IP: OK.")
-
- sys.exit(0)
-
-
-if __name__ == "__main__":
- main()
diff --git a/resources/traffic_scripts/send_icmp_check_headers.py b/resources/traffic_scripts/send_icmp_check_headers.py
deleted file mode 100755
index 2193164b61..0000000000
--- a/resources/traffic_scripts/send_icmp_check_headers.py
+++ /dev/null
@@ -1,153 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2016 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Traffic script that sends an IP ICMPv4/ICMPv6 packet from one interface
-to the other. Source and destination IP addresses and source and destination
-MAC addresses are checked in received packet.
-"""
-
-import sys
-
-import ipaddress
-from robot.api import logger
-from scapy.layers.inet import ICMP, IP
-from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
-from scapy.layers.l2 import Ether, Dot1Q
-
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-
-
-def valid_ipv4(ip):
- try:
- ipaddress.IPv4Address(unicode(ip))
- return True
- except (AttributeError, ipaddress.AddressValueError):
- return False
-
-
-def valid_ipv6(ip):
- try:
- ipaddress.IPv6Address(unicode(ip))
- return True
- except (AttributeError, ipaddress.AddressValueError):
- return False
-
-
-def main():
- """Send IP ICMP packet from one traffic generator interface to the other."""
- args = TrafficScriptArg(
- ['tg_src_mac', 'tg_dst_mac', 'src_ip', 'dst_ip', 'dut_if1_mac',
- 'dut_if2_mac'],
- ['encaps_tx', 'vlan_tx', 'vlan_outer_tx',
- 'encaps_rx', 'vlan_rx', 'vlan_outer_rx'])
-
- tx_src_mac = args.get_arg('tg_src_mac')
- tx_dst_mac = args.get_arg('dut_if1_mac')
- rx_dst_mac = args.get_arg('tg_dst_mac')
- rx_src_mac = args.get_arg('dut_if2_mac')
- src_ip = args.get_arg('src_ip')
- dst_ip = args.get_arg('dst_ip')
- tx_if = args.get_arg('tx_if')
- rx_if = args.get_arg('rx_if')
-
- encaps_tx = args.get_arg('encaps_tx')
- vlan_tx = args.get_arg('vlan_tx')
- vlan_outer_tx = args.get_arg('vlan_outer_tx')
- encaps_rx = args.get_arg('encaps_rx')
- vlan_rx = args.get_arg('vlan_rx')
- vlan_outer_rx = args.get_arg('vlan_outer_rx')
-
- rxq = RxQueue(rx_if)
- txq = TxQueue(tx_if)
- sent_packets = []
- ip_format = ''
- pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
- if encaps_tx == 'Dot1q':
- pkt_raw /= Dot1Q(vlan=int(vlan_tx))
- elif encaps_tx == 'Dot1ad':
- pkt_raw.type = 0x88a8
- pkt_raw /= Dot1Q(vlan=vlan_outer_tx)
- pkt_raw /= Dot1Q(vlan=vlan_tx)
- if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
- pkt_raw /= IP(src=src_ip, dst=dst_ip)
- pkt_raw /= ICMP()
- ip_format = IP
- elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
- pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
- pkt_raw /= ICMPv6EchoRequest()
- ip_format = IPv6
- else:
- raise ValueError("IP not in correct format")
-
- sent_packets.append(pkt_raw)
- txq.send(pkt_raw)
-
- while True:
- if tx_if == rx_if:
- ether = rxq.recv(2, ignore=sent_packets)
- else:
- ether = rxq.recv(2)
- if ether is None:
- raise RuntimeError('ICMP echo Rx timeout')
-
- if ether.haslayer(ICMPv6ND_NS):
- # read another packet in the queue if the current one is ICMPv6ND_NS
- continue
- else:
- # otherwise process the current packet
- break
-
- if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
- logger.trace("MAC matched")
- else:
- raise RuntimeError("Matching packet unsuccessful: {0}".
- format(ether.__repr__()))
-
- if encaps_rx == 'Dot1q':
- if ether[Dot1Q].vlan == int(vlan_rx):
- logger.trace("VLAN matched")
- else:
- raise RuntimeError('Ethernet frame with wrong VLAN tag ({}-'
- 'received, {}-expected):\n{}'.
- format(ether[Dot1Q].vlan, vlan_rx,
- ether.__repr__()))
- ip = ether[Dot1Q].payload
- elif encaps_rx == 'Dot1ad':
- raise NotImplementedError()
- else:
- ip = ether.payload
-
- if not isinstance(ip, ip_format):
- raise RuntimeError("Not an IP packet received {0}".
- format(ip.__repr__()))
-
- # Compare data from packets
- if src_ip == ip.src:
- logger.trace("Src IP matched")
- else:
- raise RuntimeError("Matching Src IP unsuccessful: {} != {}".
- format(src_ip, ip.src))
-
- if dst_ip == ip.dst:
- logger.trace("Dst IP matched")
- else:
- raise RuntimeError("Matching Dst IP unsuccessful: {} != {}".
- format(dst_ip, ip.dst))
-
- sys.exit(0)
-
-
-if __name__ == "__main__":
- main()
diff --git a/resources/traffic_scripts/send_icmp_check_multipath.py b/resources/traffic_scripts/send_icmp_check_multipath.py
deleted file mode 100755
index 5da3b7b096..0000000000
--- a/resources/traffic_scripts/send_icmp_check_multipath.py
+++ /dev/null
@@ -1,142 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2016 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Traffic script that sends an IP ICMPv4/ICMPv6 packets traffic
-and check if it is divided into two paths."""
-
-import sys
-import ipaddress
-
-from scapy.all import Ether
-from scapy.layers.inet import ICMP, IP
-from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
-
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-
-
-def valid_ipv4(ip):
- try:
- ipaddress.IPv4Address(unicode(ip))
- return True
- except (AttributeError, ipaddress.AddressValueError):
- return False
-
-
-def valid_ipv6(ip):
- try:
- ipaddress.IPv6Address(unicode(ip))
- return True
- except (AttributeError, ipaddress.AddressValueError):
- return False
-
-
-def main():
- """Send 100 IP ICMP packets traffic and check if it is divided into
- two paths."""
- args = TrafficScriptArg(
- ['src_ip', 'dst_ip', 'tg_if1_mac', 'dut_if1_mac', 'dut_if2_mac',
- 'path_1_mac', 'path_2_mac'])
-
- src_ip = args.get_arg('src_ip')
- dst_ip = args.get_arg('dst_ip')
- tg_if1_mac = args.get_arg('tg_if1_mac')
- dut_if1_mac = args.get_arg('dut_if1_mac')
- dut_if2_mac = args.get_arg('dut_if2_mac')
- path_1_mac = args.get_arg('path_1_mac')
- path_2_mac = args.get_arg('path_2_mac')
- tx_if = args.get_arg('tx_if')
- rx_if = args.get_arg('rx_if')
- path_1_counter = 0
- path_2_counter = 0
-
- rxq = RxQueue(rx_if)
- txq = TxQueue(tx_if)
- sent_packets = []
- ip_format = ''
- pkt_raw = ''
- separator = ''
-
- if valid_ipv4(src_ip):
- separator = '.'
- elif valid_ipv6(src_ip):
- separator = ':'
- else:
- raise ValueError("Source address not in correct format")
-
- src_ip_base = (src_ip.rsplit(separator, 1))[0] + separator
-
- for i in range(1, 101):
- if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
- pkt_raw = (Ether(src=tg_if1_mac, dst=dut_if1_mac) /
- IP(src=src_ip_base+str(i), dst=dst_ip) /
- ICMP())
- ip_format = 'IP'
- elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
- pkt_raw = (Ether(src=tg_if1_mac, dst=dut_if1_mac) /
- IPv6(src=src_ip_base+str(i), dst=dst_ip) /
- ICMPv6EchoRequest())
- ip_format = 'IPv6'
- else:
- raise ValueError("IP not in correct format")
-
- sent_packets.append(pkt_raw)
- txq.send(pkt_raw)
-
- while True:
- ether = rxq.recv(2)
- if ether is None:
- raise RuntimeError('ICMPv6 echo reply Rx timeout')
-
- if ether.haslayer(ICMPv6ND_NS):
- # read another packet in the queue in case of ICMPv6ND_NS packet
- continue
- else:
- # otherwise process the current packet
- break
-
- if ether is None:
- raise RuntimeError("ICMP echo Rx timeout")
- if not ether.haslayer(ip_format):
- raise RuntimeError("Not an IP packet received {0}".
- format(ether.__repr__()))
-
- if ether[Ether].src != dut_if2_mac:
- raise RuntimeError("Source MAC address error")
-
- if ether[Ether].dst == path_1_mac:
- path_1_counter += 1
- elif ether[Ether].dst == path_2_mac:
- path_2_counter += 1
- else:
- raise RuntimeError("Destination MAC address error")
-
- if (path_1_counter + path_2_counter) != 100:
- raise RuntimeError("Packet loss: recevied only {} packets of 100 ".
- format(path_1_counter + path_2_counter))
-
- if path_1_counter == 0:
- raise RuntimeError("Path 1 error!")
-
- if path_2_counter == 0:
- raise RuntimeError("Path 2 error!")
-
- print "Path_1 counter: {}".format(path_1_counter)
- print "Path_2 counter: {}".format(path_2_counter)
-
- sys.exit(0)
-
-
-if __name__ == "__main__":
- main()
diff --git a/resources/traffic_scripts/send_icmp_type_code.py b/resources/traffic_scripts/send_icmp_type_code.py
deleted file mode 100755
index 2997f91264..0000000000
--- a/resources/traffic_scripts/send_icmp_type_code.py
+++ /dev/null
@@ -1,132 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2017 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Traffic script that sends an IP ICMPv4/ICMPv6 packet from one interface to
-the other one. Dot1q or Dot1ad tagging of the ethernet frame can be set.
-"""
-
-import sys
-import ipaddress
-
-from scapy.layers.l2 import Ether
-from scapy.layers.inet import ICMP, IP
-from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
-
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-
-
-def valid_ipv4(ip):
- """Check if IP address has the correct IPv4 address format.
-
- :param ip: IP address.
- :type ip: str
- :returns: True in case of correct IPv4 address format,
- otherwise return false.
- :rtype: bool
- """
- try:
- ipaddress.IPv4Address(unicode(ip))
- return True
- except (AttributeError, ipaddress.AddressValueError):
- return False
-
-
-def valid_ipv6(ip):
- """Check if IP address has the correct IPv6 address format.
-
- :param ip: IP address.
- :type ip: str
- :returns: True in case of correct IPv6 address format,
- otherwise return false.
- :rtype: bool
- """
- try:
- ipaddress.IPv6Address(unicode(ip))
- return True
- except (AttributeError, ipaddress.AddressValueError):
- return False
-
-
-def main():
- """Send IP ICMPv4/ICMPv6 packet from one traffic generator interface to
- the other one."""
-
- args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip',
- 'icmp_type', 'icmp_code'])
-
- src_mac = args.get_arg('src_mac')
- dst_mac = args.get_arg('dst_mac')
- src_ip = args.get_arg('src_ip')
- dst_ip = args.get_arg('dst_ip')
-
- tx_if = args.get_arg('tx_if')
- rx_if = args.get_arg('rx_if')
-
- icmp_type = int(args.get_arg('icmp_type'))
- icmp_code = int(args.get_arg('icmp_code'))
-
- rxq = RxQueue(rx_if)
- txq = TxQueue(tx_if)
-
- sent_packets = []
-
- # Create empty ip ICMP packet and add padding before sending
- if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
- pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
- IP(src=src_ip, dst=dst_ip) /
- ICMP(code=icmp_code, type=icmp_type))
- ip_format = IP
- icmp_format = ICMP
- elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
- pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
- IPv6(src=src_ip, dst=dst_ip) /
- ICMPv6EchoRequest(code=icmp_code, type=icmp_type))
- ip_format = IPv6
- icmp_format = 'ICMPv6'
- else:
- raise ValueError("IP(s) not in correct format")
-
- # Send created packet on one interface and receive on the other
- sent_packets.append(pkt_raw)
- txq.send(pkt_raw)
-
- while True:
- ether = rxq.recv(2)
- if ether is None:
- raise RuntimeError('ICMP echo Rx timeout')
-
- if ether.haslayer(ICMPv6ND_NS):
- # read another packet in the queue if the current one is ICMPv6ND_NS
- continue
- else:
- # otherwise process the current packet
- break
-
- # Check whether received packet contains layers IP and ICMP
- if not ether.haslayer(ip_format):
- raise RuntimeError('Not an IP packet received {0}'.
- format(ether.__repr__()))
-
- # Cannot use haslayer for ICMPv6, every type of ICMPv6 is a separate layer
- # Next header value of 58 means the next header is ICMPv6
- if not ether.haslayer(icmp_format) and ether[ip_format].nh != 58:
- raise RuntimeError('Not an ICMP packet received {0}'.
- format(ether.__repr__()))
-
- sys.exit(0)
-
-
-if __name__ == "__main__":
- main()
diff --git a/resources/traffic_scripts/send_icmp_wait_for_reply.py b/resources/traffic_scripts/send_icmp_wait_for_reply.py
index 7677152801..2c79ae7136 100755
--- a/resources/traffic_scripts/send_icmp_wait_for_reply.py
+++ b/resources/traffic_scripts/send_icmp_wait_for_reply.py
@@ -1,4 +1,5 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
+
# Copyright (c) 2016 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -17,56 +18,16 @@
import sys
import ipaddress
-from scapy.all import Ether
from scapy.layers.inet import ICMP, IP
-from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply,\
+ ICMPv6ND_NS
+from scapy.layers.l2 import Ether
+from scapy.packet import Raw
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-def is_icmp_reply(pkt, ipformat):
- """Return True if pkt is echo reply, else return False. If exception occurs
- return False.
-
- :param pkt: Packet.
- :param ipformat: Dictionary of names to distinguish IPv4 and IPv6.
- :type pkt: dict
- :type ipformat: dict
- :rtype: bool
- """
- # pylint: disable=bare-except
- try:
- if pkt[ipformat['IPType']][ipformat['ICMP_rep']].type == \
- ipformat['Type']:
- return True
- else:
- return False
- except: # pylint: disable=bare-except
- return False
-
-
-def address_check(request, reply, ipformat):
- """Compare request packet source address with reply destination address
- and vice versa. If exception occurs return False.
-
- :param request: Sent packet containing request.
- :param reply: Received packet containing reply.
- :param ipformat: Dictionary of names to distinguish IPv4 and IPv6.
- :type request: dict
- :type reply: dict
- :type ipformat: dict
- :rtype: bool
- """
- # pylint: disable=bare-except
- try:
- r_src = reply[ipformat['IPType']].src == request[ipformat['IPType']].dst
- r_dst = reply[ipformat['IPType']].dst == request[ipformat['IPType']].src
- return r_src and r_dst
- except: # pylint: disable=bare-except
- return False
-
-
def valid_ipv4(ip):
"""Check if IP address has the correct IPv4 address format.
@@ -77,7 +38,7 @@ def valid_ipv4(ip):
:rtype: bool
"""
try:
- ipaddress.IPv4Address(unicode(ip))
+ ipaddress.IPv4Address(ip)
return True
except (AttributeError, ipaddress.AddressValueError):
return False
@@ -93,7 +54,7 @@ def valid_ipv6(ip):
:rtype: bool
"""
try:
- ipaddress.IPv6Address(unicode(ip))
+ ipaddress.IPv6Address(ip)
return True
except (AttributeError, ipaddress.AddressValueError):
return False
@@ -102,16 +63,17 @@ def valid_ipv6(ip):
def main():
"""Send ICMP echo request and wait for ICMP echo reply. It ignores all other
packets."""
- args = TrafficScriptArg(['dst_mac', 'src_mac', 'dst_ip', 'src_ip',
- 'timeout'])
-
- dst_mac = args.get_arg('dst_mac')
- src_mac = args.get_arg('src_mac')
- dst_ip = args.get_arg('dst_ip')
- src_ip = args.get_arg('src_ip')
- tx_if = args.get_arg('tx_if')
- rx_if = args.get_arg('rx_if')
- timeout = int(args.get_arg('timeout'))
+ args = TrafficScriptArg(
+ [u"dst_mac", u"src_mac", u"dst_ip", u"src_ip", u"timeout"]
+ )
+
+ dst_mac = args.get_arg(u"dst_mac")
+ src_mac = args.get_arg(u"src_mac")
+ dst_ip = args.get_arg(u"dst_ip")
+ src_ip = args.get_arg(u"src_ip")
+ tx_if = args.get_arg(u"tx_if")
+ rx_if = args.get_arg(u"rx_if")
+ timeout = int(args.get_arg(u"timeout"))
wait_step = 1
rxq = RxQueue(rx_if)
@@ -120,21 +82,26 @@ def main():
# Create empty ip ICMP packet
if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
- icmp_request = (Ether(src=src_mac, dst=dst_mac) /
- IP(src=src_ip, dst=dst_ip) /
- ICMP())
- ip_format = {'IPType': 'IP', 'ICMP_req': 'ICMP',
- 'ICMP_rep': 'ICMP', 'Type': 0}
+ ip_layer = IP
+ icmp_req = ICMP
+ icmp_resp = ICMP
+ icmp_type = 0 # echo-reply
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
- icmp_request = (Ether(src=src_mac, dst=dst_mac) /
- IPv6(src=src_ip, dst=dst_ip) /
- ICMPv6EchoRequest())
- ip_format = {'IPType': 'IPv6', 'ICMP_req': 'ICMPv6 Echo Request',
- 'ICMP_rep': 'ICMPv6 Echo Reply', 'Type': 129}
+ ip_layer = IP
+ icmp_req = ICMPv6EchoRequest
+ icmp_resp = ICMPv6EchoReply
+ icmp_type = 0 # Echo Reply
else:
- raise ValueError("IP not in correct format")
+ raise ValueError(u"IP not in correct format")
+
+ icmp_request = (
+ Ether(src=src_mac, dst=dst_mac) /
+ ip_layer(src=src_ip, dst=dst_ip) /
+ icmp_req()
+ )
# Send created packet on the interface
+ icmp_request /= Raw()
sent_packets.append(icmp_request)
txq.send(icmp_request)
@@ -144,7 +111,7 @@ def main():
if icmp_reply is None:
timeout -= wait_step
if timeout < 0:
- raise RuntimeError("ICMP echo Rx timeout")
+ raise RuntimeError(u"ICMP echo Rx timeout")
elif icmp_reply.haslayer(ICMPv6ND_NS):
# read another packet in the queue in case of ICMPv6ND_NS packet
@@ -153,16 +120,17 @@ def main():
# otherwise process the current packet
break
- if is_icmp_reply(icmp_reply, ip_format):
- if address_check(icmp_request, icmp_reply, ip_format):
+ if icmp_reply[ip_layer][icmp_resp].type == icmp_type:
+ if icmp_reply[ip_layer].src == dst_ip and \
+ icmp_reply[ip_layer].dst == src_ip:
break
else:
- raise RuntimeError("Max packet count limit reached")
+ raise RuntimeError(u"Max packet count limit reached")
- print "ICMP echo reply received."
+ print(u"ICMP echo reply received.")
sys.exit(0)
-if __name__ == "__main__":
+if __name__ == u"__main__":
main()
diff --git a/resources/traffic_scripts/send_ip_check_headers.py b/resources/traffic_scripts/send_ip_check_headers.py
index 7c4f2bd002..f5a55553cf 100755
--- a/resources/traffic_scripts/send_ip_check_headers.py
+++ b/resources/traffic_scripts/send_ip_check_headers.py
@@ -1,4 +1,5 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
+
# Copyright (c) 2019 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -20,10 +21,12 @@ MAC addresses are checked in received packet.
import sys
import ipaddress
+
from robot.api import logger
from scapy.layers.inet import IP
from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
from scapy.layers.l2 import Ether, Dot1Q
+from scapy.packet import Raw
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
@@ -31,7 +34,7 @@ from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
def valid_ipv4(ip):
try:
- ipaddress.IPv4Address(unicode(ip))
+ ipaddress.IPv4Address(ip)
return True
except (AttributeError, ipaddress.AddressValueError):
return False
@@ -39,7 +42,7 @@ def valid_ipv4(ip):
def valid_ipv6(ip):
try:
- ipaddress.IPv6Address(unicode(ip))
+ ipaddress.IPv6Address(ip)
return True
except (AttributeError, ipaddress.AddressValueError):
return False
@@ -48,38 +51,45 @@ def valid_ipv6(ip):
def main():
"""Send IP/IPv6 packet from one traffic generator interface to the other."""
args = TrafficScriptArg(
- ['tg_src_mac', 'tg_dst_mac', 'src_ip', 'dst_ip', 'dut_if1_mac',
- 'dut_if2_mac'],
- ['encaps_tx', 'vlan_tx', 'vlan_outer_tx',
- 'encaps_rx', 'vlan_rx', 'vlan_outer_rx'])
-
- tx_src_mac = args.get_arg('tg_src_mac')
- tx_dst_mac = args.get_arg('dut_if1_mac')
- rx_dst_mac = args.get_arg('tg_dst_mac')
- rx_src_mac = args.get_arg('dut_if2_mac')
- src_ip = args.get_arg('src_ip')
- dst_ip = args.get_arg('dst_ip')
- tx_if = args.get_arg('tx_if')
- rx_if = args.get_arg('rx_if')
-
- encaps_tx = args.get_arg('encaps_tx')
- vlan_tx = args.get_arg('vlan_tx')
- vlan_outer_tx = args.get_arg('vlan_outer_tx')
- encaps_rx = args.get_arg('encaps_rx')
- vlan_rx = args.get_arg('vlan_rx')
- vlan_outer_rx = args.get_arg('vlan_outer_rx')
+ [
+ u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac",
+ u"dut_if2_mac"
+ ],
+ [
+ u"encaps_tx", u"vlan_tx", u"vlan_outer_tx", u"encaps_rx",
+ u"vlan_rx", u"vlan_outer_rx"
+ ]
+ )
+
+ tx_src_mac = args.get_arg(u"tg_src_mac")
+ tx_dst_mac = args.get_arg(u"dut_if1_mac")
+ rx_dst_mac = args.get_arg(u"tg_dst_mac")
+ rx_src_mac = args.get_arg(u"dut_if2_mac")
+ src_ip = args.get_arg(u"src_ip")
+ dst_ip = args.get_arg(u"dst_ip")
+ tx_if = args.get_arg(u"tx_if")
+ rx_if = args.get_arg(u"rx_if")
+
+ encaps_tx = args.get_arg(u"encaps_tx")
+ vlan_tx = args.get_arg(u"vlan_tx")
+ vlan_outer_tx = args.get_arg(u"vlan_outer_tx")
+ encaps_rx = args.get_arg(u"encaps_rx")
+ vlan_rx = args.get_arg(u"vlan_rx")
+ vlan_outer_rx = args.get_arg(u"vlan_outer_rx")
rxq = RxQueue(rx_if)
txq = TxQueue(tx_if)
- sent_packets = []
- ip_format = ''
+
+ sent_packets =list()
pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
- if encaps_tx == 'Dot1q':
+
+ if encaps_tx == u"Dot1q":
pkt_raw /= Dot1Q(vlan=int(vlan_tx))
- elif encaps_tx == 'Dot1ad':
+ elif encaps_tx == u"Dot1ad":
pkt_raw.type = 0x88a8
pkt_raw /= Dot1Q(vlan=vlan_outer_tx)
pkt_raw /= Dot1Q(vlan=vlan_tx)
+
if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
pkt_raw /= IP(src=src_ip, dst=dst_ip, proto=61)
ip_format = IP
@@ -87,8 +97,9 @@ def main():
pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
ip_format = IPv6
else:
- raise ValueError("IP not in correct format")
+ raise ValueError(u"IP not in correct format")
+ pkt_raw /= Raw()
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
@@ -99,7 +110,7 @@ def main():
ether = rxq.recv(2)
if ether is None:
- raise RuntimeError('IP packet Rx timeout')
+ raise RuntimeError(u"IP packet Rx timeout")
if ether.haslayer(ICMPv6ND_NS):
# read another packet in the queue if the current one is ICMPv6ND_NS
@@ -109,44 +120,45 @@ def main():
break
if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
- logger.trace("MAC matched")
+ logger.trace(u"MAC matched")
else:
- raise RuntimeError("Matching packet unsuccessful: {0}".
- format(ether.__repr__()))
+ raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
- if encaps_rx == 'Dot1q':
+ if encaps_rx == u"Dot1q":
if ether[Dot1Q].vlan == int(vlan_rx):
- logger.trace("VLAN matched")
+ logger.trace(u"VLAN matched")
else:
- raise RuntimeError('Ethernet frame with wrong VLAN tag ({}-'
- 'received, {}-expected):\n{}'.
- format(ether[Dot1Q].vlan, vlan_rx,
- ether.__repr__()))
+ raise RuntimeError(
+ f"Ethernet frame with wrong VLAN tag "
+ f"({ether[Dot1Q].vlan}-received, "
+ f"{vlan_rx}-expected):\n{ether!r}"
+ )
ip = ether[Dot1Q].payload
- elif encaps_rx == 'Dot1ad':
+ elif encaps_rx == u"Dot1ad":
raise NotImplementedError()
else:
ip = ether.payload
if not isinstance(ip, ip_format):
- raise RuntimeError("Not an IP packet received {0}".
- format(ip.__repr__()))
+ raise RuntimeError(f"Not an IP packet received {ip!r}")
# Compare data from packets
if src_ip == ip.src:
- logger.trace("Src IP matched")
+ logger.trace(u"Src IP matched")
else:
- raise RuntimeError("Matching Src IP unsuccessful: {} != {}".
- format(src_ip, ip.src))
+ raise RuntimeError(
+ f"Matching Src IP unsuccessful: {src_ip} != {ip.src}"
+ )
if dst_ip == ip.dst:
- logger.trace("Dst IP matched")
+ logger.trace(u"Dst IP matched")
else:
- raise RuntimeError("Matching Dst IP unsuccessful: {} != {}".
- format(dst_ip, ip.dst))
+ raise RuntimeError(
+ f"Matching Dst IP unsuccessful: {dst_ip} != {ip.dst}"
+ )
sys.exit(0)
-if __name__ == "__main__":
+if __name__ == u"__main__":
main()
diff --git a/resources/traffic_scripts/send_ip_icmp.py b/resources/traffic_scripts/send_ip_icmp.py
deleted file mode 100755
index 58f2e1e4d8..0000000000
--- a/resources/traffic_scripts/send_ip_icmp.py
+++ /dev/null
@@ -1,186 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2016 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Traffic script that sends an IP ICMPv4/ICMPv6 packet from one interface to
-the other one. Dot1q or Dot1ad tagging of the ethernet frame can be set.
-"""
-
-import sys
-import ipaddress
-
-from scapy.layers.l2 import Ether, Dot1Q
-from scapy.layers.inet import ICMP, IP
-from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
-
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-
-
-def valid_ipv4(ip):
- """Check if IP address has the correct IPv4 address format.
-
- :param ip: IP address.
- :type ip: str
- :return: True in case of correct IPv4 address format,
- otherwise return false.
- :rtype: bool
- """
- try:
- ipaddress.IPv4Address(unicode(ip))
- return True
- except (AttributeError, ipaddress.AddressValueError):
- return False
-
-
-def valid_ipv6(ip):
- """Check if IP address has the correct IPv6 address format.
-
- :param ip: IP address.
- :type ip: str
- :return: True in case of correct IPv6 address format,
- otherwise return false.
- :rtype: bool
- """
- try:
- ipaddress.IPv6Address(unicode(ip))
- return True
- except (AttributeError, ipaddress.AddressValueError):
- return False
-
-
-def main():
- """Send IP ICMPv4/ICMPv6 packet from one traffic generator interface to
- the other one. Dot1q or Dot1ad tagging of the ethernet frame can be set.
- """
- args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip'],
- ['encaps', 'vlan1', 'vlan2', 'encaps_rx',
- 'vlan1_rx', 'vlan2_rx'])
-
- src_mac = args.get_arg('src_mac')
- dst_mac = args.get_arg('dst_mac')
- src_ip = args.get_arg('src_ip')
- dst_ip = args.get_arg('dst_ip')
-
- encaps = args.get_arg('encaps')
- vlan1 = args.get_arg('vlan1')
- vlan2 = args.get_arg('vlan2')
- encaps_rx = args.get_arg('encaps_rx')
- vlan1_rx = args.get_arg('vlan1_rx')
- vlan2_rx = args.get_arg('vlan2_rx')
-
- tx_if = args.get_arg('tx_if')
- rx_if = args.get_arg('rx_if')
-
- rxq = RxQueue(rx_if)
- txq = TxQueue(tx_if)
-
- sent_packets = []
- ip_format = ''
- icmp_format = ''
- # Create empty ip ICMP packet and add padding before sending
- if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
- if encaps == 'Dot1q':
- pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
- Dot1Q(vlan=int(vlan1)) /
- IP(src=src_ip, dst=dst_ip) /
- ICMP())
- elif encaps == 'Dot1ad':
- pkt_raw = (Ether(src=src_mac, dst=dst_mac, type=0x88A8) /
- Dot1Q(vlan=int(vlan1), type=0x8100) /
- Dot1Q(vlan=int(vlan2)) /
- IP(src=src_ip, dst=dst_ip) /
- ICMP())
- else:
- pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
- IP(src=src_ip, dst=dst_ip) /
- ICMP())
- ip_format = IP
- icmp_format = ICMP
- elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
- if encaps == 'Dot1q':
- pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
- Dot1Q(vlan=int(vlan1)) /
- IPv6(src=src_ip, dst=dst_ip) /
- ICMPv6EchoRequest())
- elif encaps == 'Dot1ad':
- pkt_raw = (Ether(src=src_mac, dst=dst_mac, type=0x88A8) /
- Dot1Q(vlan=int(vlan1), type=0x8100) /
- Dot1Q(vlan=int(vlan2)) /
- IPv6(src=src_ip, dst=dst_ip) /
- ICMPv6EchoRequest())
- else:
- pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
- IPv6(src=src_ip, dst=dst_ip) /
- ICMPv6EchoRequest())
- ip_format = IPv6
- icmp_format = ICMPv6EchoRequest
- else:
- raise ValueError("IP(s) not in correct format")
-
- # Send created packet on one interface and receive on the other
- sent_packets.append(pkt_raw)
- txq.send(pkt_raw)
-
- # Receive ICMP / ICMPv6 echo reply
- while True:
- ether = rxq.recv(2,)
- if ether is None:
- raise RuntimeError('ICMP echo Rx timeout')
-
- if ether.haslayer(ICMPv6ND_NS):
- # read another packet in the queue if the current one is ICMPv6ND_NS
- continue
- else:
- # otherwise process the current packet
- break
-
- # Check whether received packet contains layers IP/IPv6 and
- # ICMP/ICMPv6EchoRequest
- if encaps_rx:
- if encaps_rx == 'Dot1q':
- if not vlan1_rx:
- vlan1_rx = vlan1
- if not ether.haslayer(Dot1Q):
- raise RuntimeError('Not VLAN tagged Eth frame received:\n{0}'.
- format(ether.__repr__()))
- elif ether[Dot1Q].vlan != int(vlan1_rx):
- raise RuntimeError('Ethernet frame with wrong VLAN tag ({}) '
- 'received ({} expected):\n{}'.
- format(ether[Dot1Q].vlan, vlan1_rx,
- ether.__repr__()))
- elif encaps_rx == 'Dot1ad':
- if not vlan1_rx:
- vlan1_rx = vlan1
- if not vlan2_rx:
- vlan2_rx = vlan2
- # TODO
- raise RuntimeError('Encapsulation {0} not implemented yet.'.
- format(encaps_rx))
- else:
- raise RuntimeError('Unsupported encapsulation expected: {0}'.
- format(encaps_rx))
-
- if not ether.haslayer(ip_format):
- raise RuntimeError('Not an IP/IPv6 packet received:\n{0}'.
- format(ether.__repr__()))
-
- if not ether.haslayer(icmp_format):
- raise RuntimeError('Not an ICMP/ICMPv6EchoRequest packet received:\n'
- '{0}'.format(ether.__repr__()))
-
- sys.exit(0)
-
-
-if __name__ == "__main__":
- main()
diff --git a/resources/traffic_scripts/send_ipv4_icmp_check_lw_4o6.py b/resources/traffic_scripts/send_ipv4_icmp_check_lw_4o6.py
deleted file mode 100755
index 9e225428df..0000000000
--- a/resources/traffic_scripts/send_ipv4_icmp_check_lw_4o6.py
+++ /dev/null
@@ -1,139 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2016 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Traffic script that sends an IPv4 ICMP packet with ID field and checks if
-IPv4 is correctly encapsulated into IPv6 packet. Doing for ICMP echo request
-and ICMP echo response packet."""
-
-import sys
-
-from scapy.layers.l2 import Ether
-from scapy.layers.inet import IP, ICMP, icmptypes
-
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-
-
-def _is_ipv4_in_ipv6(pkt):
- """If IPv6 next header type in the given pkt is IPv4, return True,
- else return False. False is returned also if exception occurs."""
- ipv6_type = int('0x86dd', 16) # IPv6
- try:
- if pkt.type == ipv6_type:
- if pkt.payload.nh == 4:
- return True
- except: # pylint: disable=bare-except
- return False
- return False
-
-
-def main(): # pylint: disable=too-many-statements, too-many-locals
- """Main function of the script file."""
- args = TrafficScriptArg(['tx_dst_mac', 'tx_src_ipv4', 'tx_dst_ipv4',
- 'tx_icmp_id', 'rx_dst_mac', 'rx_src_mac',
- 'src_ipv6', 'dst_ipv6'])
- rx_if = args.get_arg('rx_if')
- tx_if = args.get_arg('tx_if')
- tx_dst_mac = args.get_arg('tx_dst_mac')
- tx_src_ipv4 = args.get_arg('tx_src_ipv4')
- tx_dst_ipv4 = args.get_arg('tx_dst_ipv4')
- tx_icmp_id = int(args.get_arg('tx_icmp_id'))
- rx_dst_mac = args.get_arg('rx_dst_mac')
- rx_src_mac = args.get_arg('rx_src_mac')
- rx_src_ipv6 = args.get_arg('src_ipv6')
- rx_dst_ipv6 = args.get_arg('dst_ipv6')
-
- rxq = RxQueue(rx_if)
- txq = TxQueue(tx_if)
- sent_packets = []
-
- for icmp_type in ('echo-request', 'echo-reply'):
- print '\nChecking ICMP type: {}'.format(icmp_type)
-
- # Create ICMP request
- tx_pkt = (Ether(dst=tx_dst_mac) /
- IP(src=tx_src_ipv4, dst=tx_dst_ipv4) /
- ICMP(type=icmp_type, id=tx_icmp_id))
-
- txq.send(tx_pkt)
- sent_packets.append(tx_pkt)
-
- for _ in range(5):
- pkt = rxq.recv(2)
- if _is_ipv4_in_ipv6(pkt):
- ether = pkt
- break
- else:
- raise RuntimeError("IPv4 in IPv6 Rx error.")
-
- # check ethernet
- if ether.dst != rx_dst_mac:
- raise RuntimeError("Destination MAC error {} != {}.".
- format(ether.dst, rx_dst_mac))
- print "Destination MAC: OK."
-
- if ether.src != rx_src_mac:
- raise RuntimeError("Source MAC error {} != {}.".
- format(ether.src, rx_src_mac))
- print "Source MAC: OK."
-
- ipv6 = ether.payload
-
- # check ipv6
- if ipv6.dst != rx_dst_ipv6:
- raise RuntimeError("Destination IP error {} != {}.".
- format(ipv6.dst, rx_dst_ipv6))
- print "Destination IPv6: OK."
-
- if ipv6.src != rx_src_ipv6:
- raise RuntimeError("Source IP error {} != {}.".
- format(ipv6.src, rx_src_ipv6))
- print "Source IPv6: OK."
-
- ipv4 = ipv6.payload
-
- # check ipv4
- if ipv4.dst != tx_dst_ipv4:
- raise RuntimeError("Destination IP error {} != {}.".
- format(ipv4.dst, tx_dst_ipv4))
- print "Destination IPv4: OK."
-
- if ipv4.src != tx_src_ipv4:
- raise RuntimeError("Source IP error {} != {}.".
- format(ipv4.src, tx_src_ipv4))
- print "Source IPv4: OK."
-
- # check icmp echo request
- if ipv4.proto != 1: # ICMP
- raise RuntimeError("IP protocol error {} != ICMP.".
- format(ipv4.proto))
- print "IPv4 protocol: OK."
-
- icmp = ipv4.payload
-
- # check icmp
- if icmptypes[icmp.type] != icmp_type:
- raise RuntimeError("ICMP type error {} != echo request.".
- format(icmp.type))
- print "ICMP type: OK."
-
- if icmp.id != tx_icmp_id:
- raise RuntimeError("ICMP ID error {} != {}.".
- format(icmp.id, tx_icmp_id))
- print "ICMP ID: OK."
-
- sys.exit(0)
-
-if __name__ == "__main__":
- main()
diff --git a/resources/traffic_scripts/send_ipv4_udp_check_lw_4o6.py b/resources/traffic_scripts/send_ipv4_udp_check_lw_4o6.py
deleted file mode 100755
index 116edb206b..0000000000
--- a/resources/traffic_scripts/send_ipv4_udp_check_lw_4o6.py
+++ /dev/null
@@ -1,136 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2016 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Traffic script that sends an empty UDP datagram and checks if IPv4 is
-correctly encapsulated into IPv6 packet."""
-
-import sys
-
-from scapy.layers.l2 import Ether
-from scapy.layers.inet import IP, UDP
-from ipaddress import ip_address
-
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-
-
-def _is_ipv4_in_ipv6(pkt):
- """If IPv6 next header type in the given pkt is IPv4, return True,
- else return False. False is returned also if exception occurs."""
- ipv6_type = int('0x86dd', 16) # IPv6
- try:
- if pkt.type == ipv6_type:
- if pkt.payload.nh == 4:
- return True
- except: # pylint: disable=bare-except
- return False
- return False
-
-
-def main(): # pylint: disable=too-many-statements, too-many-locals
- """Main function of the script file."""
- args = TrafficScriptArg(['tx_dst_mac', 'tx_src_ipv4', 'tx_dst_ipv4',
- 'tx_dst_udp_port', 'rx_dst_mac', 'rx_src_mac',
- 'src_ipv6', 'dst_ipv6'])
- rx_if = args.get_arg('rx_if')
- tx_if = args.get_arg('tx_if')
- tx_dst_mac = args.get_arg('tx_dst_mac')
- tx_src_ipv4 = args.get_arg('tx_src_ipv4')
- tx_dst_ipv4 = args.get_arg('tx_dst_ipv4')
- tx_dst_udp_port = int(args.get_arg('tx_dst_udp_port'))
- tx_src_udp_port = 20000
- rx_dst_mac = args.get_arg('rx_dst_mac')
- rx_src_mac = args.get_arg('rx_src_mac')
- rx_src_ipv6 = args.get_arg('src_ipv6')
- rx_dst_ipv6 = args.get_arg('dst_ipv6')
-
- rxq = RxQueue(rx_if)
- txq = TxQueue(tx_if)
- sent_packets = []
-
- # Create empty UDP datagram
- udp = (Ether(dst=tx_dst_mac) /
- IP(src=tx_src_ipv4, dst=tx_dst_ipv4) /
- UDP(sport=tx_src_udp_port, dport=tx_dst_udp_port))
-
- txq.send(udp)
- sent_packets.append(udp)
-
- for _ in range(5):
- pkt = rxq.recv(2)
- if _is_ipv4_in_ipv6(pkt):
- ether = pkt
- break
- else:
- raise RuntimeError("IPv4 in IPv6 Rx error.")
-
- # check ethernet
- if ether.dst != rx_dst_mac:
- raise RuntimeError("Destination MAC error {} != {}.".
- format(ether.dst, rx_dst_mac))
- print "Destination MAC: OK."
-
- if ether.src != rx_src_mac:
- raise RuntimeError("Source MAC error {} != {}.".
- format(ether.src, rx_src_mac))
- print "Source MAC: OK."
-
- ipv6 = ether.payload
-
- # check ipv6
- if ip_address(unicode(ipv6.dst)) != ip_address(unicode(rx_dst_ipv6)):
- raise RuntimeError("Destination IP error {} != {}.".
- format(ipv6.dst, rx_dst_ipv6))
- print "Destination IPv6: OK."
-
- if ip_address(unicode(ipv6.src)) != ip_address(unicode(rx_src_ipv6)):
- raise RuntimeError("Source IP error {} != {}.".
- format(ipv6.src, rx_src_ipv6))
- print "Source IPv6: OK."
-
- ipv4 = ipv6.payload
-
- # check ipv4
- if ipv4.dst != tx_dst_ipv4:
- raise RuntimeError("Destination IP error {} != {}.".
- format(ipv4.dst, tx_dst_ipv4))
- print "Destination IPv4: OK."
-
- if ipv4.src != tx_src_ipv4:
- raise RuntimeError("Source IP error {} != {}.".
- format(ipv4.src, tx_src_ipv4))
- print "Source IPv4: OK."
-
- if ipv4.proto != 17: # UDP
- raise RuntimeError("IP protocol error {} != UDP.".
- format(ipv4.proto))
- print "IPv4 protocol: OK."
-
- udp = ipv4.payload
-
- # check udp
- if udp.dport != tx_dst_udp_port:
- raise RuntimeError("UDP dport error {} != {}.".
- format(udp.dport, tx_dst_udp_port))
- print "UDP dport: OK."
-
- if udp.sport != tx_src_udp_port:
- raise RuntimeError("UDP sport error {} != {}.".
- format(udp.sport, tx_src_udp_port))
- print "UDP sport: OK."
-
- sys.exit(0)
-
-if __name__ == "__main__":
- main()
diff --git a/resources/traffic_scripts/send_ipv4_udp_check_map_t.py b/resources/traffic_scripts/send_ipv4_udp_check_map_t.py
deleted file mode 100755
index 835f667751..0000000000
--- a/resources/traffic_scripts/send_ipv4_udp_check_map_t.py
+++ /dev/null
@@ -1,132 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2016 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Traffic script that sends a UDP datagram and checks if IPv4 addresses
-are correctly translate to IPv6 addresses."""
-
-import sys
-
-from scapy.layers.l2 import Ether
-from scapy.layers.inet import IP, UDP
-from ipaddress import ip_address
-
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-
-
-def _check_udp_checksum(pkt):
- """Check UDP checksum in IP packet. Return True if checksum is correct
- else False."""
- new = pkt.__class__(str(pkt))
- del new['UDP'].chksum
- new = new.__class__(str(new))
- return new['UDP'].chksum == pkt['UDP'].chksum
-
-
-def _is_udp_in_ipv6(pkt):
- """If IPv6 next header type in the given pkt is UDP, return True,
- else return False. False is returned also if exception occurs."""
- ipv6_type = int('0x86dd', 16) # IPv6
- try:
- if pkt.type == ipv6_type:
- if pkt.payload.nh == 17: # UDP
- return True
- except AttributeError:
- return False
- return False
-
-
-def main(): # pylint: disable=too-many-statements, too-many-locals
- """Main function of the script file."""
- args = TrafficScriptArg(['tx_dst_mac', 'tx_src_ipv4', 'tx_dst_ipv4',
- 'tx_dst_udp_port', 'rx_dst_mac', 'rx_src_mac',
- 'rx_src_ipv6', 'rx_dst_ipv6'])
- rx_if = args.get_arg('rx_if')
- tx_if = args.get_arg('tx_if')
- tx_dst_mac = args.get_arg('tx_dst_mac')
- tx_src_ipv4 = args.get_arg('tx_src_ipv4')
- tx_dst_ipv4 = args.get_arg('tx_dst_ipv4')
- tx_dst_udp_port = int(args.get_arg('tx_dst_udp_port'))
- tx_src_udp_port = 20000
- rx_dst_mac = args.get_arg('rx_dst_mac')
- rx_src_mac = args.get_arg('rx_src_mac')
- rx_src_ipv6 = args.get_arg('rx_src_ipv6')
- rx_dst_ipv6 = args.get_arg('rx_dst_ipv6')
-
- rxq = RxQueue(rx_if)
- txq = TxQueue(tx_if)
- sent_packets = []
-
- # Create empty UDP datagram
- udp = (Ether(dst=tx_dst_mac) /
- IP(src=tx_src_ipv4, dst=tx_dst_ipv4) /
- UDP(sport=tx_src_udp_port, dport=tx_dst_udp_port) /
- 'udp_payload')
-
- txq.send(udp)
- sent_packets.append(udp)
-
- for _ in range(5):
- pkt = rxq.recv(2)
- if _is_udp_in_ipv6(pkt):
- ether = pkt
- break
- else:
- raise RuntimeError("UDP in IPv6 Rx error.")
-
- # check ethernet
- if ether.dst != rx_dst_mac:
- raise RuntimeError("Destination MAC error {} != {}.".
- format(ether.dst, rx_dst_mac))
- print "Destination MAC: OK."
-
- if ether.src != rx_src_mac:
- raise RuntimeError("Source MAC error {} != {}.".
- format(ether.src, rx_src_mac))
- print "Source MAC: OK."
-
- ipv6 = ether.payload
-
- # check ipv6
- if ip_address(unicode(ipv6.dst)) != ip_address(unicode(rx_dst_ipv6)):
- raise RuntimeError("Destination IP error {} != {}.".
- format(ipv6.dst, rx_dst_ipv6))
- print "Destination IPv6: OK."
-
- if ip_address(unicode(ipv6.src)) != ip_address(unicode(rx_src_ipv6)):
- raise RuntimeError("Source IP error {} != {}.".
- format(ipv6.src, rx_src_ipv6))
- print "Source IPv6: OK."
-
- udp = ipv6.payload
-
- # check udp
- if udp.dport != tx_dst_udp_port:
- raise RuntimeError("UDP dport error {} != {}.".
- format(udp.dport, tx_dst_udp_port))
- print "UDP dport: OK."
-
- if udp.sport != tx_src_udp_port:
- raise RuntimeError("UDP sport error {} != {}.".
- format(udp.sport, tx_src_udp_port))
- print "UDP sport: OK."
-
- if not _check_udp_checksum(ipv6):
- raise RuntimeError("UDP checksum error.")
- print "UDP checksum OK."
-
- sys.exit(0)
-
-if __name__ == "__main__":
- main()
diff --git a/resources/traffic_scripts/send_ipv6_udp_check_map_t.py b/resources/traffic_scripts/send_ipv6_udp_check_map_t.py
deleted file mode 100755
index 2d9c291686..0000000000
--- a/resources/traffic_scripts/send_ipv6_udp_check_map_t.py
+++ /dev/null
@@ -1,135 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2016 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Traffic script that sends an empty UDP datagram and checks if IPv6 addresses
-are correctly translate to IPv4 addresses."""
-
-import sys
-
-from scapy.layers.l2 import Ether
-from scapy.layers.inet6 import IPv6, UDP
-from ipaddress import ip_address
-
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-
-
-def _check_udp_checksum(pkt):
- """Check UDP checksum in IP packet. Return True if checksum is correct
- else False."""
- new = pkt.__class__(str(pkt))
- del new['UDP'].chksum
- new = new.__class__(str(new))
- return new['UDP'].chksum == pkt['UDP'].chksum
-
-
-def _is_udp_in_ipv4(pkt):
- """If IPv4 protocol type in the given pkt is UDP, return True,
- else return False. False is returned also if exception occurs."""
- ipv4_type = int('0x0800', 16) # IPv4
- try:
- if pkt.type == ipv4_type:
- if pkt.payload.proto == 17: # UDP
- return True
- except AttributeError:
- return False
- return False
-
-
-def main(): # pylint: disable=too-many-statements, too-many-locals
- """Main function of the script file."""
- args = TrafficScriptArg(['tx_dst_mac', 'tx_src_mac',
- 'tx_src_ipv6', 'tx_dst_ipv6',
- 'tx_src_udp_port', 'rx_dst_mac', 'rx_src_mac',
- 'rx_src_ipv4', 'rx_dst_ipv4'])
- rx_if = args.get_arg('rx_if')
- tx_if = args.get_arg('tx_if')
- tx_dst_mac = args.get_arg('tx_dst_mac')
- tx_src_mac = args.get_arg('tx_src_mac')
- tx_src_ipv6 = args.get_arg('tx_src_ipv6')
- tx_dst_ipv6 = args.get_arg('tx_dst_ipv6')
- tx_src_udp_port = int(args.get_arg('tx_src_udp_port'))
- tx_dst_udp_port = 20000
- rx_dst_mac = args.get_arg('rx_dst_mac')
- rx_src_mac = args.get_arg('rx_src_mac')
- rx_src_ipv4 = args.get_arg('rx_src_ipv4')
- rx_dst_ipv4 = args.get_arg('rx_dst_ipv4')
-
- rxq = RxQueue(rx_if)
- txq = TxQueue(tx_if)
- sent_packets = []
-
- # Create empty UDP datagram in IPv6
-
- udp = (Ether(dst=tx_dst_mac, src=tx_src_mac) /
- IPv6(src=tx_src_ipv6, dst=tx_dst_ipv6) /
- UDP(sport=tx_src_udp_port, dport=tx_dst_udp_port) /
- 'udp_payload')
-
- txq.send(udp)
- sent_packets.append(udp)
-
- for _ in range(5):
- pkt = rxq.recv(2)
- if _is_udp_in_ipv4(pkt):
- ether = pkt
- break
- else:
- raise RuntimeError("UDP in IPv4 Rx error.")
-
- # check ethernet
- if ether.dst != rx_dst_mac:
- raise RuntimeError("Destination MAC error {} != {}.".
- format(ether.dst, rx_dst_mac))
- print "Destination MAC: OK."
-
- if ether.src != rx_src_mac:
- raise RuntimeError("Source MAC error {} != {}.".
- format(ether.src, rx_src_mac))
- print "Source MAC: OK."
-
- ipv4 = ether.payload
-
- # check ipv4
- if ip_address(unicode(ipv4.dst)) != ip_address(unicode(rx_dst_ipv4)):
- raise RuntimeError("Destination IPv4 error {} != {}.".
- format(ipv4.dst, rx_dst_ipv4))
- print "Destination IPv4: OK."
-
- if ip_address(unicode(ipv4.src)) != ip_address(unicode(rx_src_ipv4)):
- raise RuntimeError("Source IPv4 error {} != {}.".
- format(ipv4.src, rx_src_ipv4))
- print "Source IPv4: OK."
-
- udp = ipv4.payload
-
- # check udp
- if udp.dport != tx_dst_udp_port:
- raise RuntimeError("UDP dport error {} != {}.".
- format(udp.dport, tx_dst_udp_port))
- print "UDP dport: OK."
-
- if udp.sport != tx_src_udp_port:
- raise RuntimeError("UDP sport error {} != {}.".
- format(udp.sport, tx_src_udp_port))
- print "UDP sport: OK."
-
- if not _check_udp_checksum(ipv4):
- raise RuntimeError("UDP checksum error.")
- print "UDP checksum OK."
-
- sys.exit(0)
-
-if __name__ == "__main__":
- main()
diff --git a/resources/traffic_scripts/send_lw_4o6_check_hairpinning_udp.py b/resources/traffic_scripts/send_lw_4o6_check_hairpinning_udp.py
deleted file mode 100755
index eb5c26ea61..0000000000
--- a/resources/traffic_scripts/send_lw_4o6_check_hairpinning_udp.py
+++ /dev/null
@@ -1,145 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2016 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Traffic script that sends an empty IPv4 UDP datagram encapsulated in IPv6
-and checks if is correctly re-encapsulated."""
-
-import sys
-
-from scapy.layers.l2 import Ether
-from scapy.layers.inet6 import IPv6
-from scapy.layers.inet import IP, UDP
-
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-
-
-def _is_ipv4_in_ipv6(pkt):
- """If IPv6 next header type in the given pkt is IPv4, return True,
- else return False. False is returned also if exception occurs."""
- ipv6_type = int('0x86dd', 16) # IPv6
- try:
- if pkt.type == ipv6_type:
- if pkt.payload.nh == 4:
- return True
- except: # pylint: disable=bare-except
- return False
- return False
-
-
-def main(): # pylint: disable=too-many-statements, too-many-locals
- """Main function of the script file."""
- args = TrafficScriptArg(['tx_dst_mac',
- 'tx_dst_ipv6', 'tx_src_ipv6',
- 'tx_dst_ipv4', 'tx_src_ipv4',
- 'tx_dst_udp_port', 'tx_src_udp_port',
- 'rx_dst_mac', 'rx_src_mac',
- 'rx_dst_ipv6', 'rx_src_ipv6'])
- rx_if = args.get_arg('rx_if')
- tx_if = args.get_arg('tx_if')
- tx_dst_mac = args.get_arg('tx_dst_mac')
- tx_src_mac = '02:00:00:00:00:01'
-
- tx_dst_ipv6 = args.get_arg('tx_dst_ipv6')
- tx_src_ipv6 = args.get_arg('tx_src_ipv6')
- tx_dst_ipv4 = args.get_arg('tx_dst_ipv4')
- tx_src_ipv4 = args.get_arg('tx_src_ipv4')
- tx_dst_udp_port = int(args.get_arg('tx_dst_udp_port'))
- tx_src_udp_port = int(args.get_arg('tx_src_udp_port'))
- rx_dst_mac = args.get_arg('rx_dst_mac')
- rx_src_mac = args.get_arg('rx_src_mac')
- rx_dst_ipv6 = args.get_arg('rx_dst_ipv6')
- rx_src_ipv6 = args.get_arg('rx_src_ipv6')
-
- rxq = RxQueue(rx_if)
- txq = TxQueue(tx_if)
- sent_packets = []
-
- # Create empty UDP datagram in IPv4 and IPv6
- tx_pkt = Ether(dst=tx_dst_mac, src=tx_src_mac)
- tx_pkt /= IPv6(src=tx_src_ipv6, dst=tx_dst_ipv6)
- tx_pkt /= IP(src=tx_src_ipv4, dst=tx_dst_ipv4)
- tx_pkt /= UDP(sport=tx_src_udp_port, dport=tx_dst_udp_port)
- tx_pkt /= 'udp_payload'
-
- txq.send(tx_pkt)
- sent_packets.append(tx_pkt)
-
- for _ in range(5):
- pkt = rxq.recv(2, ignore=sent_packets)
- if _is_ipv4_in_ipv6(pkt):
- ether = pkt
- break
- else:
- raise RuntimeError("IPv4 in IPv6 Rx error.")
-
- # check ethernet
- if ether.dst != rx_dst_mac:
- raise RuntimeError("Destination MAC error {} != {}.".
- format(ether.dst, rx_dst_mac))
- print "Destination MAC: OK."
-
- if ether.src != rx_src_mac:
- raise RuntimeError("Source MAC error {} != {}.".
- format(ether.src, rx_src_mac))
- print "Source MAC: OK."
-
- ipv6 = ether.payload
-
- # check ipv6
- if ipv6.dst != rx_dst_ipv6:
- raise RuntimeError("Destination IPv6 error {} != {}.".
- format(ipv6.dst, rx_dst_ipv6))
- print "Destination IPv6: OK."
-
- if ipv6.src != rx_src_ipv6:
- raise RuntimeError("Source IPv6 error {} != {}.".
- format(ipv6.src, rx_src_ipv6))
- print "Source IPv6: OK."
-
- ipv4 = ipv6.payload
-
- # check ipv4
- if ipv4.dst != tx_dst_ipv4:
- raise RuntimeError("Destination IPv4 error {} != {}.".
- format(ipv4.dst, tx_dst_ipv4))
- print "Destination IPv4: OK."
-
- if ipv4.src != tx_src_ipv4:
- raise RuntimeError("Source IPv4 error {} != {}.".
- format(ipv4.src, tx_src_ipv4))
- print "Source IPv4: OK."
-
- if ipv4.proto != 17: # UDP
- raise RuntimeError("IPv4 protocol error {} != UDP.".
- format(ipv4.proto))
- print "IPv4 protocol: OK."
-
- udp = ipv4.payload
-
- # check udp
- if udp.dport != tx_dst_udp_port:
- raise RuntimeError("UDP dport error {} != {}.".
- format(udp.dport, tx_dst_udp_port))
- print "UDP dport: OK."
-
- if udp.sport != tx_src_udp_port:
- raise RuntimeError("UDP sport error {} != {}.".
- format(udp.sport, tx_src_udp_port))
- print "UDP sport: OK."
-
- sys.exit(0)
-
-if __name__ == "__main__":
- main()
diff --git a/resources/traffic_scripts/send_lw_4o6_check_ipv4_udp.py b/resources/traffic_scripts/send_lw_4o6_check_ipv4_udp.py
deleted file mode 100755
index fcb745a407..0000000000
--- a/resources/traffic_scripts/send_lw_4o6_check_ipv4_udp.py
+++ /dev/null
@@ -1,126 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2016 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Traffic script that sends an empty IPv4 UDP datagram encapsulated in IPv6
-and checks if is correctly decapsulated."""
-
-import sys
-
-from scapy.layers.l2 import Ether
-from scapy.layers.inet6 import IPv6
-from scapy.layers.inet import IP, UDP
-
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-
-
-def _is_udp_in_ipv4(pkt):
- """If UDP is in IPv4 packet return True,
- else return False. False is returned also if exception occurs."""
- ipv4_type = int('0x0800', 16) # IPv4
- try:
- if pkt.type == ipv4_type:
- if pkt.payload.proto == 17: # UDP
- return True
- except: # pylint: disable=bare-except
- return False
- return False
-
-
-def main(): # pylint: disable=too-many-statements, too-many-locals
- """Main function of the script file."""
- args = TrafficScriptArg(['tx_dst_mac', 'tx_src_mac',
- 'tx_dst_ipv6', 'tx_src_ipv6',
- 'tx_dst_ipv4', 'tx_src_ipv4', 'tx_src_udp_port',
- 'rx_dst_mac', 'rx_src_mac'])
- rx_if = args.get_arg('rx_if')
- tx_if = args.get_arg('tx_if')
- tx_src_mac = args.get_arg('tx_src_mac')
- tx_dst_mac = args.get_arg('tx_dst_mac')
- tx_dst_ipv6 = args.get_arg('tx_dst_ipv6')
- tx_src_ipv6 = args.get_arg('tx_src_ipv6')
- tx_dst_ipv4 = args.get_arg('tx_dst_ipv4')
- tx_src_ipv4 = args.get_arg('tx_src_ipv4')
- tx_src_udp_port = int(args.get_arg('tx_src_udp_port'))
- tx_dst_udp_port = 20000
- rx_dst_mac = args.get_arg('rx_dst_mac')
- rx_src_mac = args.get_arg('rx_src_mac')
-
- rxq = RxQueue(rx_if)
- txq = TxQueue(tx_if)
- sent_packets = []
-
- # Create empty UDP datagram in IPv4 and IPv6
- tx_pkt = (Ether(dst=tx_dst_mac, src=tx_src_mac) /
- IPv6(src=tx_src_ipv6, dst=tx_dst_ipv6) /
- IP(src=tx_src_ipv4, dst=tx_dst_ipv4) /
- UDP(sport=tx_src_udp_port, dport=tx_dst_udp_port))
-
- txq.send(tx_pkt)
- sent_packets.append(tx_pkt)
-
- for _ in range(5):
- pkt = rxq.recv(2)
- if _is_udp_in_ipv4(pkt):
- ether = pkt
- break
- else:
- raise RuntimeError("UDP in IPv4 Rx error.")
-
- # check ethernet
- if ether.dst != rx_dst_mac:
- raise RuntimeError("Destination MAC error {} != {}.".
- format(ether.dst, rx_dst_mac))
- print "Destination MAC: OK."
-
- if ether.src != rx_src_mac:
- raise RuntimeError("Source MAC error {} != {}.".
- format(ether.src, rx_src_mac))
- print "Source MAC: OK."
-
- ipv4 = ether.payload
-
- # check ipv4
- if ipv4.dst != tx_dst_ipv4:
- raise RuntimeError("Destination IPv4 error {} != {}.".
- format(ipv4.dst, tx_dst_ipv4))
- print "Destination IPv4: OK."
-
- if ipv4.src != tx_src_ipv4:
- raise RuntimeError("Source IPv4 error {} != {}.".
- format(ipv4.src, tx_src_ipv4))
- print "Source IPv4: OK."
-
- if ipv4.proto != 17: # UDP
- raise RuntimeError("IPv4 protocol error {} != UDP.".
- format(ipv4.proto))
- print "IPv4 protocol: OK."
-
- udp = ipv4.payload
-
- # check udp
- if udp.dport != tx_dst_udp_port:
- raise RuntimeError("UDP dport error {} != {}.".
- format(udp.dport, tx_dst_udp_port))
- print "UDP dport: OK."
-
- if udp.sport != tx_src_udp_port:
- raise RuntimeError("UDP sport error {} != {}.".
- format(udp.sport, tx_src_udp_port))
- print "UDP sport: OK."
-
- sys.exit(0)
-
-if __name__ == "__main__":
- main()
diff --git a/resources/traffic_scripts/send_rs_check_ra.py b/resources/traffic_scripts/send_rs_check_ra.py
deleted file mode 100755
index c9ff46528a..0000000000
--- a/resources/traffic_scripts/send_rs_check_ra.py
+++ /dev/null
@@ -1,127 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2016 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Router solicitation check script."""
-
-import sys
-import ipaddress
-
-from scapy.layers.l2 import Ether
-from scapy.layers.inet6 import IPv6, ICMPv6ND_RA, ICMPv6ND_RS, ICMPv6ND_NS
-
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-
-
-def mac_to_ipv6_linklocal(mac):
- """Transfer MAC address into specific link-local IPv6 address.
-
- :param mac: MAC address to be transferred.
- :type mac: str
- :return: IPv6 link-local address.
- :rtype: str
- """
- # Remove the most common delimiters: dots, dashes, etc.
- mac_value = int(mac.translate(None, ' .:-'), 16)
-
- # Split out the bytes that slot into the IPv6 address
- # XOR the most significant byte with 0x02, inverting the
- # Universal / Local bit
- high2 = mac_value >> 32 & 0xffff ^ 0x0200
- high1 = mac_value >> 24 & 0xff
- low1 = mac_value >> 16 & 0xff
- low2 = mac_value & 0xffff
-
- return 'fe80::{:04x}:{:02x}ff:fe{:02x}:{:04x}'.format(
- high2, high1, low1, low2)
-
-
-def main():
- """Send Router Solicitation packet, check if the received response\
- is a Router Advertisement packet and verify."""
-
- args = TrafficScriptArg(
- ['src_mac', 'dst_mac', 'src_ip']
- )
-
- router_mac = args.get_arg('dst_mac')
- src_mac = args.get_arg('src_mac')
- src_ip = args.get_arg('src_ip')
- if not src_ip:
- src_ip = mac_to_ipv6_linklocal(src_mac)
- tx_if = args.get_arg('tx_if')
-
- txq = TxQueue(tx_if)
- rxq = RxQueue(tx_if)
-
- pkt_raw = (Ether(src=src_mac, dst='33:33:00:00:00:02') /
- IPv6(src=src_ip, dst='ff02::2') /
- ICMPv6ND_RS())
-
- sent_packets = [pkt_raw]
- txq.send(pkt_raw)
-
- while True:
- ether = rxq.recv(2, sent_packets)
- if ether is None:
- raise RuntimeError('ICMP echo Rx timeout')
-
- if ether.haslayer(ICMPv6ND_NS):
- # read another packet in the queue if the current one is ICMPv6ND_NS
- continue
- else:
- # otherwise process the current packet
- break
-
- # Check whether received packet contains layer RA and check other values
- if ether.src != router_mac:
- raise RuntimeError('Packet source MAC ({0}) does not match router MAC '
- '({1}).'.format(ether.src, router_mac))
- if ether.dst != src_mac:
- raise RuntimeError('Packet destination MAC ({0}) does not match RS '
- 'source MAC ({1}).'.format(ether.dst, src_mac))
-
- if not ether.haslayer(ICMPv6ND_RA):
- raise RuntimeError('Not an RA packet received {0}'.
- format(ether.__repr__()))
-
- src_address = ipaddress.IPv6Address(unicode(ether['IPv6'].src))
- dst_address = ipaddress.IPv6Address(unicode(ether['IPv6'].dst))
- router_link_local = ipaddress.IPv6Address(unicode(
- mac_to_ipv6_linklocal(router_mac)))
- rs_src_address = ipaddress.IPv6Address(unicode(src_ip))
-
- if src_address != router_link_local:
- raise RuntimeError('Packet source address ({0}) does not match link '
- 'local address({1})'.
- format(src_address, router_link_local))
-
- if dst_address != rs_src_address:
- raise RuntimeError('Packet destination address ({0}) does not match '
- 'RS source address ({1}).'.
- format(dst_address, rs_src_address))
-
- if ether['IPv6'].hlim != 255:
- raise RuntimeError('Hop limit not correct: {0}!=255'.
- format(ether['IPv6'].hlim))
-
- ra_code = ether[ICMPv6ND_RA].code
- if ra_code != 0:
- raise RuntimeError('ICMP code: {0} not correct. '.format(ra_code))
-
- sys.exit(0)
-
-
-if __name__ == "__main__":
- main()
diff --git a/resources/traffic_scripts/send_tcp_udp.py b/resources/traffic_scripts/send_tcp_udp.py
deleted file mode 100755
index 4cba73286a..0000000000
--- a/resources/traffic_scripts/send_tcp_udp.py
+++ /dev/null
@@ -1,127 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2016 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Traffic script that sends an TCP or UDP packet
-from one interface to the other.
-"""
-
-import sys
-import ipaddress
-
-from scapy.all import Ether
-from scapy.layers.inet import IP, UDP, TCP
-from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
-
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-
-
-def valid_ipv4(ip):
- """Check if IP address has the correct IPv4 address format.
-
- :param ip: IP address.
- :type ip: str
- :return: True in case of correct IPv4 address format,
- otherwise return False.
- :rtype: bool
- """
- try:
- ipaddress.IPv4Address(unicode(ip))
- return True
- except (AttributeError, ipaddress.AddressValueError):
- return False
-
-
-def valid_ipv6(ip):
- """Check if IP address has the correct IPv6 address format.
-
- :param ip: IP address.
- :type ip: str
- :return: True in case of correct IPv6 address format,
- otherwise return False.
- :rtype: bool
- """
- try:
- ipaddress.IPv6Address(unicode(ip))
- return True
- except (AttributeError, ipaddress.AddressValueError):
- return False
-
-
-def main():
- """Send TCP or UDP packet from one traffic generator interface to the other.
- """
- args = TrafficScriptArg(['tx_mac', 'rx_mac', 'src_ip', 'dst_ip', 'protocol',
- 'source_port', 'destination_port'])
-
- src_mac = args.get_arg('tx_mac')
- dst_mac = args.get_arg('rx_mac')
- src_ip = args.get_arg('src_ip')
- dst_ip = args.get_arg('dst_ip')
- tx_if = args.get_arg('tx_if')
- rx_if = args.get_arg('rx_if')
-
- protocol = args.get_arg('protocol')
- source_port = args.get_arg('source_port')
- destination_port = args.get_arg('destination_port')
-
- if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
- ip_version = IP
- elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
- ip_version = IPv6
- else:
- ValueError("Invalid IP version!")
-
- if protocol.upper() == 'TCP':
- protocol = TCP
- elif protocol.upper() == 'UDP':
- protocol = UDP
- else:
- raise ValueError("Invalid protocol type!")
-
- rxq = RxQueue(rx_if)
- txq = TxQueue(tx_if)
-
- pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
- ip_version(src=src_ip, dst=dst_ip) /
- protocol(sport=int(source_port), dport=int(destination_port)))
-
- txq.send(pkt_raw)
-
- while True:
- ether = rxq.recv(2)
- if ether is None:
- raise RuntimeError('TCP/UDP Rx timeout')
-
- if ether.haslayer(ICMPv6ND_NS):
- # read another packet in the queue if the current one is ICMPv6ND_NS
- continue
- else:
- # otherwise process the current packet
- break
-
- if TCP in ether:
- print ("TCP packet received.")
-
- elif UDP in ether:
- print ("UDP packet received.")
- else:
- raise RuntimeError("Not an TCP or UDP packet received {0}".
- format(ether.__repr__()))
-
- sys.exit(0)
-
-
-if __name__ == "__main__":
- main()
diff --git a/resources/traffic_scripts/send_vxlan_check_vxlan.py b/resources/traffic_scripts/send_vxlan_check_vxlan.py
index 3bf273d17a..162703d60c 100755
--- a/resources/traffic_scripts/send_vxlan_check_vxlan.py
+++ b/resources/traffic_scripts/send_vxlan_check_vxlan.py
@@ -1,5 +1,6 @@
-#!/usr/bin/env python
-# Copyright (c) 2016 Cisco and/or its affiliates.
+#!/usr/bin/env python3
+
+# Copyright (c) 2019 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
@@ -18,42 +19,46 @@ the other one. Dot1q or Dot1ad tagging of the ethernet frame can be set.
import sys
-import vxlan
-
-from scapy.layers.inet import IP, UDP, Raw
+from scapy.layers.inet import IP, UDP
from scapy.layers.l2 import Ether
+from scapy.packet import Raw
+
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
+from resources.traffic_scripts import vxlan
def main():
"""Send IP ICMPv4/ICMPv6 packet from one traffic generator interface to
the other one. Dot1q or Dot1ad tagging of the ethernet frame can be set.
"""
- args = TrafficScriptArg(['tx_src_mac', 'tx_dst_mac', 'tx_src_ip',
- 'tx_dst_ip', 'tx_vni', 'rx_src_ip', 'rx_dst_ip',
- 'rx_vni'])
-
- tx_if = args.get_arg('tx_if')
- rx_if = args.get_arg('rx_if')
- tx_src_mac = args.get_arg('tx_src_mac')
- tx_dst_mac = args.get_arg('tx_dst_mac')
- tx_src_ip = args.get_arg('tx_src_ip')
- tx_dst_ip = args.get_arg('tx_dst_ip')
- tx_vni = args.get_arg('tx_vni')
- rx_src_ip = args.get_arg('rx_src_ip')
- rx_dst_ip = args.get_arg('rx_dst_ip')
- rx_vni = args.get_arg('rx_vni')
+ args = TrafficScriptArg(
+ [
+ u"tx_src_mac", u"tx_dst_mac", u"tx_src_ip", u"tx_dst_ip", u"tx_vni",
+ u"rx_src_ip", u"rx_dst_ip", u"rx_vni"
+ ]
+ )
+
+ tx_if = args.get_arg(u"tx_if")
+ rx_if = args.get_arg(u"rx_if")
+ tx_src_mac = args.get_arg(u"tx_src_mac")
+ tx_dst_mac = args.get_arg(u"tx_dst_mac")
+ tx_src_ip = args.get_arg(u"tx_src_ip")
+ tx_dst_ip = args.get_arg(u"tx_dst_ip")
+ tx_vni = args.get_arg(u"tx_vni")
+ rx_src_ip = args.get_arg(u"rx_src_ip")
+ rx_dst_ip = args.get_arg(u"rx_dst_ip")
+ rx_vni = args.get_arg(u"rx_vni")
rxq = RxQueue(rx_if)
txq = TxQueue(tx_if)
sent_packets = []
- tx_pkt_p = (Ether(src='02:00:00:00:00:01', dst='02:00:00:00:00:02') /
- IP(src='192.168.1.1', dst='192.168.1.2') /
+ tx_pkt_p = (Ether(src=u"02:00:00:00:00:01", dst=u"02:00:00:00:00:02") /
+ IP(src=u"192.168.1.1", dst=u"192.168.1.2") /
UDP(sport=12345, dport=1234) /
- Raw('rew data'))
+ Raw(u"raw data"))
pkt_raw = (Ether(src=tx_src_mac, dst=tx_dst_mac) /
IP(src=tx_src_ip, dst=tx_dst_ip) /
@@ -61,6 +66,7 @@ def main():
vxlan.VXLAN(vni=int(tx_vni)) /
tx_pkt_p)
+ pkt_raw /= Raw()
# Send created packet on one interface and receive on the other
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
@@ -69,38 +75,45 @@ def main():
# Check whether received packet contains layers Ether, IP and VXLAN
if ether is None:
- raise RuntimeError('Packet Rx timeout')
+ raise RuntimeError(u"Packet Rx timeout")
ip = ether.payload
if ip.src != rx_src_ip:
- raise RuntimeError('IP src mismatch {} != {}'.format(ip.src, rx_src_ip))
+ raise RuntimeError(f"IP src mismatch {ip.src} != {rx_src_ip}")
if ip.dst != rx_dst_ip:
- raise RuntimeError('IP dst mismatch {} != {}'.format(ip.dst, rx_dst_ip))
+ raise RuntimeError(f"IP dst mismatch {ip.dst} != {rx_dst_ip}")
if ip.payload.dport != 4789:
- raise RuntimeError('VXLAN UDP port mismatch {} != {}'.
- format(ip.payload.dport, 4789))
+ raise RuntimeError(
+ f"VXLAN UDP port mismatch {ip.payload.dport} != 4789"
+ )
vxlan_pkt = ip.payload.payload
if int(vxlan_pkt.vni) != int(rx_vni):
- raise RuntimeError('vxlan mismatch')
+ raise RuntimeError(u"vxlan mismatch")
rx_pkt_p = vxlan_pkt.payload
if rx_pkt_p.src != tx_pkt_p.src:
- raise RuntimeError('RX encapsulated MAC src mismatch {} != {}'.
- format(rx_pkt_p.src, tx_pkt_p.src))
+ raise RuntimeError(
+ f"RX encapsulated MAC src mismatch {rx_pkt_p.src} != {tx_pkt_p.src}"
+ )
if rx_pkt_p.dst != tx_pkt_p.dst:
- raise RuntimeError('RX encapsulated MAC dst mismatch {} != {}'.
- format(rx_pkt_p.dst, tx_pkt_p.dst))
- if rx_pkt_p['IP'].src != tx_pkt_p['IP'].src:
- raise RuntimeError('RX encapsulated IP src mismatch {} != {}'.
- format(rx_pkt_p['IP'].src, tx_pkt_p['IP'].src))
- if rx_pkt_p['IP'].dst != tx_pkt_p['IP'].dst:
- raise RuntimeError('RX encapsulated IP dst mismatch {} != {}'.
- format(rx_pkt_p['IP'].dst, tx_pkt_p['IP'].dst))
+ raise RuntimeError(
+ f"RX encapsulated MAC dst mismatch {rx_pkt_p.dst} != {tx_pkt_p.dst}"
+ )
+ if rx_pkt_p[IP].src != tx_pkt_p[IP].src:
+ raise RuntimeError(
+ f"RX encapsulated IP src mismatch {rx_pkt_p[IP].src} != "
+ f"{tx_pkt_p[IP].src}"
+ )
+ if rx_pkt_p[IP].dst != tx_pkt_p[IP].dst:
+ raise RuntimeError(
+ f"RX encapsulated IP dst mismatch {rx_pkt_p[IP].dst} != "
+ f"{tx_pkt_p[IP].dst}"
+ )
# TODO: verify inner Ether()
sys.exit(0)
-if __name__ == "__main__":
+if __name__ == u"__main__":
main()
diff --git a/resources/traffic_scripts/vxlan.py b/resources/traffic_scripts/vxlan.py
index bf86f179a8..eebfb9056e 100644
--- a/resources/traffic_scripts/vxlan.py
+++ b/resources/traffic_scripts/vxlan.py
@@ -1,17 +1,19 @@
from scapy.fields import BitField, XByteField, X3BytesField
-from scapy.packet import Packet, bind_layers
-from scapy.layers.l2 import Ether
from scapy.layers.inet import UDP
+from scapy.layers.l2 import Ether
+from scapy.packet import Packet, bind_layers
class VXLAN(Packet):
- name = "VXLAN"
- fields_desc = [BitField("flags", 0x08000000, 32),
- X3BytesField("vni", 0),
- XByteField("reserved", 0x00)]
+ name = u"VXLAN"
+ fields_desc = [
+ BitField(u"flags", 0x08000000, 32),
+ X3BytesField(u"vni", 0),
+ XByteField(u"reserved", 0x00)
+ ]
def mysummary(self):
- return self.sprintf("VXLAN (vni=%VXLAN.vni%)")
+ return self.sprintf(f"VXLAN (vni={VXLAN.vni})")
bind_layers(UDP, VXLAN, dport=4789)
bind_layers(VXLAN, Ether)