diff options
author | Jan Gelety <jgelety@cisco.com> | 2019-11-12 05:27:43 +0100 |
---|---|---|
committer | Jan Gelety <jgelety@cisco.com> | 2019-11-28 18:26:21 +0100 |
commit | d68951ac245150eeefa6e0f4156e4c1b5c9e9325 (patch) | |
tree | 487554a7547218d27f0a61ec02b70502c32cdcb4 /resources/traffic_scripts | |
parent | ed0258a440cfad7023d643f717ab78ac568dc59b (diff) |
Python3: resources and libraries
Change-Id: I1392c06b1d64f62b141d24c0d42a8e36913b15e2
Signed-off-by: Jan Gelety <jgelety@cisco.com>
Diffstat (limited to 'resources/traffic_scripts')
39 files changed, 415 insertions, 4981 deletions
diff --git a/resources/traffic_scripts/arp_request.py b/resources/traffic_scripts/arp_request.py deleted file mode 100755 index 8c5b9c7c47..0000000000 --- a/resources/traffic_scripts/arp_request.py +++ /dev/null @@ -1,120 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Send an ARP request and verify the reply""" - -import sys - -from scapy.all import Ether, ARP - -from resources.libraries.python.PacketVerifier import Interface -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def parse_arguments(): - """Parse arguments of the script passed through command line - - :return: tuple of parsed arguments - """ - args = TrafficScriptArg(['src_if', 'src_mac', 'dst_mac', - 'src_ip', 'dst_ip']) - - # check for mandatory parameters - params = (args.get_arg('tx_if'), - args.get_arg('src_mac'), - args.get_arg('dst_mac'), - args.get_arg('src_ip'), - args.get_arg('dst_ip')) - if None in params: - raise Exception('Missing mandatory parameter(s)!') - - return params - - -def arp_request_test(): - """Send ARP request, expect a reply and verify its fields. - - returns: test status - :raises RuntimeError: ARP reply timeout. - """ - test_passed = False - (src_if, src_mac, dst_mac, src_ip, dst_ip) = parse_arguments() - - interface = Interface(src_if) - - # build an ARP request - arp_request = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') / - ARP(psrc=src_ip, hwsrc=src_mac, pdst=dst_ip, - hwdst='ff:ff:ff:ff:ff:ff')) - - # send the request - interface.send_pkt(arp_request) - - try: - # wait for APR reply - ether = interface.recv_pkt() - - if not ether: - raise RuntimeError("ARP reply timeout") - - # verify received packet - - if not ether.haslayer(ARP): - raise RuntimeError('Unexpected packet: does not contain ARP ' + - 'header "{}"'.format(ether.__repr__())) - - arp = ether['ARP'] - arp_reply = 2 - - if arp.op != arp_reply: - raise RuntimeError('expected op={}, received {}'.format(arp_reply, - arp.op)) - if arp.ptype != 0x800: - raise RuntimeError('expected ptype=0x800, received {}'. - format(arp.ptype)) - if arp.hwlen != 6: - raise RuntimeError('expected hwlen=6, received {}'. - format(arp.hwlen)) - if arp.plen != 4: - raise RuntimeError('expected plen=4, received {}'.format(arp.plen)) - if arp.hwsrc != dst_mac: - raise RuntimeError('expected hwsrc={}, received {}'. - format(dst_mac, arp.hwsrc)) - if arp.psrc != dst_ip: - raise RuntimeError('expected psrc={}, received {}'. - format(dst_ip, arp.psrc)) - if arp.hwdst != src_mac: - raise RuntimeError('expected hwdst={}, received {}'. - format(src_mac, arp.hwdst)) - if arp.pdst != src_ip: - raise RuntimeError('expected pdst={}, received {}'. - format(src_ip, arp.pdst)) - test_passed = True - - except RuntimeError as ex: - print 'Error occurred: {}'.format(ex) - - return test_passed - - -def main(): - """Run the test and collect result""" - if arp_request_test(): - sys.exit(0) - else: - sys.exit(1) - -if __name__ == '__main__': - main() diff --git a/resources/traffic_scripts/check_ra_packet.py b/resources/traffic_scripts/check_ra_packet.py deleted file mode 100755 index 9717c7db95..0000000000 --- a/resources/traffic_scripts/check_ra_packet.py +++ /dev/null @@ -1,103 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Router advertisement check script.""" - -import sys -import ipaddress - -from scapy.layers.inet6 import IPv6, ICMPv6ND_RA, ICMPv6ND_NS - -from resources.libraries.python.PacketVerifier import RxQueue -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def mac_to_ipv6_linklocal(mac): - """Transfer MAC address into specific link-local IPv6 address. - - :param mac: MAC address to be transferred. - :type mac: str - :return: IPv6 link-local address. - :rtype: str - """ - # Remove the most common delimiters: dots, dashes, etc. - mac_value = int(mac.translate(None, ' .:-'), 16) - - # Split out the bytes that slot into the IPv6 address - # XOR the most significant byte with 0x02, inverting the - # Universal / Local bit - high2 = mac_value >> 32 & 0xffff ^ 0x0200 - high1 = mac_value >> 24 & 0xff - low1 = mac_value >> 16 & 0xff - low2 = mac_value & 0xffff - - return 'fe80::{:04x}:{:02x}ff:fe{:02x}:{:04x}'.format( - high2, high1, low1, low2) - - -def main(): - """Check packets on specific port and look for the Router Advertisement - part. - """ - - args = TrafficScriptArg(['src_mac', 'interval']) - - rx_if = args.get_arg('rx_if') - src_mac = args.get_arg('src_mac') - interval = int(args.get_arg('interval')) - rxq = RxQueue(rx_if) - - # receive ICMPv6ND_RA packet - while True: - ether = rxq.recv(max(5, interval)) - if ether is None: - raise RuntimeError('ICMP echo Rx timeout') - - if ether.haslayer(ICMPv6ND_NS): - # read another packet in the queue if the current one is ICMPv6ND_NS - continue - else: - # otherwise process the current packet - break - - # Check if received packet contains layer RA and check other values - if not ether.haslayer(ICMPv6ND_RA): - raise RuntimeError('Not an RA packet received {0}'. - format(ether.__repr__())) - - src_address = ipaddress.IPv6Address(unicode(ether['IPv6'].src)) - dst_address = ipaddress.IPv6Address(unicode(ether['IPv6'].dst)) - link_local = ipaddress.IPv6Address(unicode(mac_to_ipv6_linklocal(src_mac))) - all_nodes_multicast = ipaddress.IPv6Address(u'ff02::1') - - if src_address != link_local: - raise RuntimeError('Source address ({0}) not matching link local ' - 'address ({1})'.format(src_address, link_local)) - if dst_address != all_nodes_multicast: - raise RuntimeError('Packet destination address ({0}) is not the all' - ' nodes multicast address ({1}).'. - format(dst_address, all_nodes_multicast)) - if ether[IPv6].hlim != 255: - raise RuntimeError('Hop limit not correct: {0}!=255'. - format(ether[IPv6].hlim)) - - ra_code = ether[ICMPv6ND_RA].code - if ra_code != 0: - raise RuntimeError('ICMP code: {0} not correct. '.format(ra_code)) - - sys.exit(0) - - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/dhcp/check_dhcp_discover.py b/resources/traffic_scripts/dhcp/check_dhcp_discover.py deleted file mode 100755 index 2fdc5b7fbf..0000000000 --- a/resources/traffic_scripts/dhcp/check_dhcp_discover.py +++ /dev/null @@ -1,150 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Traffic script that receives an DHCP packet on given interface and check if -is correct DHCP DISCOVER message. -""" - -import sys - -from scapy.layers.inet import UDP_SERVICES - -from resources.libraries.python.PacketVerifier import RxQueue -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def main(): - args = TrafficScriptArg(['rx_src_mac'], ['hostname']) - - rx_if = args.get_arg('rx_if') - rx_src_mac = args.get_arg('rx_src_mac') - hostname = args.get_arg('hostname') - - rx_dst_mac = 'ff:ff:ff:ff:ff:ff' - rx_src_ip = '0.0.0.0' - rx_dst_ip = '255.255.255.255' - boot_request = 1 - dhcp_magic = 'c\x82Sc' - - rxq = RxQueue(rx_if) - - ether = rxq.recv(10) - - if ether is None: - raise RuntimeError("DHCP DISCOVER Rx timeout.") - - if ether.dst != rx_dst_mac: - raise RuntimeError("Destination MAC address error.") - print "Destination MAC address: OK." - - if ether.src != rx_src_mac: - raise RuntimeError("Source MAC address error.") - print "Source MAC address: OK." - - if ether['IP'].dst != rx_dst_ip: - raise RuntimeError("Destination IP address error.") - print "Destination IP address: OK." - - if ether['IP'].src != rx_src_ip: - raise RuntimeError("Source IP address error.") - print "Source IP address: OK." - - if ether['IP']['UDP'].dport != UDP_SERVICES.bootps: - raise RuntimeError("UDP destination port error.") - print "UDP destination port: OK." - - if ether['IP']['UDP'].sport != UDP_SERVICES.bootpc: - raise RuntimeError("UDP source port error.") - print "UDP source port: OK." - - bootp = ether['BOOTP'] - - if bootp.op != boot_request: - raise RuntimeError("BOOTP message type error.") - print "BOOTP message type: OK" - - if bootp.ciaddr != '0.0.0.0': - raise RuntimeError("BOOTP client IP address error.") - print "BOOTP client IP address: OK" - - if bootp.yiaddr != '0.0.0.0': - raise RuntimeError("BOOTP 'your' (client) IP address error.") - print "BOOTP 'your' (client) IP address: OK" - - if bootp.siaddr != '0.0.0.0': - raise RuntimeError("BOOTP next server IP address error.") - print "BOOTP next server IP address: OK" - - if bootp.giaddr != '0.0.0.0': - raise RuntimeError("BOOTP relay agent IP address error.") - print "BOOTP relay agent IP address: OK" - - chaddr = bootp.chaddr[:bootp.hlen].encode('hex') - if chaddr != rx_src_mac.replace(':', ''): - raise RuntimeError("BOOTP client hardware address error.") - print "BOOTP client hardware address: OK" - - # Check hostname - if bootp.sname != 64*'\x00': - raise RuntimeError("BOOTP server name error.") - print "BOOTP server name: OK" - - # Check boot file - if bootp.file != 128*'\x00': - raise RuntimeError("BOOTP boot file name error.") - print "BOOTP boot file name: OK" - - # Check bootp magic - if bootp.options != dhcp_magic: - raise RuntimeError("DHCP magic error.") - print "DHCP magic: OK" - - # Check options - dhcp_options = ether['DHCP options'].options - - # Option 12 - hn = filter(lambda x: x[0] == 'hostname', dhcp_options) - if hostname: - try: - if hn[0][1] != hostname: - raise RuntimeError("Client's hostname doesn't match.") - except IndexError: - raise RuntimeError("Option list doesn't contain hostname option.") - else: - if len(hn) != 0: - raise RuntimeError("Option list contains hostname option.") - print "Option 12 hostname: OK" - - # Option 53 - mt = filter(lambda x: x[0] == 'message-type', dhcp_options)[0][1] - if mt != 1: - raise RuntimeError("Option 53 message-type error.") - print "Option 53 message-type: OK" - - # Option 55 - prl = filter(lambda x: x[0] == 'param_req_list', dhcp_options)[0][1] - if prl != '\x01\x1c\x02\x03\x0f\x06w\x0c,/\x1ay*': - raise RuntimeError("Option 55 param_req_list error.") - print "Option 55 param_req_list: OK" - - # Option 255 - if 'end' not in dhcp_options: - raise RuntimeError("end option error.") - print "end option: OK" - - sys.exit(0) - - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/dhcp/check_dhcp_request.py b/resources/traffic_scripts/dhcp/check_dhcp_request.py deleted file mode 100755 index 522f2f507c..0000000000 --- a/resources/traffic_scripts/dhcp/check_dhcp_request.py +++ /dev/null @@ -1,218 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Traffic script that sends an DHCP OFFER message and checks if the DHCP -REQUEST contains all required fields.""" - -import sys - -from scapy.layers.l2 import Ether -from scapy.layers.inet import IP, UDP, UDP_SERVICES -from scapy.layers.dhcp import BOOTP, DHCP - -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def is_discover(pkt): - """If DHCP message type option is set to dhcp discover return True, - else return False. False is returned also if exception occurs.""" - dhcp_discover = 1 - try: - dhcp_options = pkt['BOOTP']['DHCP options'].options - message_type = filter(lambda x: x[0] == 'message-type', - dhcp_options) - message_type = message_type[0][1] - return message_type == dhcp_discover - except: - return False - - -def is_request(pkt): - """If DHCP message type option is DHCP REQUEST return True, - else return False. False is returned also if exception occurs.""" - dhcp_request = 3 - try: - dhcp_options = pkt['BOOTP']['DHCP options'].options - message_type = filter(lambda x: x[0] == 'message-type', - dhcp_options) - message_type = message_type[0][1] - return message_type == dhcp_request - except: - return False - - -def main(): - """Main function of the script file.""" - args = TrafficScriptArg(['client_mac', 'server_mac', 'server_ip', - 'client_ip', 'client_mask'], - ['hostname', 'offer_xid']) - - server_if = args.get_arg('rx_if') - server_mac = args.get_arg('server_mac') - server_ip = args.get_arg('server_ip') - - client_mac = args.get_arg('client_mac') - client_ip = args.get_arg('client_ip') - client_mask = args.get_arg('client_mask') - - hostname = args.get_arg('hostname') - offer_xid = args.get_arg('offer_xid') - - rx_src_ip = '0.0.0.0' - rx_dst_ip = '255.255.255.255' - - rxq = RxQueue(server_if) - txq = TxQueue(server_if) - sent_packets = [] - - for _ in range(10): - dhcp_discover = rxq.recv(10) - if is_discover(dhcp_discover): - break - else: - raise RuntimeError("DHCP DISCOVER Rx error.") - - dhcp_offer = Ether(src=server_mac, dst=dhcp_discover.src) - dhcp_offer /= IP(src=server_ip, dst="255.255.255.255") - dhcp_offer /= UDP(sport=67, dport=68) - dhcp_offer /= BOOTP(op=2, - # if offer_xid differs from xid value in DHCP DISCOVER - # the DHCP OFFER has to be discarded - xid=int(offer_xid) if offer_xid - else dhcp_discover['BOOTP'].xid, - yiaddr=client_ip, - siaddr=server_ip, - chaddr=dhcp_discover['BOOTP'].chaddr) - dhcp_offer_options = [("message-type", "offer"), # Option 53 - ("subnet_mask", client_mask), # Option 1 - ("server_id", server_ip), # Option 54, dhcp server - ("lease_time", 43200), # Option 51 - "end"] - dhcp_offer /= DHCP(options=dhcp_offer_options) - - txq.send(dhcp_offer) - sent_packets.append(dhcp_offer) - - max_other_pkts = 10 - for _ in range(0, max_other_pkts): - dhcp_request = rxq.recv(5, sent_packets) - if not dhcp_request: - raise RuntimeError("DHCP REQUEST Rx timeout.") - if is_request(dhcp_request): - break - else: - raise RuntimeError("Max RX packet limit reached.") - - if offer_xid: - # if offer_xid differs from xid value in DHCP DISCOVER the DHCP OFFER - # has to be discarded - raise RuntimeError("DHCP REQUEST received. DHCP OFFER with wrong XID " - "has not been discarded.") - - # CHECK ETHER, IP, UDP - if dhcp_request.dst != dhcp_discover.dst: - raise RuntimeError("Destination MAC error.") - print "Destination MAC: OK." - - if dhcp_request.src != dhcp_discover.src: - raise RuntimeError("Source MAC error.") - print "Source MAC: OK." - - if dhcp_request['IP'].dst != rx_dst_ip: - raise RuntimeError("Destination IP error.") - print "Destination IP: OK." - - if dhcp_request['IP'].src != rx_src_ip: - raise RuntimeError("Source IP error.") - print "Source IP: OK." - - if dhcp_request['IP']['UDP'].dport != UDP_SERVICES.bootps: - raise RuntimeError("BOOTPs error.") - print "BOOTPs: OK." - - if dhcp_request['IP']['UDP'].sport != UDP_SERVICES.bootpc: - raise RuntimeError("BOOTPc error.") - print "BOOTPc: OK." - - # CHECK BOOTP - if dhcp_request['BOOTP'].op != dhcp_discover['BOOTP'].op: - raise RuntimeError("BOOTP operation error.") - print "BOOTP operation: OK" - - if dhcp_request['BOOTP'].xid != dhcp_discover['BOOTP'].xid: - raise RuntimeError("BOOTP XID error.") - print "BOOTP XID: OK" - - if dhcp_request['BOOTP'].ciaddr != '0.0.0.0': - raise RuntimeError("BOOTP ciaddr error.") - print "BOOTP ciaddr: OK" - - ca = dhcp_request['BOOTP'].chaddr[:dhcp_request['BOOTP'].hlen].encode('hex') - if ca != client_mac.replace(':', ''): - raise RuntimeError("BOOTP client hardware address error.") - print "BOOTP client hardware address: OK" - - if dhcp_request['BOOTP'].options != dhcp_discover['BOOTP'].options: - raise RuntimeError("DHCP options error.") - print "DHCP options: OK" - - # CHECK DHCP OPTIONS - dhcp_options = dhcp_request['DHCP options'].options - - hn = filter(lambda x: x[0] == 'hostname', dhcp_options) - if hostname: - try: - if hn[0][1] != hostname: - raise RuntimeError("Client's hostname doesn't match.") - except IndexError: - raise RuntimeError("Option list doesn't contain hostname option.") - else: - if len(hn) != 0: - raise RuntimeError("Option list contains hostname option.") - print "Option 12 hostname: OK" - - # Option 50 - ra = filter(lambda x: x[0] == 'requested_addr', dhcp_options)[0][1] - if ra != client_ip: - raise RuntimeError("Option 50 requested_addr error.") - print "Option 50 requested_addr: OK" - - # Option 53 - mt = filter(lambda x: x[0] == 'message-type', dhcp_options)[0][1] - if mt != 3: # request - raise RuntimeError("Option 53 message-type error.") - print "Option 53 message-type: OK" - - # Option 54 - sid = filter(lambda x: x[0] == 'server_id', dhcp_options)[0][1] - if sid != server_ip: - raise RuntimeError("Option 54 server_id error.") - print "Option 54 server_id: OK" - - # Option 55 - prl = filter(lambda x: x[0] == 'param_req_list', dhcp_options)[0][1] - if prl != '\x01\x1c\x02\x03\x0f\x06w\x0c,/\x1ay*': - raise RuntimeError("Option 55 param_req_list error.") - print "Option 55 param_req_list: OK" - - # Option 255 - if 'end' not in dhcp_options: - raise RuntimeError("end option error.") - print "end option: OK" - - sys.exit(0) - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/dhcp/check_dhcp_request_ack.py b/resources/traffic_scripts/dhcp/check_dhcp_request_ack.py deleted file mode 100755 index 8a3839cda7..0000000000 --- a/resources/traffic_scripts/dhcp/check_dhcp_request_ack.py +++ /dev/null @@ -1,132 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Traffic script that sends a DHCP ACK message when DHCP REQUEST message is -received.""" - -import sys - -from scapy.layers.l2 import Ether -from scapy.layers.inet import IP, UDP -from scapy.layers.dhcp import BOOTP, DHCP - -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def is_discover(pkt): - """If DHCP message type option is set to dhcp discover return True, - else return False. False is returned also if exception occurs.""" - dhcp_discover = 1 - try: - dhcp_options = pkt['BOOTP']['DHCP options'].options - message_type = filter(lambda x: x[0] == 'message-type', - dhcp_options) - message_type = message_type[0][1] - return message_type == dhcp_discover - except: - return False - - -def is_request(pkt): - """If DHCP message type option is DHCP REQUEST return True, - else return False. False is returned also if exception occurs.""" - dhcp_request = 3 - try: - dhcp_options = pkt['BOOTP']['DHCP options'].options - message_type = filter(lambda x: x[0] == 'message-type', - dhcp_options) - message_type = message_type[0][1] - return message_type == dhcp_request - except: - return False - - -def main(): - """Main function of the script file.""" - args = TrafficScriptArg(['server_mac', 'server_ip', 'client_ip', - 'client_mask', 'lease_time']) - - server_if = args.get_arg('rx_if') - server_mac = args.get_arg('server_mac') - server_ip = args.get_arg('server_ip') - - client_ip = args.get_arg('client_ip') - client_mask = args.get_arg('client_mask') - - lease_time = int(args.get_arg('lease_time')) - - rxq = RxQueue(server_if) - txq = TxQueue(server_if) - sent_packets = [] - - for _ in range(10): - dhcp_discover = rxq.recv(10) - if is_discover(dhcp_discover): - break - else: - raise RuntimeError("DHCP DISCOVER Rx error.") - - dhcp_offer = Ether(src=server_mac, dst=dhcp_discover.src) - dhcp_offer /= IP(src=server_ip, dst="255.255.255.255") - dhcp_offer /= UDP(sport=67, dport=68) - dhcp_offer /= BOOTP(op=2, - xid=dhcp_discover['BOOTP'].xid, - yiaddr=client_ip, - siaddr=server_ip, - chaddr=dhcp_discover['BOOTP'].chaddr) - dhcp_offer_options = [("message-type", "offer"), # Option 53 - ("subnet_mask", client_mask), # Option 1 - ("server_id", server_ip), # Option 54, dhcp server - ("lease_time", lease_time), # Option 51 - "end"] - dhcp_offer /= DHCP(options=dhcp_offer_options) - - txq.send(dhcp_offer) - sent_packets.append(dhcp_offer) - - max_other_pkts = 10 - for _ in range(0, max_other_pkts): - dhcp_request = rxq.recv(5, sent_packets) - if not dhcp_request: - raise RuntimeError("DHCP REQUEST Rx timeout.") - if is_request(dhcp_request): - break - else: - raise RuntimeError("Max RX packet limit reached.") - - # Send dhcp ack - dhcp_ack = Ether(src=server_mac, dst=dhcp_request.src) - dhcp_ack /= IP(src=server_ip, dst="255.255.255.255") - dhcp_ack /= UDP(sport=67, dport=68) - dhcp_ack /= BOOTP(op=2, - xid=dhcp_request['BOOTP'].xid, - yiaddr=client_ip, - siaddr=server_ip, - flags=dhcp_request['BOOTP'].flags, - chaddr=dhcp_request['BOOTP'].chaddr) - dhcp_ack_options = [("message-type", "ack"), # Option 53. 5: ACK, 6: NAK - ("subnet_mask", client_mask), # Option 1 - ("server_id", server_ip), # Option 54, dhcp server - ("lease_time", lease_time), # Option 51, - "end"] - dhcp_ack /= DHCP(options=dhcp_ack_options) - - txq.send(dhcp_ack) - sent_packets.append(dhcp_ack) - - sys.exit(0) - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/dhcp/send_and_check_proxy_discover.py b/resources/traffic_scripts/dhcp/send_and_check_proxy_discover.py deleted file mode 100755 index d8089713fd..0000000000 --- a/resources/traffic_scripts/dhcp/send_and_check_proxy_discover.py +++ /dev/null @@ -1,79 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Traffic script that sends DHCP DISCOVER packet - and check if is received on interface.""" - -import sys - -from scapy.all import Ether -from scapy.layers.inet import IP, UDP -from scapy.layers.inet import UDP_SERVICES -from scapy.layers.dhcp import DHCP, BOOTP - -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def is_discover(pkt): - """If DHCP message type option is set to dhcp discover return True, - else return False. False is returned also if exception occurs.""" - dhcp_discover = 1 - try: - dhcp_options = pkt['BOOTP']['DHCP options'].options - message_type = filter(lambda x: x[0] == 'message-type', - dhcp_options) - message_type = message_type[0][1] - return message_type == dhcp_discover - except: - return False - - -def main(): - """Send DHCP DISCOVER packet.""" - - args = TrafficScriptArg(['tx_src_ip', 'tx_dst_ip']) - - tx_if = args.get_arg('tx_if') - rx_if = args.get_arg('rx_if') - - rxq = RxQueue(rx_if) - txq = TxQueue(tx_if) - - tx_src_ip = args.get_arg('tx_src_ip') - tx_dst_ip = args.get_arg('tx_dst_ip') - - sent_packets = [] - - dhcp_discover = Ether(dst="ff:ff:ff:ff:ff:ff") / \ - IP(src=tx_src_ip, dst=tx_dst_ip) / \ - UDP(sport=UDP_SERVICES.bootpc, dport=UDP_SERVICES.bootps) / \ - BOOTP(op=1,) / \ - DHCP(options=[("message-type", "discover"), - "end"]) - - sent_packets.append(dhcp_discover) - txq.send(dhcp_discover) - - for _ in range(10): - dhcp_discover = rxq.recv(2) - if is_discover(dhcp_discover): - break - else: - raise RuntimeError("DHCP DISCOVER Rx timeout") - - sys.exit(0) - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/dhcp/send_and_check_proxy_messages.py b/resources/traffic_scripts/dhcp/send_and_check_proxy_messages.py deleted file mode 100755 index 27f148c900..0000000000 --- a/resources/traffic_scripts/dhcp/send_and_check_proxy_messages.py +++ /dev/null @@ -1,299 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Traffic script that sends DHCP packets.""" - -import sys - -from scapy.all import Ether -from scapy.layers.inet import IP, UDP -from scapy.layers.inet import UDP_SERVICES -from scapy.layers.dhcp import DHCP, BOOTP - -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def dhcp_discover(tx_if, rx_if, tx_src_ip, tx_dst_ip, server_ip, proxy_ip, - client_mac): - """Send and check DHCP DISCOVER proxy packet.""" - - rxq = RxQueue(rx_if) - txq = TxQueue(tx_if) - - sent_packets = [] - - dhcp_discover = Ether(src=client_mac, dst="ff:ff:ff:ff:ff:ff") / \ - IP(src=tx_src_ip, dst=tx_dst_ip) / \ - UDP(sport=UDP_SERVICES.bootpc, dport=UDP_SERVICES.bootps) / \ - BOOTP(op=1,) / \ - DHCP(options=[("message-type", "discover"), - "end"]) - - sent_packets.append(dhcp_discover) - txq.send(dhcp_discover) - - ether = rxq.recv(2) - - if ether is None: - raise RuntimeError('DHCP DISCOVER timeout') - - if ether[IP].src != proxy_ip: - raise RuntimeError("Source IP address error.") - print "Source IP address: OK." - - if ether[IP].dst != server_ip: - raise RuntimeError("Destination IP address error.") - print "Destination IP address: OK." - - if ether[UDP].dport != UDP_SERVICES.bootps: - raise RuntimeError("UDP destination port error.") - print "UDP destination port: OK." - - if ether[UDP].sport != UDP_SERVICES.bootpc: - raise RuntimeError("UDP source port error.") - print "UDP source port: OK." - - if ether[DHCP].options[1][0] != 'relay_agent_Information': # option 82 - raise RuntimeError("Relay agent information error.") - option_82 = ether[DHCP].options[1][1] - - if ether[DHCP].options[0][1] != 1: # 1 - DISCOVER message - raise RuntimeError("DHCP DISCOVER message error.") - print "DHCP DISCOVER message OK." - - return option_82 - - -def dhcp_offer(rx_if, tx_if, tx_dst_ip, server_ip, proxy_ip, client_ip, - server_mac, option_82): - """Send and check DHCP OFFER proxy packet.""" - - rxq = RxQueue(rx_if) - txq = TxQueue(tx_if) - - sent_packets = [] - - dhcp_offer = Ether(src=server_mac, dst="ff:ff:ff:ff:ff:ff") / \ - IP(src=server_ip, dst=tx_dst_ip) / \ - UDP(sport=UDP_SERVICES.bootps, dport=UDP_SERVICES.bootpc) / \ - BOOTP(op=2, - yiaddr=client_ip, - siaddr=server_ip) / \ - DHCP(options= - [("message-type", "offer"), - ("server_id", server_ip), - ("relay_agent_Information", option_82), - "end"]) - - txq.send(dhcp_offer) - sent_packets.append(dhcp_offer) - - ether = rxq.recv(2) - - if ether is None: - raise RuntimeError('DHCP OFFER timeout') - - if ether[IP].dst != tx_dst_ip: - raise RuntimeError("Destination IP address error.") - print "Destination IP address: OK." - - if ether[IP].src != proxy_ip: - raise RuntimeError("Source IP address error.") - print "Source IP address: OK." - - if ether[UDP].dport != UDP_SERVICES.bootpc: - raise RuntimeError("UDP destination port error.") - print "UDP destination port: OK." - - if ether[UDP].sport != UDP_SERVICES.bootps: - raise RuntimeError("UDP source port error.") - print "UDP source port: OK." - - if ether[BOOTP].yiaddr != client_ip: - raise RuntimeError("Client IP address error.") - print "Client IP address: OK." - - if ether[BOOTP].siaddr != server_ip: - raise RuntimeError("DHCP server IP address error.") - print "DHCP server IP address: OK." - - if ether[DHCP].options[0][1] != 2: # 2 - OFFER message - raise RuntimeError("DHCP OFFER message error.") - print "DHCP OFFER message OK." - - -def dhcp_request(tx_if, rx_if, tx_src_ip, tx_dst_ip, server_ip, proxy_ip, - client_ip, client_mac): - """Send and check DHCP REQUEST proxy packet.""" - - rxq = RxQueue(rx_if) - txq = TxQueue(tx_if) - - sent_packets = [] - - dhcp_request = Ether(src=client_mac, dst="ff:ff:ff:ff:ff:ff") / \ - IP(src=tx_src_ip, dst=tx_dst_ip) / \ - UDP(sport=UDP_SERVICES.bootpc, dport=UDP_SERVICES.bootps) / \ - BOOTP(op=1, - giaddr=proxy_ip, - siaddr=server_ip) / \ - DHCP(options=[("message-type", "request"), - ("server_id", server_ip), - ("requested_addr", client_ip), - "end"]) - - sent_packets.append(dhcp_request) - txq.send(dhcp_request) - - ether = rxq.recv(2) - - if ether is None: - raise RuntimeError('DHCP REQUEST timeout') - - if ether[IP].dst != server_ip: - raise RuntimeError("Destination IP address error.") - print "Destination IP address: OK." - - if ether[IP].src != proxy_ip: - raise RuntimeError("Source IP address error.") - print "Source IP address: OK." - - if ether[UDP].dport != UDP_SERVICES.bootps: - raise RuntimeError("UDP destination port error.") - print "UDP destination port: OK." - - if ether[UDP].sport != UDP_SERVICES.bootpc: - raise RuntimeError("UDP source port error.") - print "UDP source port: OK." - - if ether[BOOTP].siaddr != server_ip: - raise RuntimeError("DHCP server IP address error.") - print "DHCP server IP address: OK." - - if ether[DHCP].options[2][1] != client_ip: - raise RuntimeError("Requested IP address error.") - print "Requested IP address: OK." - - if ether[DHCP].options[3][0] != 'relay_agent_Information': # option 82 - raise RuntimeError("Relay agent information error.") - - if ether[DHCP].options[0][1] != 3: # 2 - REQUEST message - raise RuntimeError("DHCP REQUEST message error.") - print "DHCP REQUEST message: OK." - - -def dhcp_ack(rx_if, tx_if, tx_dst_ip, server_ip, proxy_ip, client_ip, - server_mac, option_82): - """Send and check DHCP ACK proxy packet.""" - - rxq = RxQueue(rx_if) - txq = TxQueue(tx_if) - - lease_time = 43200 # 12 hours - - sent_packets = [] - - dhcp_ack = Ether(src=server_mac, dst="ff:ff:ff:ff:ff:ff") / \ - IP(src=server_ip, dst=tx_dst_ip) / \ - UDP(sport=UDP_SERVICES.bootps, dport=UDP_SERVICES.bootpc) / \ - BOOTP(op=2, - yiaddr=client_ip, - siaddr=server_ip) / \ - DHCP(options= - [("message-type", "ack"), - ("server_id", server_ip), - ("lease_time", lease_time), - ("relay_agent_Information", option_82), - "end"]) - - txq.send(dhcp_ack) - sent_packets.append(dhcp_ack) - - ether = rxq.recv(2) - - if ether is None: - raise RuntimeError('DHCP ACK timeout') - - if ether[IP].dst != tx_dst_ip: - raise RuntimeError("Destination IP address error.") - print "Destination IP address: OK." - - if ether[IP].src != proxy_ip: - raise RuntimeError("Source IP address error.") - print "Source IP address: OK." - - if ether[UDP].dport != UDP_SERVICES.bootpc: - raise RuntimeError("UDP destination port error.") - print "UDP destination port: OK." - - if ether[UDP].sport != UDP_SERVICES.bootps: - raise RuntimeError("UDP source port error.") - print "UDP source port: OK." - - if ether[BOOTP].yiaddr != client_ip: - raise RuntimeError("Client IP address error.") - print "Client IP address: OK." - - if ether[BOOTP].siaddr != server_ip: - raise RuntimeError("DHCP server IP address error.") - print "DHCP server IP address: OK." - - if ether[DHCP].options[2][1] != lease_time: - raise RuntimeError("DHCP lease time error.") - print "DHCP lease time OK." - - if ether[DHCP].options[0][1] != 5: # 5 - ACK message - raise RuntimeError("DHCP ACK message error.") - print "DHCP ACK message OK." - - -def main(): - """Send DHCP proxy messages.""" - - args = TrafficScriptArg(['server_ip', 'server_mac', 'client_ip', - 'client_mac', 'proxy_ip']) - - tx_if = args.get_arg('tx_if') - rx_if = args.get_arg('rx_if') - - tx_src_ip = "0.0.0.0" - tx_dst_ip = "255.255.255.255" - - server_ip = args.get_arg('server_ip') - client_ip = args.get_arg('client_ip') - proxy_ip = args.get_arg('proxy_ip') - client_mac = args.get_arg('client_mac') - server_mac = args.get_arg('server_mac') - - # DHCP DISCOVER - option_82 = dhcp_discover(tx_if, rx_if, tx_src_ip, tx_dst_ip, server_ip, - proxy_ip, client_mac) - - # DHCP OFFER - dhcp_offer(tx_if, rx_if, tx_dst_ip, server_ip, proxy_ip, client_ip, - server_mac, option_82) - - # DHCP REQUEST - dhcp_request(tx_if, rx_if, tx_src_ip, tx_dst_ip, server_ip, proxy_ip, - client_ip, client_mac) - - # DHCP ACK - dhcp_ack(tx_if, rx_if, tx_dst_ip, server_ip, proxy_ip, client_ip, - server_mac, option_82) - - sys.exit(0) - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/dhcp/send_dhcp_v6_messages.py b/resources/traffic_scripts/dhcp/send_dhcp_v6_messages.py deleted file mode 100755 index 09742565ee..0000000000 --- a/resources/traffic_scripts/dhcp/send_dhcp_v6_messages.py +++ /dev/null @@ -1,372 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Traffic script that sends DHCPv6 proxy packets.""" - -from scapy.layers.dhcp6 import * -from scapy.layers.inet6 import IPv6, UDP, UDP_SERVICES - -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def _check_udp_checksum(pkt): - """Check udp checksum in ip packet. - Return true if checksum is correct.""" - new = pkt.__class__(str(pkt)) - del new['UDP'].chksum - new = new.__class__(str(new)) - return new['UDP'].chksum == pkt['UDP'].chksum - - -def _get_dhcpv6_msgtype(msg_index): - """Return DHCPv6 message type string. - - :param msg_index: Index of message type. - :return: Message type. - :type msg_index: int - :rtype msg_str: str - """ - dhcp6_messages = { - 1: "SOLICIT", - 2: "ADVERTISE", - 3: "REQUEST", - 4: "CONFIRM", - 5: "RENEW", - 6: "REBIND", - 7: "REPLY", - 8: "RELEASE", - 9: "DECLINE", - 10: "RECONFIGURE", - 11: "INFORMATION-REQUEST", - 12: "RELAY-FORW", - 13: "RELAY-REPL" - } - return dhcp6_messages[msg_index] - - -def dhcpv6_solicit(tx_if, rx_if, dhcp_multicast_ip, link_local_ip, proxy_ip, - server_ip, server_mac, client_duid, client_mac): - """Send and check DHCPv6 SOLICIT proxy packet. - - :param tx_if: Client interface. - :param rx_if: DHCPv6 server interface. - :param dhcp_multicast_ip: Servers and relay agents multicast address. - :param link_local_ip: Client link-local address. - :param proxy_ip: IP address of DHCPv6 proxy server. - :param server_ip: IP address of DHCPv6 server. - :param server_mac: MAC address of DHCPv6 server. - :param client_duid: Client DHCP Unique Identifier. - :param client_mac: Client MAC address. - :type tx_if: str - :type rx_if: str - :type dhcp_multicast_ip: str - :type link_local_ip: str - :type proxy_ip: str - :type server_ip: str - :type server_mac: str - :type client_duid: str - :type client_mac: str - :return interface_id: ID of proxy interface. - :rtype interface_id: str - """ - - rxq = RxQueue(rx_if) - txq = TxQueue(tx_if) - - sent_packets = [] - - dhcp6_solicit_pkt = Ether(src=client_mac, dst="33:33:00:01:00:02") / \ - IPv6(src=link_local_ip, dst=dhcp_multicast_ip) / \ - UDP(sport=UDP_SERVICES.dhcpv6_client, - dport=UDP_SERVICES.dhcpv6_server) / \ - DHCP6_Solicit() / \ - DHCP6OptClientId(duid=client_duid) - - sent_packets.append(dhcp6_solicit_pkt) - txq.send(dhcp6_solicit_pkt) - - ether = rxq.recv(2) - - if ether is None: - raise RuntimeError('DHCPv6 SOLICIT timeout') - - if ether.dst != server_mac: - raise RuntimeError("Destination MAC address error: {} != {}".format( - ether.dst, server_mac)) - print "Destination MAC address: OK." - - if ether['IPv6'].src != proxy_ip: - raise RuntimeError("Source IP address error: {} != {}".format( - ether['IPv6'].src, proxy_ip)) - print "Source IP address: OK." - - if ether['IPv6'].dst != server_ip: - raise RuntimeError("Destination IP address error: {} != {}".format( - ether['IPv6'].dst, server_ip)) - print "Destination IP address: OK." - - msgtype = _get_dhcpv6_msgtype(ether['IPv6']['UDP'] - ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)'].msgtype) - if msgtype != 'RELAY-FORW': - raise RuntimeError("Message type error: {} != RELAY-FORW".format( - msgtype)) - print "Message type: OK." - - linkaddr = ether['IPv6']['UDP']\ - ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)'].linkaddr - if linkaddr != proxy_ip: - raise RuntimeError("Proxy IP address error: {} != {}".format( - linkaddr, proxy_ip)) - print "Proxy IP address: OK." - - try: - interface_id = ether['IPv6']['UDP']\ - ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)']\ - ['Unknown DHCPv6 OPtion']['DHCP6 Interface-Id Option'].ifaceid - except Exception: - raise RuntimeError("DHCP6 Interface-Id error!") - - return interface_id - - -def dhcpv6_advertise(rx_if, tx_if, link_local_ip, proxy_ip, - server_ip, server_mac, proxy_to_server_mac, interface_id): - """Send and check DHCPv6 ADVERTISE proxy packet. - - :param rx_if: DHCPv6 server interface. - :param tx_if: Client interface. - :param link_local_ip: Client link-local address. - :param proxy_ip: IP address of DHCPv6 proxy server. - :param server_ip: IP address of DHCPv6 server. - :param server_mac: MAC address of DHCPv6 server. - :param proxy_to_server_mac: MAC address of DHCPv6 proxy interface. - :param interface_id: ID of proxy interface. - :type rx_if: str - :type tx_if: str - :type link_local_ip: str - :type proxy_ip: str - :type server_ip: str - :type server_mac: str - :type proxy_to_server_mac: str - :type interface_id: str - """ - - rxq = RxQueue(rx_if) - txq = TxQueue(tx_if) - - sent_packets = [] - - dhcp6_advertise_pkt = Ether(src=server_mac, dst=proxy_to_server_mac) / \ - IPv6(src=server_ip, dst=proxy_ip) / \ - UDP(sport=UDP_SERVICES.dhcpv6_server, - dport=UDP_SERVICES.dhcpv6_client) / \ - DHCP6_RelayReply(peeraddr=link_local_ip, - linkaddr=proxy_ip) / \ - DHCP6OptIfaceId(ifaceid=interface_id) / \ - DHCP6OptRelayMsg() / \ - DHCP6_Advertise() - - sent_packets.append(dhcp6_advertise_pkt) - txq.send(dhcp6_advertise_pkt) - - ether = rxq.recv(2) - - if ether is None: - raise RuntimeError('DHCPv6 ADVERTISE timeout') - - if ether['IPv6'].src != proxy_ip: - raise RuntimeError("Source IP address error: {} != {}".format( - ether['IPv6'].src, proxy_ip)) - print "Source IP address: OK." - - if not _check_udp_checksum(ether['IPv6']): - raise RuntimeError("Checksum error!") - print "Checksum: OK." - - msgtype = _get_dhcpv6_msgtype(ether['IPv6']['UDP'] - ['DHCPv6 Advertise Message'].msgtype) - if msgtype != 'ADVERTISE': - raise RuntimeError("Message type error: {} != ADVERTISE".format( - msgtype)) - print "Message type: OK." - - -def dhcpv6_request(tx_if, rx_if, dhcp_multicast_ip, link_local_ip, proxy_ip, - server_ip, client_duid, client_mac): - """Send and check DHCPv6 REQUEST proxy packet. - - :param tx_if: Client interface. - :param rx_if: DHCPv6 server interface. - :param dhcp_multicast_ip: Servers and relay agents multicast address. - :param link_local_ip: Client link-local address. - :param proxy_ip: IP address of DHCPv6 proxy server. - :param server_ip: IP address of DHCPv6 server. - :param client_duid: Client DHCP Unique Identifier. - :param client_mac: Client MAC address. - :type tx_if: str - :type rx_if: str - :type dhcp_multicast_ip: str - :type link_local_ip: str - :type proxy_ip: str - :type server_ip: str - :type client_duid: str - :type client_mac: str - """ - - rxq = RxQueue(rx_if) - txq = TxQueue(tx_if) - - sent_packets = [] - - dhcp6_request_pkt = Ether(src=client_mac, dst="33:33:00:01:00:02") / \ - IPv6(src=link_local_ip, dst=dhcp_multicast_ip) / \ - UDP(sport=UDP_SERVICES.dhcpv6_client, - dport=UDP_SERVICES.dhcpv6_server) / \ - DHCP6_Request() / \ - DHCP6OptClientId(duid=client_duid) - - sent_packets.append(dhcp6_request_pkt) - txq.send(dhcp6_request_pkt) - - ether = rxq.recv(2) - - if ether is None: - raise RuntimeError('DHCPv6 REQUEST timeout') - - if ether['IPv6'].src != proxy_ip: - raise RuntimeError("Source IP address error: {} != {}".format( - ether['IPv6'].src, proxy_ip)) - print "Source IP address: OK." - - if ether['IPv6'].dst != server_ip: - raise RuntimeError("Destination IP address error: {} != {}".format( - ether['IPv6'].dst, server_ip)) - print "Destination IP address: OK." - - msgtype = _get_dhcpv6_msgtype(ether['IPv6']['UDP'] - ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)'].msgtype) - if msgtype != 'RELAY-FORW': - raise RuntimeError("Message type error: {} != RELAY-FORW".format( - msgtype)) - print "Message type: OK." - - linkaddr = ether['IPv6']['UDP']\ - ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)'].linkaddr - if linkaddr != proxy_ip: - raise RuntimeError("Proxy IP address error: {} != {}".format( - linkaddr, proxy_ip)) - print "Proxy IP address: OK." - - -def dhcpv6_reply(rx_if, tx_if, link_local_ip, proxy_ip, server_ip, server_mac, - interface_id): - """Send and check DHCPv6 REPLY proxy packet. - - :param rx_if: DHCPv6 server interface. - :param tx_if: Client interface. - :param link_local_ip: Client link-local address. - :param proxy_ip: IP address of DHCPv6 proxy server. - :param server_ip: IP address of DHCPv6 server. - :param server_mac: MAC address of DHCPv6 server. - :param interface_id: ID of proxy interface. - :type rx_if: str - :type tx_if: str - :type link_local_ip: str - :type proxy_ip: str - :type server_ip: str - :type server_mac: str - :type interface_id: str - """ - - rxq = RxQueue(rx_if) - txq = TxQueue(tx_if) - - sent_packets = [] - - dhcp_reply_pkt = Ether(src=server_mac) / \ - IPv6(src=server_ip, dst=proxy_ip) / \ - UDP(sport=UDP_SERVICES.dhcpv6_server, - dport=UDP_SERVICES.dhcpv6_client) / \ - DHCP6_RelayReply(peeraddr=link_local_ip, - linkaddr=proxy_ip) / \ - DHCP6OptIfaceId(ifaceid=interface_id) / \ - DHCP6OptRelayMsg() / \ - DHCP6_Reply() - - sent_packets.append(dhcp_reply_pkt) - txq.send(dhcp_reply_pkt) - - ether = rxq.recv(2) - - if ether is None: - raise RuntimeError('DHCPv6 REPLY timeout') - - if ether['IPv6'].src != proxy_ip: - raise RuntimeError("Source IP address error: {} != {}".format( - ether['IPv6'].src, proxy_ip)) - print "Source IP address: OK." - - if not _check_udp_checksum(ether['IPv6']): - raise RuntimeError("Checksum error!") - print "Checksum: OK." - - msgtype = _get_dhcpv6_msgtype(ether['IPv6']['UDP'] - ['DHCPv6 Reply Message'].msgtype) - if msgtype != 'REPLY': - raise RuntimeError("Message type error: {} != REPLY".format(msgtype)) - print "Message type: OK." - - -def main(): - """Send DHCPv6 proxy messages.""" - - args = TrafficScriptArg(['tx_src_ip', 'tx_dst_ip', 'proxy_ip', 'proxy_mac', - 'server_ip', 'client_mac', 'server_mac', - 'proxy_to_server_mac']) - - client_if = args.get_arg('tx_if') - server_if = args.get_arg('rx_if') - proxy_ip = args.get_arg('proxy_ip') - proxy_mac = args.get_arg('proxy_mac') - proxy_to_server_mac = args.get_arg('proxy_to_server_mac') - server_ip = args.get_arg('server_ip') - client_mac = args.get_arg('client_mac') - server_mac = args.get_arg('server_mac') - - link_local_ip = "fe80::1" - dhcp_multicast_ip = "ff02::1:2" - client_duid = str(random.randint(0, 9999)) - - # SOLICIT - interface_id = dhcpv6_solicit(client_if, server_if, dhcp_multicast_ip, - link_local_ip, proxy_ip, server_ip, - server_mac, client_duid, client_mac) - - # ADVERTISE - dhcpv6_advertise(client_if, server_if, link_local_ip, proxy_ip, - server_ip, server_mac, proxy_to_server_mac, interface_id) - - # REQUEST - dhcpv6_request(client_if, server_if, dhcp_multicast_ip, link_local_ip, - proxy_ip, server_ip, client_duid, client_mac) - - # REPLY - dhcpv6_reply(client_if, server_if, link_local_ip, proxy_ip, server_ip, - server_mac, interface_id) - - sys.exit(0) - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/icmpv6_echo.py b/resources/traffic_scripts/icmpv6_echo.py deleted file mode 100755 index a18896b07f..0000000000 --- a/resources/traffic_scripts/icmpv6_echo.py +++ /dev/null @@ -1,108 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Traffic script for ICMPv6 echo test.""" - -import sys -import logging - -# pylint: disable=no-name-in-module -# pylint: disable=import-error -logging.getLogger("scapy.runtime").setLevel(logging.ERROR) - -from scapy.all import Ether -from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS -from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr -from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply - -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from resources.libraries.python.PacketVerifier import checksum_equal -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def main(): - args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip']) - - rxq = RxQueue(args.get_arg('rx_if')) - txq = TxQueue(args.get_arg('tx_if')) - - src_mac = args.get_arg('src_mac') - dst_mac = args.get_arg('dst_mac') - src_ip = args.get_arg('src_ip') - dst_ip = args.get_arg('dst_ip') - echo_id = 0xa - echo_seq = 0x1 - - sent_packets = [] - - # send ICMPv6 neighbor advertisement message - pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') / - IPv6(src=src_ip, dst='ff02::1:ff00:2') / - ICMPv6ND_NA(tgt=src_ip, R=0) / - ICMPv6NDOptDstLLAddr(lladdr=src_mac)) - sent_packets.append(pkt_send) - txq.send(pkt_send) - - # send ICMPv6 echo request - pkt_send = (Ether(src=src_mac, dst=dst_mac) / - IPv6(src=src_ip, dst=dst_ip) / - ICMPv6EchoRequest(id=echo_id, seq=echo_seq)) - sent_packets.append(pkt_send) - txq.send(pkt_send) - - # receive ICMPv6 echo reply - while True: - ether = rxq.recv(2, sent_packets) - if ether is None: - raise RuntimeError('ICMPv6 echo reply Rx timeout') - - if ether.haslayer(ICMPv6ND_NS): - # read another packet in the queue if the current one is ICMPv6ND_NS - continue - else: - # otherwise process the current packet - break - - if not ether.haslayer(IPv6): - raise RuntimeError('Unexpected packet with no IPv6 received {0}'. - format(ether.__repr__())) - - ipv6 = ether[IPv6] - - if not ipv6.haslayer(ICMPv6EchoReply): - raise RuntimeError('Unexpected packet with no ICMPv6 echo reply ' - 'received {0}'.format(ipv6.__repr__())) - - icmpv6 = ipv6[ICMPv6EchoReply] - - # check identifier and sequence number - if icmpv6.id != echo_id or icmpv6.seq != echo_seq: - raise RuntimeError('Invalid ICMPv6 echo reply received ID {0} seq {1} ' - 'should be ID {2} seq {3}'. - format(icmpv6.id, icmpv6.seq, echo_id, echo_seq)) - - # verify checksum - cksum = icmpv6.cksum - del icmpv6.cksum - tmp = ICMPv6EchoReply(str(icmpv6)) - if not checksum_equal(tmp.cksum, cksum): - raise RuntimeError('Invalid checksum {0} should be {1}'. - format(cksum, tmp.cksum)) - - sys.exit(0) - - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/icmpv6_echo_req_resp.py b/resources/traffic_scripts/icmpv6_echo_req_resp.py deleted file mode 100755 index 195f666b38..0000000000 --- a/resources/traffic_scripts/icmpv6_echo_req_resp.py +++ /dev/null @@ -1,190 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2018 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Send ICMPv6 echo request from one TG port to DUT port or to another TG port - through DUT node(s) and send reply back. Also verify hop limit processing.""" - -import sys -import logging - -# pylint: disable=no-name-in-module -# pylint: disable=import-error -logging.getLogger("scapy.runtime").setLevel(logging.ERROR) - -from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS -from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr -from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply -from scapy.all import Ether - -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from resources.libraries.python.PacketVerifier import checksum_equal -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def main(): - args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_nh_mac', 'dst_nh_mac', - 'src_ip', 'dst_ip', 'h_num'], ['is_dst_tg']) - - src_rxq = RxQueue(args.get_arg('tx_if')) - src_txq = TxQueue(args.get_arg('tx_if')) - - src_mac = args.get_arg('src_mac') - dst_mac = args.get_arg('dst_mac') - src_nh_mac = args.get_arg('src_nh_mac') - dst_nh_mac = args.get_arg('dst_nh_mac') - src_ip = args.get_arg('src_ip') - dst_ip = args.get_arg('dst_ip') - hop_num = int(args.get_arg('h_num')) - - is_dst_tg = True if args.get_arg('is_dst_tg') in ['True', ''] else False - dst_rxq = RxQueue(args.get_arg('rx_if')) if is_dst_tg else None - dst_txq = TxQueue(args.get_arg('rx_if')) if is_dst_tg else None - - hop_limit = 64 - echo_id = 0xa - echo_seq = 0x1 - - src_sent_packets = [] - dst_sent_packets = [] - - # send ICMPv6 neighbor advertisement message - pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') / - IPv6(src=src_ip, dst='ff02::1:ff00:2') / - ICMPv6ND_NA(tgt=src_ip, R=0) / - ICMPv6NDOptDstLLAddr(lladdr=src_mac)) - src_sent_packets.append(pkt_send) - src_txq.send(pkt_send) - - if is_dst_tg: - # send ICMPv6 neighbor advertisement message - pkt_send = (Ether(src=dst_mac, dst='ff:ff:ff:ff:ff:ff') / - IPv6(src=dst_ip, dst='ff02::1:ff00:2') / - ICMPv6ND_NA(tgt=dst_ip, R=0) / - ICMPv6NDOptDstLLAddr(lladdr=dst_mac)) - dst_sent_packets.append(pkt_send) - dst_txq.send(pkt_send) - - # send ICMPv6 echo request from first TG interface - pkt_send = (Ether(src=src_mac, dst=src_nh_mac) / - IPv6(src=src_ip, dst=dst_ip, hlim=hop_limit) / - ICMPv6EchoRequest(id=echo_id, seq=echo_seq)) - src_sent_packets.append(pkt_send) - src_txq.send(pkt_send) - - if is_dst_tg: - # receive ICMPv6 echo request on second TG interface - while True: - ether = dst_rxq.recv(2, dst_sent_packets) - if ether is None: - raise RuntimeError('ICMPv6 echo reply Rx timeout') - - if ether.haslayer(ICMPv6ND_NS): - # read another packet in the queue if the current one is - # ICMPv6ND_NS - continue - else: - # otherwise process the current packet - break - - if not ether.haslayer(IPv6): - raise RuntimeError('Unexpected packet with no IPv6 received: {0}'. - format(ether.__repr__())) - - ipv6 = ether[IPv6] - - # verify hop limit processing - if ipv6.hlim != (hop_limit - hop_num): - raise RuntimeError('Invalid hop limit {0} should be {1}'. - format(ipv6.hlim, hop_limit - hop_num)) - - if not ipv6.haslayer(ICMPv6EchoRequest): - raise RuntimeError('Unexpected packet with no IPv6 ICMP received ' - '{0}'.format(ipv6.__repr__())) - - icmpv6 = ipv6[ICMPv6EchoRequest] - - # check identifier and sequence number - if icmpv6.id != echo_id or icmpv6.seq != echo_seq: - raise RuntimeError('Invalid ICMPv6 echo reply received ID {0} ' - 'seq {1} should be ID {2} seq {3}'. - format(icmpv6.id, icmpv6.seq, echo_id, echo_seq)) - - # verify checksum - cksum = icmpv6.cksum - del icmpv6.cksum - tmp = ICMPv6EchoRequest(str(icmpv6)) - if not checksum_equal(tmp.cksum, cksum): - raise RuntimeError('Invalid checksum {0} should be {1}'. - format(cksum, tmp.cksum)) - - # send ICMPv6 echo reply from second TG interface - pkt_send = (Ether(src=dst_mac, dst=dst_nh_mac) / - IPv6(src=dst_ip, dst=src_ip, hlim=(ipv6.hlim - 1)) / - ICMPv6EchoReply(id=echo_id, seq=echo_seq)) - dst_sent_packets.append(pkt_send) - dst_txq.send(pkt_send) - - # receive ICMPv6 echo reply on first TG interface - while True: - ether = src_rxq.recv(2, src_sent_packets) - if ether is None: - raise RuntimeError('ICMPv6 echo reply Rx timeout') - - if ether.haslayer(ICMPv6ND_NS): - # read another packet in the queue if the current one is ICMPv6ND_NS - continue - else: - # otherwise process the current packet - break - - if not ether.haslayer(IPv6): - raise RuntimeError('Unexpected packet with no IPv6 layer received {0}'. - format(ether.__repr__())) - - ipv6 = ether[IPv6] - - # verify hop limit processing; destination node decrements hlim by one in - # outgoing ICMPv6 Echo Reply - directions = 2 if is_dst_tg else 1 - hop_limit_reply = hop_limit - directions * hop_num - 1 - if ipv6.hlim != hop_limit_reply: - raise RuntimeError('Invalid hop limit {0} should be {1}'. - format(ipv6.hlim, hop_limit_reply)) - - if not ipv6.haslayer(ICMPv6EchoReply): - raise RuntimeError('Unexpected packet with no IPv6 ICMP received {0}'. - format(ipv6.__repr__())) - - icmpv6 = ipv6[ICMPv6EchoReply] - - # check identifier and sequence number - if icmpv6.id != echo_id or icmpv6.seq != echo_seq: - raise RuntimeError('Invalid ICMPv6 echo reply received ID {0} ' - 'seq {1} should be ID {2} seq {3}'. - format(icmpv6.id, icmpv6.seq, echo_id, echo_seq)) - - # verify checksum - cksum = icmpv6.cksum - del icmpv6.cksum - tmp = ICMPv6EchoReply(str(icmpv6)) - if not checksum_equal(tmp.cksum, cksum): - raise RuntimeError('Invalid checksum {0} should be {1}'. - format(cksum, tmp.cksum)) - - sys.exit(0) - - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/ipsec.py b/resources/traffic_scripts/ipsec.py index b853b6c09f..320303d1fb 100755 --- a/resources/traffic_scripts/ipsec.py +++ b/resources/traffic_scripts/ipsec.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # Copyright (c) 2019 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); @@ -18,18 +18,18 @@ import sys import logging +from ipaddress import ip_address # pylint: disable=no-name-in-module # pylint: disable=import-error -logging.getLogger("scapy.runtime").setLevel(logging.ERROR) - -from scapy.all import Ether +logging.getLogger(u"scapy.runtime").setLevel(logging.ERROR) from scapy.layers.inet import IP from scapy.layers.inet6 import IPv6, ICMPv6ND_NS from scapy.layers.ipsec import SecurityAssociation, ESP -from ipaddress import ip_address +from scapy.layers.l2 import Ether +from scapy.packet import Raw -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg from resources.libraries.python.PacketVerifier import RxQueue, TxQueue +from resources.libraries.python.TrafficScriptArg import TrafficScriptArg def check_ipsec(pkt_recv, ip_layer, dst_tun, src_ip, dst_ip, sa_in): @@ -50,37 +50,39 @@ def check_ipsec(pkt_recv, ip_layer, dst_tun, src_ip, dst_ip, sa_in): :raises RuntimeError: If received packet is invalid. """ if not pkt_recv.haslayer(ip_layer): - raise RuntimeError('Not an {ip} packet received: {pkt}'.format( - ip=ip_layer.name, pkt=pkt_recv.__repr__())) + raise RuntimeError( + f"Not an {ip_layer.name} packet received: {pkt_recv!r}" + ) - if pkt_recv[ip_layer.name].dst != dst_tun: + if pkt_recv[ip_layer].dst != dst_tun: raise RuntimeError( - 'Received packet has invalid destination address: {rec_ip} ' - 'should be: {exp_ip}'.format( - rec_ip=pkt_recv[ip_layer.name].dst, exp_ip=dst_tun)) + f"Received packet has invalid destination address: " + f"{pkt_recv[ip_layer].dst} should be: {dst_tun}" + ) if not pkt_recv.haslayer(ESP): - raise RuntimeError( - 'Not an ESP packet received: {pkt}'.format(pkt=pkt_recv.__repr__())) + raise RuntimeError(f"Not an ESP packet received: {pkt_recv!r}") ip_pkt = pkt_recv[ip_layer] d_pkt = sa_in.decrypt(ip_pkt) if d_pkt[ip_layer].dst != dst_ip: raise RuntimeError( - 'Decrypted packet has invalid destination address: {rec_ip} ' - 'should be: {exp_ip}'.format( - rec_ip=d_pkt[ip_layer].dst, exp_ip=dst_ip)) + f"Decrypted packet has invalid destination address: " + f"{d_pkt[ip_layer].dst} should be: {dst_ip}" + ) if d_pkt[ip_layer].src != src_ip: raise RuntimeError( - 'Decrypted packet has invalid source address: {rec_ip} should be: ' - '{exp_ip}'.format(rec_ip=d_pkt[ip_layer].src, exp_ip=src_ip)) + f"Decrypted packet has invalid source address: " + f"{d_pkt[ip_layer].src} should be: {src_ip}" + ) - if ip_layer == IP and d_pkt[ip_layer.name].proto != 61: + if ip_layer == IP and d_pkt[ip_layer].proto != 61: raise RuntimeError( - 'Decrypted packet has invalid IP protocol: {rec_proto} ' - 'should be: 61'.format(rec_proto=d_pkt[ip_layer.name].proto)) + f"Decrypted packet has invalid IP protocol: " + f"{d_pkt[ip_layer].proto} should be: 61" + ) def check_ip(pkt_recv, ip_layer, src_ip, dst_ip): @@ -97,25 +99,27 @@ def check_ip(pkt_recv, ip_layer, src_ip, dst_ip): :raises RuntimeError: If received packet is invalid. """ if not pkt_recv.haslayer(ip_layer): - raise RuntimeError('Not an {ip} packet received: {pkt}'.format( - ip=ip_layer.name, pkt=pkt_recv.__repr__())) + raise RuntimeError( + f"Not an {ip_layer.name} packet received: {pkt_recv!r}" + ) - if pkt_recv[ip_layer.name].dst != dst_ip: + if pkt_recv[ip_layer].dst != dst_ip: raise RuntimeError( - 'Received packet has invalid destination address: {rec_ip} ' - 'should be: {exp_ip}'.format( - rec_ip=pkt_recv[ip_layer.name].dst, exp_ip=dst_ip)) + f"Received packet has invalid destination address: " + f"{pkt_recv[ip_layer.name].dst} should be: {dst_ip}" + ) - if pkt_recv[ip_layer.name].src != src_ip: + if pkt_recv[ip_layer].src != src_ip: raise RuntimeError( - 'Received packet has invalid destination address: {rec_ip} ' - 'should be: {exp_ip}'.format( - rec_ip=pkt_recv[ip_layer.name].dst, exp_ip=src_ip)) + f"Received packet has invalid destination address: " + f"{pkt_recv[ip_layer.name].dst} should be: {src_ip}" + ) - if ip_layer == IP and pkt_recv[ip_layer.name].proto != 61: + if ip_layer == IP and pkt_recv[ip_layer].proto != 61: raise RuntimeError( - 'Received packet has invalid IP protocol: {rec_proto} ' - 'should be: 61'.format(rec_proto=pkt_recv[ip_layer.name].proto)) + f"Received packet has invalid IP protocol: " + f"{pkt_recv[ip_layer].proto} should be: 61" + ) # pylint: disable=too-many-locals @@ -124,36 +128,35 @@ def main(): """Send and receive IPsec packet.""" args = TrafficScriptArg( - ['tx_src_mac', 'tx_dst_mac', 'rx_src_mac', 'rx_dst_mac', 'src_ip', - 'dst_ip','crypto_alg', 'crypto_key', 'integ_alg', 'integ_key', - 'l_spi', 'r_spi'], - ['src_tun', 'dst_tun'] + [ + u"tx_src_mac", u"tx_dst_mac", u"rx_src_mac", u"rx_dst_mac", + u"src_ip", u"dst_ip", u"crypto_alg", u"crypto_key", u"integ_alg", + u"integ_key", u"l_spi", u"r_spi" + ], + [u"src_tun", u"dst_tun"] ) - tx_txq = TxQueue(args.get_arg('tx_if')) - tx_rxq = RxQueue(args.get_arg('tx_if')) - rx_txq = TxQueue(args.get_arg('rx_if')) - rx_rxq = RxQueue(args.get_arg('rx_if')) - - tx_src_mac = args.get_arg('tx_src_mac') - tx_dst_mac = args.get_arg('tx_dst_mac') - rx_src_mac = args.get_arg('rx_src_mac') - rx_dst_mac = args.get_arg('rx_dst_mac') - src_ip = args.get_arg('src_ip') - dst_ip = args.get_arg('dst_ip') - crypto_alg = args.get_arg('crypto_alg') - crypto_key = args.get_arg('crypto_key') - integ_alg = args.get_arg('integ_alg') - integ_key = args.get_arg('integ_key') - l_spi = int(args.get_arg('l_spi')) - r_spi = int(args.get_arg('r_spi')) - src_tun = args.get_arg('src_tun') - dst_tun = args.get_arg('dst_tun') - - if ip_address(unicode(src_ip)).version == 6: - ip_layer = IPv6 - else: - ip_layer = IP + tx_txq = TxQueue(args.get_arg(u"tx_if")) + tx_rxq = RxQueue(args.get_arg(u"tx_if")) + rx_txq = TxQueue(args.get_arg(u"rx_if")) + rx_rxq = RxQueue(args.get_arg(u"rx_if")) + + tx_src_mac = args.get_arg(u"tx_src_mac") + tx_dst_mac = args.get_arg(u"tx_dst_mac") + rx_src_mac = args.get_arg(u"rx_src_mac") + rx_dst_mac = args.get_arg(u"rx_dst_mac") + src_ip = args.get_arg(u"src_ip") + dst_ip = args.get_arg(u"dst_ip") + crypto_alg = args.get_arg(u"crypto_alg") + crypto_key = args.get_arg(u"crypto_key") + integ_alg = args.get_arg(u"integ_alg") + integ_key = args.get_arg(u"integ_key") + l_spi = int(args.get_arg(u"l_spi")) + r_spi = int(args.get_arg(u"r_spi")) + src_tun = args.get_arg(u"src_tun") + dst_tun = args.get_arg(u"dst_tun") + + ip_layer = IP if ip_address(src_ip).version == 4 else IPv6 tunnel_out = ip_layer(src=src_tun, dst=dst_tun) if src_tun and dst_tun \ else None @@ -163,23 +166,28 @@ def main(): if not (src_tun and dst_tun): src_tun = src_ip - sa_in = SecurityAssociation(ESP, spi=r_spi, crypt_algo=crypto_alg, - crypt_key=crypto_key, auth_algo=integ_alg, - auth_key=integ_key, tunnel_header=tunnel_in) + sa_in = SecurityAssociation( + ESP, spi=r_spi, crypt_algo=crypto_alg, + crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg, + auth_key=integ_key.encode(encoding=u"utf-8"), tunnel_header=tunnel_in + ) - sa_out = SecurityAssociation(ESP, spi=l_spi, crypt_algo=crypto_alg, - crypt_key=crypto_key, auth_algo=integ_alg, - auth_key=integ_key, tunnel_header=tunnel_out) + sa_out = SecurityAssociation( + ESP, spi=l_spi, crypt_algo=crypto_alg, + crypt_key=crypto_key.encode(encoding=u"utf-8"), auth_algo=integ_alg, + auth_key=integ_key.encode(encoding=u"utf-8"), tunnel_header=tunnel_out + ) ip_pkt = ip_layer(src=src_ip, dst=dst_ip, proto=61) if ip_layer == IP \ else ip_layer(src=src_ip, dst=dst_ip) - ip_pkt = ip_layer(str(ip_pkt)) + ip_pkt = ip_layer(ip_pkt) e_pkt = sa_out.encrypt(ip_pkt) tx_pkt_send = (Ether(src=tx_src_mac, dst=tx_dst_mac) / e_pkt) sent_packets = list() + tx_pkt_send /= Raw() sent_packets.append(tx_pkt_send) tx_txq.send(tx_pkt_send) @@ -187,8 +195,7 @@ def main(): rx_pkt_recv = rx_rxq.recv(2) if rx_pkt_recv is None: - raise RuntimeError( - '{ip} packet Rx timeout'.format(ip=ip_layer.name)) + raise RuntimeError(f"{ip_layer.name} packet Rx timeout") if rx_pkt_recv.haslayer(ICMPv6ND_NS): # read another packet in the queue if the current one is ICMPv6ND_NS @@ -204,15 +211,16 @@ def main(): rx_pkt_send = (Ether(src=rx_dst_mac, dst=rx_src_mac) / rx_ip_pkt) + rx_pkt_send /= Raw() rx_txq.send(rx_pkt_send) while True: tx_pkt_recv = tx_rxq.recv(2, sent_packets) if tx_pkt_recv is None: - raise RuntimeError('ESP packet Rx timeout') + raise RuntimeError(u"ESP packet Rx timeout") - if rx_pkt_recv.haslayer(ICMPv6ND_NS): + if tx_pkt_recv.haslayer(ICMPv6ND_NS): # read another packet in the queue if the current one is ICMPv6ND_NS continue else: @@ -224,5 +232,5 @@ def main(): sys.exit(0) -if __name__ == "__main__": +if __name__ == u"__main__": main() diff --git a/resources/traffic_scripts/ipv4_ping_ttl_check.py b/resources/traffic_scripts/ipv4_ping_ttl_check.py deleted file mode 100755 index 99a7b0a5f0..0000000000 --- a/resources/traffic_scripts/ipv4_ping_ttl_check.py +++ /dev/null @@ -1,145 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from scapy.all import Ether, IP, ICMP -from resources.libraries.python.PacketVerifier \ - import Interface, create_gratuitous_arp_request, auto_pad, checksum_equal -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def check_ttl(ttl_begin, ttl_end, ttl_diff): - if ttl_begin != ttl_end + ttl_diff: - raise RuntimeError( - "TTL changed from {} to {} but decrease by {} expected" - .format(ttl_begin, ttl_end, ttl_diff)) - - -def ckeck_packets_equal(pkt_send, pkt_recv): - pkt_send_raw = auto_pad(pkt_send) - pkt_recv_raw = auto_pad(pkt_recv) - if pkt_send_raw != pkt_recv_raw: - print "Sent: {}".format(pkt_send_raw.encode('hex')) - print "Received: {}".format(pkt_recv_raw.encode('hex')) - print "Sent:" - pkt_send.show2() - print "Received:" - pkt_recv.show2() - raise RuntimeError("Sent packet doesn't match received packet") - - -def main(): - args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip', - 'hops', 'first_hop_mac', 'is_dst_tg']) - - src_if_name = args.get_arg('tx_if') - dst_if_name = args.get_arg('rx_if') - is_dst_tg = True if args.get_arg('is_dst_tg') == 'True' else False - - src_mac = args.get_arg('src_mac') - first_hop_mac = args.get_arg('first_hop_mac') - dst_mac = args.get_arg('dst_mac') - src_ip = args.get_arg('src_ip') - dst_ip = args.get_arg('dst_ip') - hops = int(args.get_arg('hops')) - - if is_dst_tg and (src_if_name == dst_if_name): - raise RuntimeError( - "Source interface name equals destination interface name") - - src_if = Interface(src_if_name) - src_if.send_pkt(str(create_gratuitous_arp_request(src_mac, src_ip))) - if is_dst_tg: - dst_if = Interface(dst_if_name) - dst_if.send_pkt(str(create_gratuitous_arp_request(dst_mac, dst_ip))) - - pkt_req_send = (Ether(src=src_mac, dst=first_hop_mac) / - IP(src=src_ip, dst=dst_ip) / - ICMP()) - src_if.send_pkt(pkt_req_send) - - if is_dst_tg: - pkt_req_recv = dst_if.recv_pkt() - if pkt_req_recv is None: - raise RuntimeError('Timeout waiting for packet') - - check_ttl(pkt_req_send[IP].ttl, pkt_req_recv[IP].ttl, hops) - pkt_req_send_mod = pkt_req_send.copy() - pkt_req_send_mod[IP].ttl = pkt_req_recv[IP].ttl - del pkt_req_send_mod[IP].chksum # update checksum - ckeck_packets_equal(pkt_req_send_mod[IP], pkt_req_recv[IP]) - - pkt_resp_send = (Ether(src=dst_mac, dst=pkt_req_recv.src) / - IP(src=dst_ip, dst=src_ip) / - ICMP(type=0)) # echo-reply - dst_if.send_pkt(pkt_resp_send) - - pkt_resp_recv = src_if.recv_pkt() - if pkt_resp_recv is None: - raise RuntimeError('Timeout waiting for packet') - - if is_dst_tg: - check_ttl(pkt_resp_send[IP].ttl, pkt_resp_recv[IP].ttl, hops) - pkt_resp_send_mod = pkt_resp_send.copy() - pkt_resp_send_mod[IP].ttl = pkt_resp_recv[IP].ttl - del pkt_resp_send_mod[IP].chksum # update checksum - ckeck_packets_equal(pkt_resp_send_mod[IP], pkt_resp_recv[IP]) - - if not pkt_resp_recv.haslayer(IP): - raise RuntimeError('Received packet does not contain IPv4 header: {}'. - format(pkt_resp_recv.__repr__())) - - if pkt_resp_recv[IP].src != pkt_req_send[IP].dst: - raise RuntimeError( - 'Received IPv4 packet contains wrong src IP address, ' - '{} instead of {}'.format(pkt_resp_recv[IP].src, - pkt_req_send[IP].dst)) - - if pkt_resp_recv[IP].dst != pkt_req_send[IP].src: - raise RuntimeError( - 'Received IPv4 packet contains wrong dst IP address, ' - '{} instead of {}'.format(pkt_resp_recv[IP].dst, - pkt_req_send[IP].src)) - - # verify IPv4 checksum - copy = pkt_resp_recv.copy() - chksum = copy[IP].chksum - del copy[IP].chksum - tmp = IP(str(copy[IP])) - if not checksum_equal(tmp.chksum, chksum): - raise RuntimeError('Received IPv4 packet contains invalid checksum, ' - '{} instead of {}'.format(chksum, tmp.chksum)) - - if not pkt_resp_recv[IP].haslayer(ICMP): - raise RuntimeError( - 'Received IPv4 packet does not contain ICMP header: {}'. - format(pkt_resp_recv[IP].__repr__())) - - # verify ICMP checksum - copy = pkt_resp_recv.copy() - chksum = copy[IP][ICMP].chksum - del copy[IP][ICMP].chksum - tmp = ICMP(str(copy[IP][ICMP])) - if not checksum_equal(tmp.chksum, chksum): - raise RuntimeError('Received ICMP packet contains invalid checksum, ' - '{} instead of {}'.format(chksum, tmp.chksum)) - - pkt_req_send_mod = pkt_req_send.copy() - pkt_req_send_mod[IP][ICMP].type = pkt_resp_recv[IP][ICMP].type - del pkt_req_send_mod[IP][ICMP].chksum # update checksum - ckeck_packets_equal(pkt_req_send_mod[IP][ICMP], pkt_resp_recv[IP][ICMP]) - - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/ipv4_sweep_ping.py b/resources/traffic_scripts/ipv4_sweep_ping.py deleted file mode 100755 index e258d45213..0000000000 --- a/resources/traffic_scripts/ipv4_sweep_ping.py +++ /dev/null @@ -1,112 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Traffic script for IPv4 sweep ping.""" - -import logging -import os -import sys - -logging.getLogger("scapy.runtime").setLevel(logging.ERROR) -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue, \ - create_gratuitous_arp_request, checksum_equal -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg -from scapy.layers.inet import IP, ICMP -from scapy.all import Ether, Raw - - -def main(): - # start_size - start size of the ICMPv4 echo data - # end_size - end size of the ICMPv4 echo data - # step - increment step - args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip', - 'start_size', 'end_size', 'step']) - - rxq = RxQueue(args.get_arg('rx_if')) - txq = TxQueue(args.get_arg('tx_if')) - - src_mac = args.get_arg('src_mac') - dst_mac = args.get_arg('dst_mac') - src_ip = args.get_arg('src_ip') - dst_ip = args.get_arg('dst_ip') - start_size = int(args.get_arg('start_size')) - end_size = int(args.get_arg('end_size')) - step = int(args.get_arg('step')) - echo_id = 0xa - # generate some random data buffer - data = bytearray(os.urandom(end_size)) - - sent_packets = [] - pkt_send = create_gratuitous_arp_request(src_mac, src_ip) - sent_packets.append(pkt_send) - txq.send(pkt_send) - - # send ICMP echo request with incremented data length and receive ICMP - # echo reply - for echo_seq in range(start_size, end_size + 1, step): - pkt_send = (Ether(src=src_mac, dst=dst_mac) / - IP(src=src_ip, dst=dst_ip) / - ICMP(id=echo_id, seq=echo_seq) / - Raw(load=data[0:echo_seq])) - sent_packets.append(pkt_send) - txq.send(pkt_send) - - ether = rxq.recv(ignore=sent_packets) - if ether is None: - raise RuntimeError( - 'ICMP echo reply seq {0} Rx timeout'.format(echo_seq)) - - if not ether.haslayer(IP): - raise RuntimeError( - 'Unexpected packet with no IPv4 received {0}'.format( - ether.__repr__())) - - ipv4 = ether['IP'] - - if not ipv4.haslayer(ICMP): - raise RuntimeError( - 'Unexpected packet with no ICMP received {0}'.format( - ipv4.__repr__())) - - icmpv4 = ipv4['ICMP'] - - if icmpv4.id != echo_id or icmpv4.seq != echo_seq: - raise RuntimeError( - 'Invalid ICMP echo reply received ID {0} seq {1} should be ' - 'ID {2} seq {3}, {0}'.format(icmpv4.id, icmpv4.seq, echo_id, - echo_seq)) - - chksum = icmpv4.chksum - del icmpv4.chksum - tmp = ICMP(str(icmpv4)) - if not checksum_equal(tmp.chksum, chksum): - raise RuntimeError( - 'Invalid checksum {0} should be {1}'.format(chksum, tmp.chksum)) - - if 'Raw' in icmpv4: - load = icmpv4['Raw'].load - else: - load = "" - if load != data[0:echo_seq]: - raise RuntimeError( - 'Received ICMP payload does not match sent payload') - - sent_packets.remove(pkt_send) - - sys.exit(0) - - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/ipv6_nd_proxy_check.py b/resources/traffic_scripts/ipv6_nd_proxy_check.py deleted file mode 100755 index 1d96050cf4..0000000000 --- a/resources/traffic_scripts/ipv6_nd_proxy_check.py +++ /dev/null @@ -1,207 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Traffic script that sends DHCPv6 proxy packets.""" - -from scapy.layers.inet import Ether -from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS -from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply - - -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def imcpv6nd_solicit(tx_if, src_mac, dst_mac, src_ip, dst_ip): - """Send ICMPv6 Neighbor Solicitation packet and expect a response - from the proxy. - - :param tx_if: Interface on TG. - :param src_mac: MAC address of TG interface. - :param dst_mac: MAC address of proxy interface. - :param src_ip: IP address of TG interface. - :param dst_ip: IP address of proxied interface. - :type tx_if: str - :type src_mac: str - :type dst_mac: str - :type src_ip: str - :type dst_ip: str - :raises RuntimeError: If the received packet is not correct. - """ - - rxq = RxQueue(tx_if) - txq = TxQueue(tx_if) - - sent_packets = [] - - icmpv6nd_solicit_pkt = (Ether(src=src_mac, dst=dst_mac) / - IPv6(src=src_ip) / - ICMPv6ND_NS(tgt=dst_ip)) - - sent_packets.append(icmpv6nd_solicit_pkt) - txq.send(icmpv6nd_solicit_pkt) - - ether = None - for _ in range(5): - ether = rxq.recv(3, ignore=sent_packets) - if not ether: - continue - if ether.haslayer(ICMPv6ND_NS): - # read another packet in the queue in case of ICMPv6ND_NS packet - continue - else: - # otherwise process the current packet - break - - if ether is None: - raise RuntimeError('ICMPv6ND Proxy response timeout.') - - if ether.src != dst_mac: - raise RuntimeError("Source MAC address error: {} != {}". - format(ether.src, dst_mac)) - print "Source MAC address: OK." - - if ether.dst != src_mac: - raise RuntimeError("Destination MAC address error: {} != {}". - format(ether.dst, src_mac)) - print "Destination MAC address: OK." - - if ether[IPv6].src != dst_ip: - raise RuntimeError("Source IP address error: {} != {}". - format(ether[IPv6].src, dst_ip)) - print "Source IP address: OK." - - if ether[IPv6].dst != src_ip: - raise RuntimeError("Destination IP address error: {} != {}". - format(ether[IPv6].dst, src_ip)) - print "Destination IP address: OK." - - try: - target_addr = ether[IPv6][ICMPv6ND_NA].tgt - except (KeyError, AttributeError): - raise RuntimeError("Not an ICMPv6ND Neighbor Advertisement packet.") - - if target_addr != dst_ip: - raise RuntimeError("ICMPv6 field 'Target address' error: {} != {}". - format(target_addr, dst_ip)) - print "Target address field: OK." - - -def ipv6_ping(src_if, dst_if, src_mac, dst_mac, - proxy_to_src_mac, proxy_to_dst_mac, src_ip, dst_ip): - """Sends ICMPv6 Echo Request, receive it and send a reply. - - :param src_if: First TG interface on link to DUT. - :param dst_if: Second TG interface on link to DUT. - :param src_mac: MAC address of first interface. - :param dst_mac: MAC address of second interface. - :param proxy_to_src_mac: MAC address of first proxy interface on DUT. - :param proxy_to_dst_mac: MAC address of second proxy interface on DUT. - :param src_ip: IP address of first interface. - :param dst_ip: IP address of second interface. - :type src_if: str - :type dst_if: str - :type src_mac: str - :type dst_mac: str - :type proxy_to_src_mac: str - :type proxy_to_dst_mac: str - :type src_ip: str - :type dst_ip: str - :raises RuntimeError: If a received packet is not correct. - """ - rxq = RxQueue(dst_if) - txq = TxQueue(src_if) - - icmpv6_ping_pkt = (Ether(src=src_mac, dst=proxy_to_src_mac) / - IPv6(src=src_ip, dst=dst_ip) / - ICMPv6EchoRequest()) - - txq.send(icmpv6_ping_pkt) - - ether = None - while True: - ether = rxq.recv(3) - if not ether: - continue - if ether.haslayer(ICMPv6ND_NS): - # read another packet in the queue in case of ICMPv6ND_NS packet - continue - else: - # otherwise process the current packet - break - - if ether is None: - raise RuntimeError('ICMPv6 Echo Request timeout.') - try: - ether[IPv6]["ICMPv6 Echo Request"] - except KeyError: - raise RuntimeError("Received packet is not an ICMPv6 Echo Request.") - print "ICMP Echo: OK." - - rxq = RxQueue(src_if) - txq = TxQueue(dst_if) - - icmpv6_ping_pkt = (Ether(src=dst_mac, dst=proxy_to_dst_mac) / - IPv6(src=dst_ip, dst=src_ip) / - ICMPv6EchoReply()) - - txq.send(icmpv6_ping_pkt) - - ether = None - while True: - ether = rxq.recv(3) - if not ether: - continue - if ether.haslayer(ICMPv6ND_NS): - # read another packet in the queue in case of ICMPv6ND_NS packet - continue - else: - # otherwise process the current packet - break - - if ether is None: - raise RuntimeError('DHCPv6 SOLICIT timeout') - try: - ether[IPv6]["ICMPv6 Echo Reply"] - except KeyError: - raise RuntimeError("Received packet is not an ICMPv6 Echo Reply.") - - print "ICMP Reply: OK." - - -def main(): - """Send DHCPv6 proxy messages.""" - - args = TrafficScriptArg(['src_ip', 'dst_ip', 'src_mac', 'dst_mac', - 'proxy_to_src_mac', 'proxy_to_dst_mac']) - - src_if = args.get_arg('tx_if') - dst_if = args.get_arg('rx_if') - src_ip = args.get_arg("src_ip") - dst_ip = args.get_arg("dst_ip") - src_mac = args.get_arg('src_mac') - dst_mac = args.get_arg('dst_mac') - proxy_to_src_mac = args.get_arg('proxy_to_src_mac') - proxy_to_dst_mac = args.get_arg('proxy_to_dst_mac') - - # Neighbor solicitation - imcpv6nd_solicit(src_if, src_mac, proxy_to_src_mac, src_ip, dst_ip) - - # Verify route (ICMP echo/reply) - ipv6_ping(src_if, dst_if, src_mac, dst_mac, proxy_to_src_mac, - proxy_to_dst_mac, src_ip, dst_ip) - - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/ipv6_ns.py b/resources/traffic_scripts/ipv6_ns.py deleted file mode 100755 index 5852e6317f..0000000000 --- a/resources/traffic_scripts/ipv6_ns.py +++ /dev/null @@ -1,112 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Traffic script for IPv6 Neighbor Solicitation test.""" - -import sys -import logging - -# pylint: disable=no-name-in-module -# pylint: disable=import-error -logging.getLogger("scapy.runtime").setLevel(logging.ERROR) - -from scapy.all import Ether -from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS -from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr, ICMPv6NDOptSrcLLAddr - -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from resources.libraries.python.PacketVerifier import checksum_equal -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def main(): - args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip']) - - rxq = RxQueue(args.get_arg('rx_if')) - txq = TxQueue(args.get_arg('tx_if')) - - src_mac = args.get_arg('src_mac') - dst_mac = args.get_arg('dst_mac') - src_ip = args.get_arg('src_ip') - dst_ip = args.get_arg('dst_ip') - - sent_packets = [] - - # send ICMPv6 neighbor solicitation message - pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') / - IPv6(src=src_ip, dst='ff02::1:ff00:2') / - ICMPv6ND_NS(tgt=dst_ip) / - ICMPv6NDOptSrcLLAddr(lladdr=src_mac)) - sent_packets.append(pkt_send) - txq.send(pkt_send) - - # receive ICMPv6 neighbor advertisement message - while True: - ether = rxq.recv(2, sent_packets) - if ether is None: - raise RuntimeError('ICMPv6 echo reply Rx timeout') - - if ether.haslayer(ICMPv6ND_NS): - # read another packet in the queue if the current one is ICMPv6ND_NS - continue - else: - # otherwise process the current packet - break - - if ether is None: - raise RuntimeError('ICMPv6 echo reply Rx timeout') - - if not ether.haslayer(IPv6): - raise RuntimeError('Unexpected packet with no IPv6 received {0}'. - format(ether.__repr__())) - - ipv6 = ether[IPv6] - - if not ipv6.haslayer(ICMPv6ND_NA): - raise RuntimeError('Unexpected packet with no ICMPv6 ND-NA received ' - '{0}'.format(ipv6.__repr__())) - - icmpv6_na = ipv6[ICMPv6ND_NA] - - # verify target address - if icmpv6_na.tgt != dst_ip: - raise RuntimeError('Invalid target address {0} should be {1}'. - format(icmpv6_na.tgt, dst_ip)) - - if not icmpv6_na.haslayer(ICMPv6NDOptDstLLAddr): - raise RuntimeError('Missing Destination Link-Layer Address option in ' - 'ICMPv6 Neighbor Advertisement {0}'. - format(icmpv6_na.__repr__())) - - dst_ll_addr = icmpv6_na[ICMPv6NDOptDstLLAddr] - - # verify destination link-layer address field - if dst_ll_addr.lladdr != dst_mac: - raise RuntimeError('Invalid lladdr {0} should be {1}'. - format(dst_ll_addr.lladdr, dst_mac)) - - # verify checksum - cksum = icmpv6_na.cksum - del icmpv6_na.cksum - tmp = ICMPv6ND_NA(str(icmpv6_na)) - if not checksum_equal(tmp.cksum, cksum): - raise RuntimeError('Invalid checksum {0} should be {1}'. - format(cksum, tmp.cksum)) - - sys.exit(0) - - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/ipv6_sweep_ping.py b/resources/traffic_scripts/ipv6_sweep_ping.py deleted file mode 100755 index 28d23a16ef..0000000000 --- a/resources/traffic_scripts/ipv6_sweep_ping.py +++ /dev/null @@ -1,120 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Traffic script for IPv6 sweep ping.""" - -import logging -import os -import sys - -# pylint: disable=no-name-in-module -# pylint: disable=import-error -logging.getLogger("scapy.runtime").setLevel(logging.ERROR) - -from scapy.all import Ether -from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS -from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr -from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply - -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from resources.libraries.python.PacketVerifier import checksum_equal -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def main(): - # start_size - start size of the ICMPv6 echo data - # end_size - end size of the ICMPv6 echo data - # step - increment step - args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip', - 'start_size', 'end_size', 'step']) - - rxq = RxQueue(args.get_arg('rx_if')) - txq = TxQueue(args.get_arg('tx_if')) - - src_mac = args.get_arg('src_mac') - dst_mac = args.get_arg('dst_mac') - src_ip = args.get_arg('src_ip') - dst_ip = args.get_arg('dst_ip') - start_size = int(args.get_arg('start_size')) - end_size = int(args.get_arg('end_size')) - step = int(args.get_arg('step')) - echo_id = 0xa - # generate some random data buffer - data = bytearray(os.urandom(end_size)) - - # send ICMPv6 neighbor advertisement message - sent_packets = [] - pkt_send = (Ether(src=src_mac, dst=dst_mac) / - IPv6(src=src_ip, dst=dst_ip) / - ICMPv6ND_NA(tgt=src_ip, R=0) / - ICMPv6NDOptDstLLAddr(lladdr=src_mac)) - sent_packets.append(pkt_send) - txq.send(pkt_send) - - # send ICMPv6 echo request with incremented data length and receive ICMPv6 - # echo reply - for echo_seq in range(start_size, end_size + 1, step): - pkt_send = (Ether(src=src_mac, dst=dst_mac) / - IPv6(src=src_ip, dst=dst_ip) / - ICMPv6EchoRequest(id=echo_id, seq=echo_seq, - data=data[0:echo_seq])) - sent_packets.append(pkt_send) - txq.send(pkt_send) - - while True: - ether = rxq.recv(ignore=sent_packets) - if ether is None: - raise RuntimeError('ICMPv6 echo reply seq {0} ' - 'Rx timeout'.format(echo_seq)) - - if ether.haslayer(ICMPv6ND_NS): - # read another packet in the queue in case of ICMPv6ND_NS packet - continue - else: - # otherwise process the current packet - break - - if not ether.haslayer(IPv6): - raise RuntimeError('Unexpected packet with no IPv6 layer ' - 'received: {0}'.format(ether.__repr__())) - - ipv6 = ether[IPv6] - - if not ipv6.haslayer(ICMPv6EchoReply): - raise RuntimeError('Unexpected packet with no ICMPv6 echo reply ' - 'layer received: {0}'.format(ipv6.__repr__())) - - icmpv6 = ipv6[ICMPv6EchoReply] - - if icmpv6.id != echo_id or icmpv6.seq != echo_seq: - raise RuntimeError('ICMPv6 echo reply with invalid data received: ' - 'ID {0} seq {1} should be ID {2} seq {3}, {0}'. - format(icmpv6.id, icmpv6.seq, echo_id, - echo_seq)) - - cksum = icmpv6.cksum - del icmpv6.cksum - tmp = ICMPv6EchoReply(str(icmpv6)) - if not checksum_equal(tmp.cksum, cksum): - raise RuntimeError('Invalid checksum: {0} should be {1}'. - format(cksum, tmp.cksum)) - - sent_packets.remove(pkt_send) - - sys.exit(0) - - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/lisp/lisp_check.py b/resources/traffic_scripts/lisp/lisp_check.py index 9937de8077..ebd769fa9d 100755 --- a/resources/traffic_scripts/lisp/lisp_check.py +++ b/resources/traffic_scripts/lisp/lisp_check.py @@ -1,5 +1,6 @@ -#!/usr/bin/env python -# Copyright (c) 2017 Cisco and/or its affiliates. +#!/usr/bin/env python3 + +# Copyright (c) 2019 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -19,12 +20,13 @@ a LISP-encapsulated packet on the other interface and verifies received packet. import sys import ipaddress +from scapy.all import bind_layers, Packet +from scapy.fields import FlagsField, BitField, IntField from scapy.layers.inet import ICMP, IP, UDP from scapy.layers.inet6 import ICMPv6EchoRequest from scapy.layers.inet6 import IPv6 from scapy.layers.l2 import Ether -from scapy.all import bind_layers, Packet -from scapy.fields import FlagsField, BitField, IntField +from scapy.packet import Raw from resources.libraries.python.PacketVerifier import RxQueue, TxQueue from resources.libraries.python.TrafficScriptArg import TrafficScriptArg @@ -32,26 +34,31 @@ from resources.libraries.python.TrafficScriptArg import TrafficScriptArg class LispHeader(Packet): """Scapy header for the LISP Layer.""" - name = "Lisp Header" + + name = u"Lisp Header" fields_desc = [ - FlagsField("flags", None, 8, ["N", "L", "E", "V", "I", "", "", ""]), - BitField("nonce/map_version", 0, size=24), - IntField("instance_id/locator_status_bits", 0)] + FlagsField( + u"flags", None, 8, [u"N", u"L", u"E", u"V", u"I", u"", u"", u""] + ), + BitField(u"nonce/map_version", 0, size=24), + IntField(u"instance_id/locator_status_bits", 0)] class LispInnerIP(IP): """Scapy inner LISP layer for IPv4-in-IPv4.""" - name = "Lisp Inner Layer - IPv4" + + name = u"Lisp Inner Layer - IPv4" class LispInnerIPv6(IPv6): """Scapy inner LISP layer for IPv6-in-IPv6.""" - name = "Lisp Inner Layer - IPv6" + + name = u"Lisp Inner Layer - IPv6" def valid_ipv4(ip): try: - ipaddress.IPv4Address(unicode(ip)) + ipaddress.IPv4Address(ip) return True except (AttributeError, ipaddress.AddressValueError): return False @@ -59,7 +66,7 @@ def valid_ipv4(ip): def valid_ipv6(ip): try: - ipaddress.IPv6Address(unicode(ip)) + ipaddress.IPv6Address(ip) return True except (AttributeError, ipaddress.AddressValueError): return False @@ -71,25 +78,28 @@ def main(): :raises RuntimeError: If the received packet is not correct.""" args = TrafficScriptArg( - ['tg_src_mac', 'tg_dst_mac', 'src_ip', 'dst_ip', 'dut_if1_mac', - 'dut_if2_mac', 'src_rloc', 'dst_rloc'], - ['ot_mode']) - - tx_src_mac = args.get_arg('tg_src_mac') - tx_dst_mac = args.get_arg('dut_if1_mac') - rx_dst_mac = args.get_arg('tg_dst_mac') - rx_src_mac = args.get_arg('dut_if2_mac') - src_ip = args.get_arg('src_ip') - dst_ip = args.get_arg('dst_ip') - src_rloc = args.get_arg("src_rloc") - dst_rloc = args.get_arg("dst_rloc") - tx_if = args.get_arg('tx_if') - rx_if = args.get_arg('rx_if') - ot_mode = args.get_arg('ot_mode') + [ + u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac", + u"dut_if2_mac", u"src_rloc", u"dst_rloc" + ], + [u"ot_mode"] + ) + + tx_src_mac = args.get_arg(u"tg_src_mac") + tx_dst_mac = args.get_arg(u"dut_if1_mac") + rx_dst_mac = args.get_arg(u"tg_dst_mac") + rx_src_mac = args.get_arg(u"dut_if2_mac") + src_ip = args.get_arg(u"src_ip") + dst_ip = args.get_arg(u"dst_ip") + src_rloc = args.get_arg(u"src_rloc") + dst_rloc = args.get_arg(u"dst_rloc") + tx_if = args.get_arg(u"tx_if") + rx_if = args.get_arg(u"rx_if") + ot_mode = args.get_arg(u"ot_mode") rxq = RxQueue(rx_if) txq = TxQueue(tx_if) - sent_packets = [] + pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac) if valid_ipv4(src_ip) and valid_ipv4(dst_ip): @@ -103,10 +113,13 @@ def main(): ip_format = IPv6 lisp_layer = LispInnerIPv6 else: - raise ValueError("IP not in correct format") + raise ValueError(u"IP not in correct format") bind_layers(UDP, LispHeader, dport=4341) bind_layers(LispHeader, lisp_layer) + + pkt_raw /= Raw() + sent_packets = list() sent_packets.append(pkt_raw) txq.send(pkt_raw) @@ -116,27 +129,23 @@ def main(): ether = rxq.recv(2) if ether is None: - raise RuntimeError("ICMP echo Rx timeout") + raise RuntimeError(u"ICMP echo Rx timeout") if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src: - print("MAC addresses match.") + print(u"MAC addresses match.") else: - raise RuntimeError( - "Matching packet unsuccessful: {0}".format(ether.__repr__())) + raise RuntimeError(f"Matching packet unsuccessful: {ether!r}") ip = ether.payload - if ot_mode == '6to4': + if ot_mode == u"6to4": if not isinstance(ip, IP): - raise RuntimeError( - "Not an IP packet received {0}".format(ip.__repr__())) - elif ot_mode == '4to6': - if not isinstance(ip, IP6): - raise RuntimeError( - "Not an IP packet received {0}".format(ip.__repr__())) + raise RuntimeError(f"Not an IP packet received {ip!r}") + elif ot_mode == u"4to6": + if not isinstance(ip, IPv6): + raise RuntimeError(f"Not an IP packet received {ip!r}") elif not isinstance(ip, ip_format): - raise RuntimeError( - "Not an IP packet received {0}".format(ip.__repr__())) + raise RuntimeError(f"Not an IP packet received {ip!r}") lisp = ether.getlayer(lisp_layer) if not lisp: @@ -144,31 +153,35 @@ def main(): # Compare data from packets if src_ip == lisp.src: - print("Source IP matches source EID.") + print(u"Source IP matches source EID.") else: - raise RuntimeError("Matching Src IP unsuccessful: {} != {}" - .format(src_ip, lisp.src)) + raise RuntimeError( + f"Matching Src IP unsuccessful: {src_ip} != {lisp.src}" + ) if dst_ip == lisp.dst: - print("Destination IP matches destination EID.") + print(u"Destination IP matches destination EID.") else: - raise RuntimeError("Matching Dst IP unsuccessful: {} != {}" - .format(dst_ip, lisp.dst)) + raise RuntimeError( + f"Matching Dst IP unsuccessful: {dst_ip} != {lisp.dst}" + ) if src_rloc == ip.src: - print("Source RLOC matches configuration.") + print(u"Source RLOC matches configuration.") else: - raise RuntimeError("Matching Src RLOC unsuccessful: {} != {}" - .format(src_rloc, ip.src)) + raise RuntimeError( + f"Matching Src RLOC unsuccessful: {src_rloc} != {ip.src}" + ) if dst_rloc == ip.dst: - print("Destination RLOC matches configuration.") + print(u"Destination RLOC matches configuration.") else: - raise RuntimeError("Matching dst RLOC unsuccessful: {} != {}" - .format(dst_rloc, ip.dst)) + raise RuntimeError( + f"Matching dst RLOC unsuccessful: {dst_rloc} != {ip.dst}" + ) sys.exit(0) -if __name__ == "__main__": +if __name__ == u"__main__": main() diff --git a/resources/traffic_scripts/lisp/lispgpe_check.py b/resources/traffic_scripts/lisp/lispgpe_check.py index d4de8635d7..50857c4fe8 100755 --- a/resources/traffic_scripts/lisp/lispgpe_check.py +++ b/resources/traffic_scripts/lisp/lispgpe_check.py @@ -1,5 +1,6 @@ -#!/usr/bin/env python -# Copyright (c) 2017 Cisco and/or its affiliates. +#!/usr/bin/env python3 + +# Copyright (c) 2019 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -20,12 +21,13 @@ packet. import sys import ipaddress +from scapy.all import bind_layers, Packet +from scapy.fields import FlagsField, BitField, XBitField, IntField from scapy.layers.inet import ICMP, IP, UDP from scapy.layers.inet6 import ICMPv6EchoRequest from scapy.layers.inet6 import IPv6 from scapy.layers.l2 import Ether -from scapy.all import bind_layers, Packet -from scapy.fields import FlagsField, BitField, XBitField, IntField +from scapy.packet import Raw from resources.libraries.python.PacketVerifier import RxQueue, TxQueue from resources.libraries.python.TrafficScriptArg import TrafficScriptArg @@ -33,13 +35,17 @@ from resources.libraries.python.TrafficScriptArg import TrafficScriptArg class LispGPEHeader(Packet): """Scapy header for the Lisp GPE Layer.""" + name = "Lisp GPE Header" fields_desc = [ - FlagsField("flags", None, 8, ["N", "L", "E", "V", "I", "P", "R", "O"]), - BitField("version", 0, size=2), - BitField("reserved", 0, size=14), - XBitField("next_protocol", 0, size=8), - IntField("instance_id/locator_status_bits", 0)] + FlagsField( + u"flags", None, 8, [u"N", u"L", u"E", u"V", u"I", u"P", u"R", u"O"] + ), + BitField(u"version", 0, size=2), + BitField(u"reserved", 0, size=14), + XBitField(u"next_protocol", 0, size=8), + IntField(u"instance_id/locator_status_bits", 0) + ] def guess_payload_class(self, payload): protocol = { @@ -53,17 +59,20 @@ class LispGPEHeader(Packet): class LispGPEInnerIP(IP): """Scapy inner LISP GPE layer for IPv4-in-IPv4.""" - name = "Lisp GPE Inner Layer - IPv4" + + name = u"Lisp GPE Inner Layer - IPv4" class LispGPEInnerIPv6(IPv6): """Scapy inner LISP GPE layer for IPv6-in-IPv6.""" - name = "Lisp GPE Inner Layer - IPv6" + + name = u"Lisp GPE Inner Layer - IPv6" class LispGPEInnerEther(Ether): """Scapy inner LISP GPE layer for Lisp-L2.""" - name = "Lisp GPE Inner Layer - Ethernet" + + name = u"Lisp GPE Inner Layer - Ethernet" class LispGPEInnerNSH(Packet): @@ -75,7 +84,7 @@ class LispGPEInnerNSH(Packet): def valid_ipv4(ip): try: - ipaddress.IPv4Address(unicode(ip)) + ipaddress.IPv4Address(ip) return True except (AttributeError, ipaddress.AddressValueError): return False @@ -83,7 +92,7 @@ def valid_ipv4(ip): def valid_ipv6(ip): try: - ipaddress.IPv6Address(unicode(ip)) + ipaddress.IPv6Address(ip) return True except (AttributeError, ipaddress.AddressValueError): return False @@ -95,25 +104,28 @@ def main(): :raises RuntimeError: If the received packet is not correct.""" args = TrafficScriptArg( - ['tg_src_mac', 'tg_dst_mac', 'src_ip', 'dst_ip', 'dut_if1_mac', - 'dut_if2_mac', 'src_rloc', 'dst_rloc'], - ['ot_mode']) - - tx_src_mac = args.get_arg('tg_src_mac') - tx_dst_mac = args.get_arg('dut_if1_mac') - rx_dst_mac = args.get_arg('tg_dst_mac') - rx_src_mac = args.get_arg('dut_if2_mac') - src_ip = args.get_arg('src_ip') - dst_ip = args.get_arg('dst_ip') - src_rloc = args.get_arg("src_rloc") - dst_rloc = args.get_arg("dst_rloc") - tx_if = args.get_arg('tx_if') - rx_if = args.get_arg('rx_if') - ot_mode = args.get_arg('ot_mode') + [ + u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac", + u"dut_if2_mac", u"src_rloc", u"dst_rloc" + ], + [u"ot_mode"] + ) + + tx_src_mac = args.get_arg(u"tg_src_mac") + tx_dst_mac = args.get_arg(u"dut_if1_mac") + rx_dst_mac = args.get_arg(u"tg_dst_mac") + rx_src_mac = args.get_arg(u"dut_if2_mac") + src_ip = args.get_arg(u"src_ip") + dst_ip = args.get_arg(u"dst_ip") + src_rloc = args.get_arg(u"src_rloc") + dst_rloc = args.get_arg(u"dst_rloc") + tx_if = args.get_arg(u"tx_if") + rx_if = args.get_arg(u"rx_if") + ot_mode = args.get_arg(u"ot_mode") rxq = RxQueue(rx_if) txq = TxQueue(tx_if) - sent_packets = [] + pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac) if valid_ipv4(src_ip) and valid_ipv4(dst_ip): @@ -125,10 +137,12 @@ def main(): pkt_raw /= ICMPv6EchoRequest() ip_format = IPv6 else: - raise ValueError("IP not in correct format") + raise ValueError(u"IP not in correct format") bind_layers(UDP, LispGPEHeader, dport=4341) + pkt_raw /= Raw() + sent_packets = list() sent_packets.append(pkt_raw) txq.send(pkt_raw) @@ -138,59 +152,59 @@ def main(): ether = rxq.recv(2) if ether is None: - raise RuntimeError("ICMP echo Rx timeout") + raise RuntimeError(u"ICMP echo Rx timeout") if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src: - print("MAC addresses match.") + print(u"MAC addresses match.") else: - raise RuntimeError( - "Matching packet unsuccessful: {0}".format(ether.__repr__())) + raise RuntimeError(f"Matching packet unsuccessful: {ether!r}") ip = ether.payload - if ot_mode == '6to4': + if ot_mode == u"6to4": if not isinstance(ip, IP): - raise RuntimeError( - "Not an IP packet received {0}".format(ip.__repr__())) - elif ot_mode == '4to6': + raise RuntimeError(f"Not an IP packet received {ip!r}") + elif ot_mode == u"4to6": if not isinstance(ip, IPv6): - raise RuntimeError( - "Not an IP packet received {0}".format(ip.__repr__())) + raise RuntimeError(f"Not an IP packet received {ip!r}") elif not isinstance(ip, ip_format): - raise RuntimeError( - "Not an IP packet received {0}".format(ip.__repr__())) + raise RuntimeError(f"Not an IP packet received {ip!r}") lisp = ether.getlayer(LispGPEHeader).underlayer if not lisp: - raise RuntimeError("Lisp layer not present or parsing failed.") + raise RuntimeError(u"Lisp layer not present or parsing failed.") # Compare data from packets if src_ip == lisp.src: - print("Source IP matches source EID.") + print(u"Source IP matches source EID.") else: - raise RuntimeError("Matching Src IP unsuccessful: {} != {}" - .format(src_ip, lisp.src)) + raise RuntimeError( + f"Matching Src IP unsuccessful: {src_ip} != {lisp.src}" + ) if dst_ip == lisp.dst: - print("Destination IP matches destination EID.") + print(u"Destination IP matches destination EID.") else: - raise RuntimeError("Matching Dst IP unsuccessful: {} != {}" - .format(dst_ip, lisp.dst)) + raise RuntimeError( + f"Matching Dst IP unsuccessful: {dst_ip} != {lisp.dst}" + ) if src_rloc == ip.src: - print("Source RLOC matches configuration.") + print(u"Source RLOC matches configuration.") else: - raise RuntimeError("Matching Src RLOC unsuccessful: {} != {}" - .format(src_rloc, ip.src)) + raise RuntimeError( + f"Matching Src RLOC unsuccessful: {src_rloc} != {ip.src}" + ) if dst_rloc == ip.dst: - print("Destination RLOC matches configuration.") + print(u"Destination RLOC matches configuration.") else: - raise RuntimeError("Matching dst RLOC unsuccessful: {} != {}" - .format(dst_rloc, ip.dst)) + raise RuntimeError( + f"Matching dst RLOC unsuccessful: {dst_rloc} != {ip.dst}" + ) sys.exit(0) -if __name__ == "__main__": +if __name__ == u"__main__": main() diff --git a/resources/traffic_scripts/policer.py b/resources/traffic_scripts/policer.py index 62c397d496..c222c76152 100755 --- a/resources/traffic_scripts/policer.py +++ b/resources/traffic_scripts/policer.py @@ -1,6 +1,6 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 -# Copyright (c) 2016 Cisco and/or its affiliates. +# Copyright (c) 2019 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -18,14 +18,15 @@ import sys import logging +from ipaddress import ip_address # pylint: disable=no-name-in-module # pylint: disable=import-error logging.getLogger("scapy.runtime").setLevel(logging.ERROR) -from scapy.all import Ether +from scapy.layers.l2 import Ether from scapy.layers.inet import IP, TCP from scapy.layers.inet6 import IPv6, ICMPv6ND_NS -from ipaddress import ip_address +from scapy.packet import Raw from resources.libraries.python.TrafficScriptArg import TrafficScriptArg from resources.libraries.python.PacketVerifier import RxQueue, TxQueue @@ -41,17 +42,14 @@ def check_ipv4(pkt_recv, dscp): :raises RuntimeError: If received packet is invalid. """ if not pkt_recv.haslayer(IP): - raise RuntimeError('Not an IPv4 packet received: {0}'. - format(pkt_recv.__repr__())) + raise RuntimeError(f"Not an IPv4 packet received: {pkt_recv!r}") rx_dscp = pkt_recv[IP].tos >> 2 if rx_dscp != dscp: - raise RuntimeError('Invalid DSCP {0} should be {1}'. - format(rx_dscp, dscp)) + raise RuntimeError(f"Invalid DSCP {rx_dscp} should be {dscp}") if not pkt_recv.haslayer(TCP): - raise RuntimeError('Not a TCP packet received: {0}'. - format(pkt_recv.__repr__())) + raise RuntimeError(f"Not a TCP packet received: {pkt_recv!r}") def check_ipv6(pkt_recv, dscp): @@ -64,58 +62,48 @@ def check_ipv6(pkt_recv, dscp): :raises RuntimeError: If received packet is invalid. """ if not pkt_recv.haslayer(IPv6): - raise RuntimeError('Not an IPv6 packet received: {0}'. - format(pkt_recv.__repr__())) + raise RuntimeError(f"Not an IPv6 packet received: {pkt_recv!r}") rx_dscp = pkt_recv[IPv6].tc >> 2 if rx_dscp != dscp: - raise RuntimeError('Invalid DSCP {0} should be {1}'. - format(rx_dscp, dscp)) + raise RuntimeError(f"Invalid DSCP {rx_dscp} should be {dscp}") if not pkt_recv.haslayer(TCP): - raise RuntimeError('Not a TCP packet received: {0}'. - format(pkt_recv.__repr__())) + raise RuntimeError(f"Not a TCP packet received: {pkt_recv!r}") # pylint: disable=too-many-locals # pylint: disable=too-many-statements def main(): """Send and receive TCP packet.""" - args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip', 'dscp']) - - rxq = RxQueue(args.get_arg('rx_if')) - txq = TxQueue(args.get_arg('tx_if')) + args = TrafficScriptArg( + [u"src_mac", u"dst_mac", u"src_ip", u"dst_ip", u"dscp"] + ) - src_mac = args.get_arg('src_mac') - dst_mac = args.get_arg('dst_mac') - src_ip = args.get_arg('src_ip') - dst_ip = args.get_arg('dst_ip') - dscp = int(args.get_arg('dscp')) + rxq = RxQueue(args.get_arg(u"rx_if")) + txq = TxQueue(args.get_arg(u"tx_if")) - if 6 == ip_address(unicode(src_ip)).version: - is_ipv4 = False - else: - is_ipv4 = True - - sent_packets = [] + src_mac = args.get_arg(u"src_mac") + dst_mac = args.get_arg(u"dst_mac") + src_ip = args.get_arg(u"src_ip") + dst_ip = args.get_arg(u"dst_ip") + dscp = int(args.get_arg(u"dscp")) - if is_ipv4: - ip_pkt = (IP(src=src_ip, dst=dst_ip) / - TCP()) - else: - ip_pkt = (IPv6(src=src_ip, dst=dst_ip) / - TCP()) + ip_layer = IPv6 if ip_address(src_ip).version == 6 else IP + sent_packets = list() pkt_send = (Ether(src=src_mac, dst=dst_mac) / - ip_pkt) + ip_layer(src=src_ip, dst=dst_ip) / + TCP()) + pkt_send /= Raw() sent_packets.append(pkt_send) txq.send(pkt_send) while True: pkt_recv = rxq.recv(2, sent_packets) if pkt_recv is None: - raise RuntimeError('ICMPv6 echo reply Rx timeout') + raise RuntimeError(u"ICMPv6 echo reply Rx timeout") if pkt_recv.haslayer(ICMPv6ND_NS): # read another packet in the queue if the current one is ICMPv6ND_NS @@ -125,9 +113,9 @@ def main(): break if pkt_recv is None: - raise RuntimeError('Rx timeout') + raise RuntimeError(u"Rx timeout") - if is_ipv4: + if ip_layer == IP: check_ipv4(pkt_recv, dscp) else: check_ipv6(pkt_recv, dscp) @@ -135,5 +123,5 @@ def main(): sys.exit(0) -if __name__ == "__main__": +if __name__ == u"__main__": main() diff --git a/resources/traffic_scripts/send_gre_check_gre_headers.py b/resources/traffic_scripts/send_gre_check_gre_headers.py deleted file mode 100755 index 9e2e2abe06..0000000000 --- a/resources/traffic_scripts/send_gre_check_gre_headers.py +++ /dev/null @@ -1,124 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Traffic script that sends a UDP encapsulated into GRE packet from one -interface to the other, where GRE encapsulated packet is expected. -""" - -import sys - -from robot.api import logger -from scapy.all import Ether -from scapy.layers.inet import IP_PROTOS -from scapy.layers.inet import IP -from scapy.layers.inet import UDP -from scapy.layers.l2 import GRE - -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def main(): - args = TrafficScriptArg( - ['tx_dst_mac', 'tx_src_mac', 'tx_outer_dst_ip', 'tx_outer_src_ip', - 'tx_inner_dst_ip', 'tx_inner_src_ip', 'rx_dst_mac', 'rx_src_mac', - 'rx_outer_dst_ip', 'rx_outer_src_ip']) - - tx_if = args.get_arg('tx_if') - rx_if = args.get_arg('rx_if') - tx_dst_mac = args.get_arg('tx_dst_mac') - tx_src_mac = args.get_arg('tx_src_mac') - tx_outer_dst_ip = args.get_arg('tx_outer_dst_ip') - tx_outer_src_ip = args.get_arg('tx_outer_src_ip') - tx_inner_dst_ip = args.get_arg('tx_inner_dst_ip') - tx_inner_src_ip = args.get_arg('tx_inner_src_ip') - rx_dst_mac = args.get_arg('rx_dst_mac') - rx_src_mac = args.get_arg('rx_src_mac') - rx_outer_dst_ip = args.get_arg('rx_outer_dst_ip') - rx_outer_src_ip = args.get_arg('rx_outer_src_ip') - udp_src = 1234 - udp_dst = 2345 - udp_payload = 'udp_payload' - - rxq = RxQueue(rx_if) - txq = TxQueue(tx_if) - sent_packets = [] - - tx_pkt_raw = Ether(dst=tx_dst_mac, src=tx_src_mac) / \ - IP(src=tx_outer_src_ip, dst=tx_outer_dst_ip) / \ - GRE() / \ - IP(src=tx_inner_src_ip, dst=tx_inner_dst_ip) / \ - UDP(dport=udp_dst, sport=udp_src) / \ - udp_payload - - sent_packets.append(tx_pkt_raw) - txq.send(tx_pkt_raw) - ether = rxq.recv(2, ignore=sent_packets) - - if ether is None: - raise RuntimeError("ICMP echo Rx timeout") - - # Check RX headers - if ether.dst != rx_dst_mac: - raise RuntimeError("Matching of received destination MAC unsuccessful.") - logger.debug("Comparison of received destination MAC: OK.") - - if ether.src != rx_src_mac: - raise RuntimeError("Matching of received source MAC unsuccessful.") - logger.debug("Comparison of received source MAC: OK.") - - if ether['IP'].src != rx_outer_src_ip: - raise RuntimeError("Matching of received outer source IP unsuccessful.") - logger.debug("Comparison of received outer source IP: OK.") - - if ether['IP'].dst != rx_outer_dst_ip: - raise RuntimeError( - "Matching of received outer destination IP unsuccessful.") - logger.debug("Comparison of received outer destination IP: OK.") - - if ether['IP'].proto != IP_PROTOS.gre: - raise RuntimeError("IP protocol is not GRE.") - logger.debug("Comparison of received GRE protocol: OK.") - - if ether['IP']['GRE']['IP'].src != tx_inner_src_ip: - raise RuntimeError("Matching of received inner source IP unsuccessful.") - logger.debug("Comparison of received inner source IP: OK.") - - if ether['IP']['GRE']['IP'].dst != tx_inner_dst_ip: - raise RuntimeError( - "Matching of received inner destination IP unsuccessful.") - logger.debug("Comparison of received inner destination IP: OK.") - - # check udp - udp = ether['IP']['GRE']['IP']['UDP'] - if udp.dport != udp_dst: - raise RuntimeError("UDP dport error {} != {}.". - format(udp.dport, udp_dst)) - print "UDP dport: OK." - - if udp.sport != udp_src: - raise RuntimeError("UDP sport error {} != {}.". - format(udp.sport, udp_src)) - print "UDP sport: OK." - - if str(udp.payload) != udp_payload: - raise RuntimeError("UDP payload check unsuccessful {} != {}.". - format(udp.payload, udp_payload)) - print "UDP payload: OK." - - sys.exit(0) - - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/send_gre_check_icmp_headers.py b/resources/traffic_scripts/send_gre_check_icmp_headers.py deleted file mode 100755 index 27eac5d786..0000000000 --- a/resources/traffic_scripts/send_gre_check_icmp_headers.py +++ /dev/null @@ -1,86 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Traffic script that sends a GRE encapsulated ICMPv4 packet from one -interface to the other, where is expected ICMPv4 without GRE header. -""" - -import sys - -from robot.api import logger -from scapy.all import Ether -from scapy.layers.inet import IP_PROTOS -from scapy.layers.inet import IP -from scapy.layers.inet import ICMP -from scapy.layers.l2 import GRE - -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def main(): - args = TrafficScriptArg( - ['tx_dst_mac', 'rx_dst_mac', - 'inner_src_ip', 'inner_dst_ip', - 'outer_src_ip', 'outer_dst_ip']) - - tx_if = args.get_arg('tx_if') - rx_if = args.get_arg('rx_if') - tx_dst_mac = args.get_arg('tx_dst_mac') - rx_dst_mac = args.get_arg('rx_dst_mac') - inner_src_ip = args.get_arg('inner_src_ip') - inner_dst_ip = args.get_arg('inner_dst_ip') - outer_src_ip = args.get_arg('outer_src_ip') - outer_dst_ip = args.get_arg('outer_dst_ip') - - rxq = RxQueue(rx_if) - txq = TxQueue(tx_if) - sent_packets = [] - - tx_pkt_raw = Ether(dst=tx_dst_mac) / \ - IP(src=outer_src_ip, dst=outer_dst_ip) / \ - GRE() / \ - IP(src=inner_src_ip, dst=inner_dst_ip) / \ - ICMP() - - sent_packets.append(tx_pkt_raw) - txq.send(tx_pkt_raw) - ether = rxq.recv(2) - - if ether is None: - raise RuntimeError("ICMP echo Rx timeout") - - # Check RX headers - if ether.dst != rx_dst_mac: - raise RuntimeError("Matching of received destination MAC unsuccessful.") - logger.debug("Comparison of received destination MAC: OK.") - - if ether['IP'].src != inner_src_ip: - raise RuntimeError("Matching of received inner source IP unsuccessful.") - logger.debug("Comparison of received outer source IP: OK.") - - if ether['IP'].dst != inner_dst_ip: - raise RuntimeError( - "Matching of received inner destination IP unsuccessful.") - logger.debug("Comparison of received outer destination IP: OK.") - - if ether['IP'].proto != IP_PROTOS.icmp: - raise RuntimeError("IP protocol is other than ICMP.") - logger.debug("Comparison of received ICMP protocol: OK.") - - sys.exit(0) - - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/send_icmp_check_arp.py b/resources/traffic_scripts/send_icmp_check_arp.py deleted file mode 100755 index 49d4710ba4..0000000000 --- a/resources/traffic_scripts/send_icmp_check_arp.py +++ /dev/null @@ -1,135 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Traffic script that sends an IP ICMP packet -from one interface and expects ARP on the other one. -""" - -import sys -import ipaddress - -from scapy.all import Ether -from scapy.layers.inet import ICMP, IP - -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def valid_ipv4(ip): - """Check if IP address has the correct IPv4 address format. - - :param ip: IP address. - :type ip: str - :return: True in case of correct IPv4 address format, - otherwise return False. - :rtype: bool - """ - try: - ipaddress.IPv4Address(unicode(ip)) - return True - except (AttributeError, ipaddress.AddressValueError): - return False - - -def main(): - """Send IP ICMP packet from one traffic generator interface and expects - ARP on the other.""" - args = TrafficScriptArg( - ['tx_dst_mac', 'rx_src_mac', 'tx_src_ip', 'tx_dst_ip', 'rx_arp_src_ip', - 'rx_arp_dst_ip']) - - tx_dst_mac = args.get_arg('tx_dst_mac') - rx_src_mac = args.get_arg('rx_src_mac') - src_ip = args.get_arg('tx_src_ip') - dst_ip = args.get_arg('tx_dst_ip') - tx_if = args.get_arg('tx_if') - rx_if = args.get_arg('rx_if') - rx_dst_mac = 'ff:ff:ff:ff:ff:ff' - rx_arp_src_ip = args.get_arg('rx_arp_src_ip') - rx_arp_dst_ip = args.get_arg('rx_arp_dst_ip') - - rxq = RxQueue(rx_if) - txq = TxQueue(tx_if) - - # Create empty IP ICMP packet - if valid_ipv4(src_ip) and valid_ipv4(dst_ip): - pkt_raw = Ether(dst=tx_dst_mac) / IP(src=src_ip, dst=dst_ip) / ICMP() - - # Send created packet on one interface and receive on the other - txq.send(pkt_raw) - - ether = rxq.recv(2) - - if ether is None: - raise RuntimeError("Ethernet frame Rx timeout") - - if ether.dst == rx_dst_mac: - print("Ethernet destination address matched.") - else: - raise RuntimeError( - "Matching ethernet destination address unsuccessful: {0} != {1}". - format(ether.dst, rx_dst_mac)) - - if ether.src == rx_src_mac: - print("Ethernet source address matched.") - else: - raise RuntimeError( - "Matching ethernet source address unsuccessful: {0} != {1}" - .format(ether.src, rx_src_mac)) - - # ARP check - if ether['ARP'] is not None: - print("ARP packet received.") - else: - raise RuntimeError("Not an ARP packet received {0}" - .format(ether.__repr__())) - - # Compare data from packets - if ether['ARP'].op == 1: # 1 - who-has request - print("ARP request matched.") - else: - raise RuntimeError("Matching ARP request unsuccessful: {0} != {1}" - .format(ether['ARP'].op, 1)) - - if ether['ARP'].hwsrc == rx_src_mac: - print("Source MAC matched.") - else: - raise RuntimeError("Matching Source MAC unsuccessful: {0} != {1}" - .format(ether['ARP'].hwsrc, rx_src_mac)) - - if ether['ARP'].hwdst == "00:00:00:00:00:00": - print("Destination MAC matched.") - else: - raise RuntimeError("Matching Destination MAC unsuccessful: {0} != {1}" - .format(ether['ARP'].hwdst, "00:00:00:00:00:00")) - - if ether['ARP'].psrc == rx_arp_src_ip: - print("Source ARP IP address matched.") - else: - raise RuntimeError( - "Matching Source ARP IP address unsuccessful: {0} != {1}" - .format(ether['ARP'].psrc, rx_arp_src_ip)) - - if ether['ARP'].pdst == rx_arp_dst_ip: - print("Destination ARP IP address matched.") - else: - raise RuntimeError( - "Matching Destination ARP IP address unsuccessful: {0} != {1}" - .format(ether['ARP'].pdst, rx_arp_dst_ip)) - - sys.exit(0) - - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/send_icmp_check_gre_headers.py b/resources/traffic_scripts/send_icmp_check_gre_headers.py deleted file mode 100755 index ccea9f5d9b..0000000000 --- a/resources/traffic_scripts/send_icmp_check_gre_headers.py +++ /dev/null @@ -1,92 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Traffic script that sends an ICMPv4 packet from one interface to the other, -where GRE encapsulated packet is expected. -""" - -import sys - -from robot.api import logger -from scapy.all import Ether -from scapy.layers.inet import IP_PROTOS -from scapy.layers.inet import IP -from scapy.layers.inet import ICMP - -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def main(): - args = TrafficScriptArg( - ['tx_dst_mac', 'rx_dst_mac', - 'inner_src_ip', 'inner_dst_ip', - 'outer_src_ip', 'outer_dst_ip']) - - tx_if = args.get_arg('tx_if') - rx_if = args.get_arg('rx_if') - tx_dst_mac = args.get_arg('tx_dst_mac') - rx_dst_mac = args.get_arg('rx_dst_mac') - inner_src_ip = args.get_arg('inner_src_ip') - inner_dst_ip = args.get_arg('inner_dst_ip') - outer_src_ip = args.get_arg('outer_src_ip') - outer_dst_ip = args.get_arg('outer_dst_ip') - - rxq = RxQueue(rx_if) - txq = TxQueue(tx_if) - sent_packets = [] - - tx_pkt_raw = Ether(dst=tx_dst_mac) / \ - IP(src=inner_src_ip, dst=inner_dst_ip) / \ - ICMP() - - sent_packets.append(tx_pkt_raw) - txq.send(tx_pkt_raw) - ether = rxq.recv(2) - - if ether is None: - raise RuntimeError("ICMP echo Rx timeout") - - # Check RX headers - if ether.dst != rx_dst_mac: - raise RuntimeError("Matching of received destination MAC unsuccessful.") - logger.debug("Comparison of received destination MAC: OK.") - - if ether['IP'].src != outer_src_ip: - raise RuntimeError("Matching of received outer source IP unsuccessful.") - logger.debug("Comparison of received outer source IP: OK.") - - if ether['IP'].dst != outer_dst_ip: - raise RuntimeError( - "Matching of received outer destination IP unsuccessful.") - logger.debug("Comparison of received outer destination IP: OK.") - - if ether['IP'].proto != IP_PROTOS.gre: - raise RuntimeError("IP protocol is no GRE.") - logger.debug("Comparison of received GRE protocol: OK.") - - if ether['IP']['GRE']['IP'].src != inner_src_ip: - raise RuntimeError("Matching of received inner source IP unsuccessful.") - logger.debug("Comparison of received inner source IP: OK.") - - if ether['IP']['GRE']['IP'].dst != inner_dst_ip: - raise RuntimeError( - "Matching of received inner destination IP unsuccessful.") - logger.debug("Comparison of received inner destination IP: OK.") - - sys.exit(0) - - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/send_icmp_check_headers.py b/resources/traffic_scripts/send_icmp_check_headers.py deleted file mode 100755 index 2193164b61..0000000000 --- a/resources/traffic_scripts/send_icmp_check_headers.py +++ /dev/null @@ -1,153 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Traffic script that sends an IP ICMPv4/ICMPv6 packet from one interface -to the other. Source and destination IP addresses and source and destination -MAC addresses are checked in received packet. -""" - -import sys - -import ipaddress -from robot.api import logger -from scapy.layers.inet import ICMP, IP -from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS -from scapy.layers.l2 import Ether, Dot1Q - -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def valid_ipv4(ip): - try: - ipaddress.IPv4Address(unicode(ip)) - return True - except (AttributeError, ipaddress.AddressValueError): - return False - - -def valid_ipv6(ip): - try: - ipaddress.IPv6Address(unicode(ip)) - return True - except (AttributeError, ipaddress.AddressValueError): - return False - - -def main(): - """Send IP ICMP packet from one traffic generator interface to the other.""" - args = TrafficScriptArg( - ['tg_src_mac', 'tg_dst_mac', 'src_ip', 'dst_ip', 'dut_if1_mac', - 'dut_if2_mac'], - ['encaps_tx', 'vlan_tx', 'vlan_outer_tx', - 'encaps_rx', 'vlan_rx', 'vlan_outer_rx']) - - tx_src_mac = args.get_arg('tg_src_mac') - tx_dst_mac = args.get_arg('dut_if1_mac') - rx_dst_mac = args.get_arg('tg_dst_mac') - rx_src_mac = args.get_arg('dut_if2_mac') - src_ip = args.get_arg('src_ip') - dst_ip = args.get_arg('dst_ip') - tx_if = args.get_arg('tx_if') - rx_if = args.get_arg('rx_if') - - encaps_tx = args.get_arg('encaps_tx') - vlan_tx = args.get_arg('vlan_tx') - vlan_outer_tx = args.get_arg('vlan_outer_tx') - encaps_rx = args.get_arg('encaps_rx') - vlan_rx = args.get_arg('vlan_rx') - vlan_outer_rx = args.get_arg('vlan_outer_rx') - - rxq = RxQueue(rx_if) - txq = TxQueue(tx_if) - sent_packets = [] - ip_format = '' - pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac) - if encaps_tx == 'Dot1q': - pkt_raw /= Dot1Q(vlan=int(vlan_tx)) - elif encaps_tx == 'Dot1ad': - pkt_raw.type = 0x88a8 - pkt_raw /= Dot1Q(vlan=vlan_outer_tx) - pkt_raw /= Dot1Q(vlan=vlan_tx) - if valid_ipv4(src_ip) and valid_ipv4(dst_ip): - pkt_raw /= IP(src=src_ip, dst=dst_ip) - pkt_raw /= ICMP() - ip_format = IP - elif valid_ipv6(src_ip) and valid_ipv6(dst_ip): - pkt_raw /= IPv6(src=src_ip, dst=dst_ip) - pkt_raw /= ICMPv6EchoRequest() - ip_format = IPv6 - else: - raise ValueError("IP not in correct format") - - sent_packets.append(pkt_raw) - txq.send(pkt_raw) - - while True: - if tx_if == rx_if: - ether = rxq.recv(2, ignore=sent_packets) - else: - ether = rxq.recv(2) - if ether is None: - raise RuntimeError('ICMP echo Rx timeout') - - if ether.haslayer(ICMPv6ND_NS): - # read another packet in the queue if the current one is ICMPv6ND_NS - continue - else: - # otherwise process the current packet - break - - if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src: - logger.trace("MAC matched") - else: - raise RuntimeError("Matching packet unsuccessful: {0}". - format(ether.__repr__())) - - if encaps_rx == 'Dot1q': - if ether[Dot1Q].vlan == int(vlan_rx): - logger.trace("VLAN matched") - else: - raise RuntimeError('Ethernet frame with wrong VLAN tag ({}-' - 'received, {}-expected):\n{}'. - format(ether[Dot1Q].vlan, vlan_rx, - ether.__repr__())) - ip = ether[Dot1Q].payload - elif encaps_rx == 'Dot1ad': - raise NotImplementedError() - else: - ip = ether.payload - - if not isinstance(ip, ip_format): - raise RuntimeError("Not an IP packet received {0}". - format(ip.__repr__())) - - # Compare data from packets - if src_ip == ip.src: - logger.trace("Src IP matched") - else: - raise RuntimeError("Matching Src IP unsuccessful: {} != {}". - format(src_ip, ip.src)) - - if dst_ip == ip.dst: - logger.trace("Dst IP matched") - else: - raise RuntimeError("Matching Dst IP unsuccessful: {} != {}". - format(dst_ip, ip.dst)) - - sys.exit(0) - - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/send_icmp_check_multipath.py b/resources/traffic_scripts/send_icmp_check_multipath.py deleted file mode 100755 index 5da3b7b096..0000000000 --- a/resources/traffic_scripts/send_icmp_check_multipath.py +++ /dev/null @@ -1,142 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Traffic script that sends an IP ICMPv4/ICMPv6 packets traffic -and check if it is divided into two paths.""" - -import sys -import ipaddress - -from scapy.all import Ether -from scapy.layers.inet import ICMP, IP -from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS - -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def valid_ipv4(ip): - try: - ipaddress.IPv4Address(unicode(ip)) - return True - except (AttributeError, ipaddress.AddressValueError): - return False - - -def valid_ipv6(ip): - try: - ipaddress.IPv6Address(unicode(ip)) - return True - except (AttributeError, ipaddress.AddressValueError): - return False - - -def main(): - """Send 100 IP ICMP packets traffic and check if it is divided into - two paths.""" - args = TrafficScriptArg( - ['src_ip', 'dst_ip', 'tg_if1_mac', 'dut_if1_mac', 'dut_if2_mac', - 'path_1_mac', 'path_2_mac']) - - src_ip = args.get_arg('src_ip') - dst_ip = args.get_arg('dst_ip') - tg_if1_mac = args.get_arg('tg_if1_mac') - dut_if1_mac = args.get_arg('dut_if1_mac') - dut_if2_mac = args.get_arg('dut_if2_mac') - path_1_mac = args.get_arg('path_1_mac') - path_2_mac = args.get_arg('path_2_mac') - tx_if = args.get_arg('tx_if') - rx_if = args.get_arg('rx_if') - path_1_counter = 0 - path_2_counter = 0 - - rxq = RxQueue(rx_if) - txq = TxQueue(tx_if) - sent_packets = [] - ip_format = '' - pkt_raw = '' - separator = '' - - if valid_ipv4(src_ip): - separator = '.' - elif valid_ipv6(src_ip): - separator = ':' - else: - raise ValueError("Source address not in correct format") - - src_ip_base = (src_ip.rsplit(separator, 1))[0] + separator - - for i in range(1, 101): - if valid_ipv4(src_ip) and valid_ipv4(dst_ip): - pkt_raw = (Ether(src=tg_if1_mac, dst=dut_if1_mac) / - IP(src=src_ip_base+str(i), dst=dst_ip) / - ICMP()) - ip_format = 'IP' - elif valid_ipv6(src_ip) and valid_ipv6(dst_ip): - pkt_raw = (Ether(src=tg_if1_mac, dst=dut_if1_mac) / - IPv6(src=src_ip_base+str(i), dst=dst_ip) / - ICMPv6EchoRequest()) - ip_format = 'IPv6' - else: - raise ValueError("IP not in correct format") - - sent_packets.append(pkt_raw) - txq.send(pkt_raw) - - while True: - ether = rxq.recv(2) - if ether is None: - raise RuntimeError('ICMPv6 echo reply Rx timeout') - - if ether.haslayer(ICMPv6ND_NS): - # read another packet in the queue in case of ICMPv6ND_NS packet - continue - else: - # otherwise process the current packet - break - - if ether is None: - raise RuntimeError("ICMP echo Rx timeout") - if not ether.haslayer(ip_format): - raise RuntimeError("Not an IP packet received {0}". - format(ether.__repr__())) - - if ether[Ether].src != dut_if2_mac: - raise RuntimeError("Source MAC address error") - - if ether[Ether].dst == path_1_mac: - path_1_counter += 1 - elif ether[Ether].dst == path_2_mac: - path_2_counter += 1 - else: - raise RuntimeError("Destination MAC address error") - - if (path_1_counter + path_2_counter) != 100: - raise RuntimeError("Packet loss: recevied only {} packets of 100 ". - format(path_1_counter + path_2_counter)) - - if path_1_counter == 0: - raise RuntimeError("Path 1 error!") - - if path_2_counter == 0: - raise RuntimeError("Path 2 error!") - - print "Path_1 counter: {}".format(path_1_counter) - print "Path_2 counter: {}".format(path_2_counter) - - sys.exit(0) - - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/send_icmp_type_code.py b/resources/traffic_scripts/send_icmp_type_code.py deleted file mode 100755 index 2997f91264..0000000000 --- a/resources/traffic_scripts/send_icmp_type_code.py +++ /dev/null @@ -1,132 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2017 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Traffic script that sends an IP ICMPv4/ICMPv6 packet from one interface to -the other one. Dot1q or Dot1ad tagging of the ethernet frame can be set. -""" - -import sys -import ipaddress - -from scapy.layers.l2 import Ether -from scapy.layers.inet import ICMP, IP -from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS - -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def valid_ipv4(ip): - """Check if IP address has the correct IPv4 address format. - - :param ip: IP address. - :type ip: str - :returns: True in case of correct IPv4 address format, - otherwise return false. - :rtype: bool - """ - try: - ipaddress.IPv4Address(unicode(ip)) - return True - except (AttributeError, ipaddress.AddressValueError): - return False - - -def valid_ipv6(ip): - """Check if IP address has the correct IPv6 address format. - - :param ip: IP address. - :type ip: str - :returns: True in case of correct IPv6 address format, - otherwise return false. - :rtype: bool - """ - try: - ipaddress.IPv6Address(unicode(ip)) - return True - except (AttributeError, ipaddress.AddressValueError): - return False - - -def main(): - """Send IP ICMPv4/ICMPv6 packet from one traffic generator interface to - the other one.""" - - args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip', - 'icmp_type', 'icmp_code']) - - src_mac = args.get_arg('src_mac') - dst_mac = args.get_arg('dst_mac') - src_ip = args.get_arg('src_ip') - dst_ip = args.get_arg('dst_ip') - - tx_if = args.get_arg('tx_if') - rx_if = args.get_arg('rx_if') - - icmp_type = int(args.get_arg('icmp_type')) - icmp_code = int(args.get_arg('icmp_code')) - - rxq = RxQueue(rx_if) - txq = TxQueue(tx_if) - - sent_packets = [] - - # Create empty ip ICMP packet and add padding before sending - if valid_ipv4(src_ip) and valid_ipv4(dst_ip): - pkt_raw = (Ether(src=src_mac, dst=dst_mac) / - IP(src=src_ip, dst=dst_ip) / - ICMP(code=icmp_code, type=icmp_type)) - ip_format = IP - icmp_format = ICMP - elif valid_ipv6(src_ip) and valid_ipv6(dst_ip): - pkt_raw = (Ether(src=src_mac, dst=dst_mac) / - IPv6(src=src_ip, dst=dst_ip) / - ICMPv6EchoRequest(code=icmp_code, type=icmp_type)) - ip_format = IPv6 - icmp_format = 'ICMPv6' - else: - raise ValueError("IP(s) not in correct format") - - # Send created packet on one interface and receive on the other - sent_packets.append(pkt_raw) - txq.send(pkt_raw) - - while True: - ether = rxq.recv(2) - if ether is None: - raise RuntimeError('ICMP echo Rx timeout') - - if ether.haslayer(ICMPv6ND_NS): - # read another packet in the queue if the current one is ICMPv6ND_NS - continue - else: - # otherwise process the current packet - break - - # Check whether received packet contains layers IP and ICMP - if not ether.haslayer(ip_format): - raise RuntimeError('Not an IP packet received {0}'. - format(ether.__repr__())) - - # Cannot use haslayer for ICMPv6, every type of ICMPv6 is a separate layer - # Next header value of 58 means the next header is ICMPv6 - if not ether.haslayer(icmp_format) and ether[ip_format].nh != 58: - raise RuntimeError('Not an ICMP packet received {0}'. - format(ether.__repr__())) - - sys.exit(0) - - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/send_icmp_wait_for_reply.py b/resources/traffic_scripts/send_icmp_wait_for_reply.py index 7677152801..2c79ae7136 100755 --- a/resources/traffic_scripts/send_icmp_wait_for_reply.py +++ b/resources/traffic_scripts/send_icmp_wait_for_reply.py @@ -1,4 +1,5 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 + # Copyright (c) 2016 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,56 +18,16 @@ import sys import ipaddress -from scapy.all import Ether from scapy.layers.inet import ICMP, IP -from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS +from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply,\ + ICMPv6ND_NS +from scapy.layers.l2 import Ether +from scapy.packet import Raw from resources.libraries.python.PacketVerifier import RxQueue, TxQueue from resources.libraries.python.TrafficScriptArg import TrafficScriptArg -def is_icmp_reply(pkt, ipformat): - """Return True if pkt is echo reply, else return False. If exception occurs - return False. - - :param pkt: Packet. - :param ipformat: Dictionary of names to distinguish IPv4 and IPv6. - :type pkt: dict - :type ipformat: dict - :rtype: bool - """ - # pylint: disable=bare-except - try: - if pkt[ipformat['IPType']][ipformat['ICMP_rep']].type == \ - ipformat['Type']: - return True - else: - return False - except: # pylint: disable=bare-except - return False - - -def address_check(request, reply, ipformat): - """Compare request packet source address with reply destination address - and vice versa. If exception occurs return False. - - :param request: Sent packet containing request. - :param reply: Received packet containing reply. - :param ipformat: Dictionary of names to distinguish IPv4 and IPv6. - :type request: dict - :type reply: dict - :type ipformat: dict - :rtype: bool - """ - # pylint: disable=bare-except - try: - r_src = reply[ipformat['IPType']].src == request[ipformat['IPType']].dst - r_dst = reply[ipformat['IPType']].dst == request[ipformat['IPType']].src - return r_src and r_dst - except: # pylint: disable=bare-except - return False - - def valid_ipv4(ip): """Check if IP address has the correct IPv4 address format. @@ -77,7 +38,7 @@ def valid_ipv4(ip): :rtype: bool """ try: - ipaddress.IPv4Address(unicode(ip)) + ipaddress.IPv4Address(ip) return True except (AttributeError, ipaddress.AddressValueError): return False @@ -93,7 +54,7 @@ def valid_ipv6(ip): :rtype: bool """ try: - ipaddress.IPv6Address(unicode(ip)) + ipaddress.IPv6Address(ip) return True except (AttributeError, ipaddress.AddressValueError): return False @@ -102,16 +63,17 @@ def valid_ipv6(ip): def main(): """Send ICMP echo request and wait for ICMP echo reply. It ignores all other packets.""" - args = TrafficScriptArg(['dst_mac', 'src_mac', 'dst_ip', 'src_ip', - 'timeout']) - - dst_mac = args.get_arg('dst_mac') - src_mac = args.get_arg('src_mac') - dst_ip = args.get_arg('dst_ip') - src_ip = args.get_arg('src_ip') - tx_if = args.get_arg('tx_if') - rx_if = args.get_arg('rx_if') - timeout = int(args.get_arg('timeout')) + args = TrafficScriptArg( + [u"dst_mac", u"src_mac", u"dst_ip", u"src_ip", u"timeout"] + ) + + dst_mac = args.get_arg(u"dst_mac") + src_mac = args.get_arg(u"src_mac") + dst_ip = args.get_arg(u"dst_ip") + src_ip = args.get_arg(u"src_ip") + tx_if = args.get_arg(u"tx_if") + rx_if = args.get_arg(u"rx_if") + timeout = int(args.get_arg(u"timeout")) wait_step = 1 rxq = RxQueue(rx_if) @@ -120,21 +82,26 @@ def main(): # Create empty ip ICMP packet if valid_ipv4(src_ip) and valid_ipv4(dst_ip): - icmp_request = (Ether(src=src_mac, dst=dst_mac) / - IP(src=src_ip, dst=dst_ip) / - ICMP()) - ip_format = {'IPType': 'IP', 'ICMP_req': 'ICMP', - 'ICMP_rep': 'ICMP', 'Type': 0} + ip_layer = IP + icmp_req = ICMP + icmp_resp = ICMP + icmp_type = 0 # echo-reply elif valid_ipv6(src_ip) and valid_ipv6(dst_ip): - icmp_request = (Ether(src=src_mac, dst=dst_mac) / - IPv6(src=src_ip, dst=dst_ip) / - ICMPv6EchoRequest()) - ip_format = {'IPType': 'IPv6', 'ICMP_req': 'ICMPv6 Echo Request', - 'ICMP_rep': 'ICMPv6 Echo Reply', 'Type': 129} + ip_layer = IP + icmp_req = ICMPv6EchoRequest + icmp_resp = ICMPv6EchoReply + icmp_type = 0 # Echo Reply else: - raise ValueError("IP not in correct format") + raise ValueError(u"IP not in correct format") + + icmp_request = ( + Ether(src=src_mac, dst=dst_mac) / + ip_layer(src=src_ip, dst=dst_ip) / + icmp_req() + ) # Send created packet on the interface + icmp_request /= Raw() sent_packets.append(icmp_request) txq.send(icmp_request) @@ -144,7 +111,7 @@ def main(): if icmp_reply is None: timeout -= wait_step if timeout < 0: - raise RuntimeError("ICMP echo Rx timeout") + raise RuntimeError(u"ICMP echo Rx timeout") elif icmp_reply.haslayer(ICMPv6ND_NS): # read another packet in the queue in case of ICMPv6ND_NS packet @@ -153,16 +120,17 @@ def main(): # otherwise process the current packet break - if is_icmp_reply(icmp_reply, ip_format): - if address_check(icmp_request, icmp_reply, ip_format): + if icmp_reply[ip_layer][icmp_resp].type == icmp_type: + if icmp_reply[ip_layer].src == dst_ip and \ + icmp_reply[ip_layer].dst == src_ip: break else: - raise RuntimeError("Max packet count limit reached") + raise RuntimeError(u"Max packet count limit reached") - print "ICMP echo reply received." + print(u"ICMP echo reply received.") sys.exit(0) -if __name__ == "__main__": +if __name__ == u"__main__": main() diff --git a/resources/traffic_scripts/send_ip_check_headers.py b/resources/traffic_scripts/send_ip_check_headers.py index 7c4f2bd002..f5a55553cf 100755 --- a/resources/traffic_scripts/send_ip_check_headers.py +++ b/resources/traffic_scripts/send_ip_check_headers.py @@ -1,4 +1,5 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 + # Copyright (c) 2019 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -20,10 +21,12 @@ MAC addresses are checked in received packet. import sys import ipaddress + from robot.api import logger from scapy.layers.inet import IP from scapy.layers.inet6 import IPv6, ICMPv6ND_NS from scapy.layers.l2 import Ether, Dot1Q +from scapy.packet import Raw from resources.libraries.python.PacketVerifier import RxQueue, TxQueue from resources.libraries.python.TrafficScriptArg import TrafficScriptArg @@ -31,7 +34,7 @@ from resources.libraries.python.TrafficScriptArg import TrafficScriptArg def valid_ipv4(ip): try: - ipaddress.IPv4Address(unicode(ip)) + ipaddress.IPv4Address(ip) return True except (AttributeError, ipaddress.AddressValueError): return False @@ -39,7 +42,7 @@ def valid_ipv4(ip): def valid_ipv6(ip): try: - ipaddress.IPv6Address(unicode(ip)) + ipaddress.IPv6Address(ip) return True except (AttributeError, ipaddress.AddressValueError): return False @@ -48,38 +51,45 @@ def valid_ipv6(ip): def main(): """Send IP/IPv6 packet from one traffic generator interface to the other.""" args = TrafficScriptArg( - ['tg_src_mac', 'tg_dst_mac', 'src_ip', 'dst_ip', 'dut_if1_mac', - 'dut_if2_mac'], - ['encaps_tx', 'vlan_tx', 'vlan_outer_tx', - 'encaps_rx', 'vlan_rx', 'vlan_outer_rx']) - - tx_src_mac = args.get_arg('tg_src_mac') - tx_dst_mac = args.get_arg('dut_if1_mac') - rx_dst_mac = args.get_arg('tg_dst_mac') - rx_src_mac = args.get_arg('dut_if2_mac') - src_ip = args.get_arg('src_ip') - dst_ip = args.get_arg('dst_ip') - tx_if = args.get_arg('tx_if') - rx_if = args.get_arg('rx_if') - - encaps_tx = args.get_arg('encaps_tx') - vlan_tx = args.get_arg('vlan_tx') - vlan_outer_tx = args.get_arg('vlan_outer_tx') - encaps_rx = args.get_arg('encaps_rx') - vlan_rx = args.get_arg('vlan_rx') - vlan_outer_rx = args.get_arg('vlan_outer_rx') + [ + u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac", + u"dut_if2_mac" + ], + [ + u"encaps_tx", u"vlan_tx", u"vlan_outer_tx", u"encaps_rx", + u"vlan_rx", u"vlan_outer_rx" + ] + ) + + tx_src_mac = args.get_arg(u"tg_src_mac") + tx_dst_mac = args.get_arg(u"dut_if1_mac") + rx_dst_mac = args.get_arg(u"tg_dst_mac") + rx_src_mac = args.get_arg(u"dut_if2_mac") + src_ip = args.get_arg(u"src_ip") + dst_ip = args.get_arg(u"dst_ip") + tx_if = args.get_arg(u"tx_if") + rx_if = args.get_arg(u"rx_if") + + encaps_tx = args.get_arg(u"encaps_tx") + vlan_tx = args.get_arg(u"vlan_tx") + vlan_outer_tx = args.get_arg(u"vlan_outer_tx") + encaps_rx = args.get_arg(u"encaps_rx") + vlan_rx = args.get_arg(u"vlan_rx") + vlan_outer_rx = args.get_arg(u"vlan_outer_rx") rxq = RxQueue(rx_if) txq = TxQueue(tx_if) - sent_packets = [] - ip_format = '' + + sent_packets =list() pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac) - if encaps_tx == 'Dot1q': + + if encaps_tx == u"Dot1q": pkt_raw /= Dot1Q(vlan=int(vlan_tx)) - elif encaps_tx == 'Dot1ad': + elif encaps_tx == u"Dot1ad": pkt_raw.type = 0x88a8 pkt_raw /= Dot1Q(vlan=vlan_outer_tx) pkt_raw /= Dot1Q(vlan=vlan_tx) + if valid_ipv4(src_ip) and valid_ipv4(dst_ip): pkt_raw /= IP(src=src_ip, dst=dst_ip, proto=61) ip_format = IP @@ -87,8 +97,9 @@ def main(): pkt_raw /= IPv6(src=src_ip, dst=dst_ip) ip_format = IPv6 else: - raise ValueError("IP not in correct format") + raise ValueError(u"IP not in correct format") + pkt_raw /= Raw() sent_packets.append(pkt_raw) txq.send(pkt_raw) @@ -99,7 +110,7 @@ def main(): ether = rxq.recv(2) if ether is None: - raise RuntimeError('IP packet Rx timeout') + raise RuntimeError(u"IP packet Rx timeout") if ether.haslayer(ICMPv6ND_NS): # read another packet in the queue if the current one is ICMPv6ND_NS @@ -109,44 +120,45 @@ def main(): break if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src: - logger.trace("MAC matched") + logger.trace(u"MAC matched") else: - raise RuntimeError("Matching packet unsuccessful: {0}". - format(ether.__repr__())) + raise RuntimeError(f"Matching packet unsuccessful: {ether!r}") - if encaps_rx == 'Dot1q': + if encaps_rx == u"Dot1q": if ether[Dot1Q].vlan == int(vlan_rx): - logger.trace("VLAN matched") + logger.trace(u"VLAN matched") else: - raise RuntimeError('Ethernet frame with wrong VLAN tag ({}-' - 'received, {}-expected):\n{}'. - format(ether[Dot1Q].vlan, vlan_rx, - ether.__repr__())) + raise RuntimeError( + f"Ethernet frame with wrong VLAN tag " + f"({ether[Dot1Q].vlan}-received, " + f"{vlan_rx}-expected):\n{ether!r}" + ) ip = ether[Dot1Q].payload - elif encaps_rx == 'Dot1ad': + elif encaps_rx == u"Dot1ad": raise NotImplementedError() else: ip = ether.payload if not isinstance(ip, ip_format): - raise RuntimeError("Not an IP packet received {0}". - format(ip.__repr__())) + raise RuntimeError(f"Not an IP packet received {ip!r}") # Compare data from packets if src_ip == ip.src: - logger.trace("Src IP matched") + logger.trace(u"Src IP matched") else: - raise RuntimeError("Matching Src IP unsuccessful: {} != {}". - format(src_ip, ip.src)) + raise RuntimeError( + f"Matching Src IP unsuccessful: {src_ip} != {ip.src}" + ) if dst_ip == ip.dst: - logger.trace("Dst IP matched") + logger.trace(u"Dst IP matched") else: - raise RuntimeError("Matching Dst IP unsuccessful: {} != {}". - format(dst_ip, ip.dst)) + raise RuntimeError( + f"Matching Dst IP unsuccessful: {dst_ip} != {ip.dst}" + ) sys.exit(0) -if __name__ == "__main__": +if __name__ == u"__main__": main() diff --git a/resources/traffic_scripts/send_ip_icmp.py b/resources/traffic_scripts/send_ip_icmp.py deleted file mode 100755 index 58f2e1e4d8..0000000000 --- a/resources/traffic_scripts/send_ip_icmp.py +++ /dev/null @@ -1,186 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Traffic script that sends an IP ICMPv4/ICMPv6 packet from one interface to -the other one. Dot1q or Dot1ad tagging of the ethernet frame can be set. -""" - -import sys -import ipaddress - -from scapy.layers.l2 import Ether, Dot1Q -from scapy.layers.inet import ICMP, IP -from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS - -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def valid_ipv4(ip): - """Check if IP address has the correct IPv4 address format. - - :param ip: IP address. - :type ip: str - :return: True in case of correct IPv4 address format, - otherwise return false. - :rtype: bool - """ - try: - ipaddress.IPv4Address(unicode(ip)) - return True - except (AttributeError, ipaddress.AddressValueError): - return False - - -def valid_ipv6(ip): - """Check if IP address has the correct IPv6 address format. - - :param ip: IP address. - :type ip: str - :return: True in case of correct IPv6 address format, - otherwise return false. - :rtype: bool - """ - try: - ipaddress.IPv6Address(unicode(ip)) - return True - except (AttributeError, ipaddress.AddressValueError): - return False - - -def main(): - """Send IP ICMPv4/ICMPv6 packet from one traffic generator interface to - the other one. Dot1q or Dot1ad tagging of the ethernet frame can be set. - """ - args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip'], - ['encaps', 'vlan1', 'vlan2', 'encaps_rx', - 'vlan1_rx', 'vlan2_rx']) - - src_mac = args.get_arg('src_mac') - dst_mac = args.get_arg('dst_mac') - src_ip = args.get_arg('src_ip') - dst_ip = args.get_arg('dst_ip') - - encaps = args.get_arg('encaps') - vlan1 = args.get_arg('vlan1') - vlan2 = args.get_arg('vlan2') - encaps_rx = args.get_arg('encaps_rx') - vlan1_rx = args.get_arg('vlan1_rx') - vlan2_rx = args.get_arg('vlan2_rx') - - tx_if = args.get_arg('tx_if') - rx_if = args.get_arg('rx_if') - - rxq = RxQueue(rx_if) - txq = TxQueue(tx_if) - - sent_packets = [] - ip_format = '' - icmp_format = '' - # Create empty ip ICMP packet and add padding before sending - if valid_ipv4(src_ip) and valid_ipv4(dst_ip): - if encaps == 'Dot1q': - pkt_raw = (Ether(src=src_mac, dst=dst_mac) / - Dot1Q(vlan=int(vlan1)) / - IP(src=src_ip, dst=dst_ip) / - ICMP()) - elif encaps == 'Dot1ad': - pkt_raw = (Ether(src=src_mac, dst=dst_mac, type=0x88A8) / - Dot1Q(vlan=int(vlan1), type=0x8100) / - Dot1Q(vlan=int(vlan2)) / - IP(src=src_ip, dst=dst_ip) / - ICMP()) - else: - pkt_raw = (Ether(src=src_mac, dst=dst_mac) / - IP(src=src_ip, dst=dst_ip) / - ICMP()) - ip_format = IP - icmp_format = ICMP - elif valid_ipv6(src_ip) and valid_ipv6(dst_ip): - if encaps == 'Dot1q': - pkt_raw = (Ether(src=src_mac, dst=dst_mac) / - Dot1Q(vlan=int(vlan1)) / - IPv6(src=src_ip, dst=dst_ip) / - ICMPv6EchoRequest()) - elif encaps == 'Dot1ad': - pkt_raw = (Ether(src=src_mac, dst=dst_mac, type=0x88A8) / - Dot1Q(vlan=int(vlan1), type=0x8100) / - Dot1Q(vlan=int(vlan2)) / - IPv6(src=src_ip, dst=dst_ip) / - ICMPv6EchoRequest()) - else: - pkt_raw = (Ether(src=src_mac, dst=dst_mac) / - IPv6(src=src_ip, dst=dst_ip) / - ICMPv6EchoRequest()) - ip_format = IPv6 - icmp_format = ICMPv6EchoRequest - else: - raise ValueError("IP(s) not in correct format") - - # Send created packet on one interface and receive on the other - sent_packets.append(pkt_raw) - txq.send(pkt_raw) - - # Receive ICMP / ICMPv6 echo reply - while True: - ether = rxq.recv(2,) - if ether is None: - raise RuntimeError('ICMP echo Rx timeout') - - if ether.haslayer(ICMPv6ND_NS): - # read another packet in the queue if the current one is ICMPv6ND_NS - continue - else: - # otherwise process the current packet - break - - # Check whether received packet contains layers IP/IPv6 and - # ICMP/ICMPv6EchoRequest - if encaps_rx: - if encaps_rx == 'Dot1q': - if not vlan1_rx: - vlan1_rx = vlan1 - if not ether.haslayer(Dot1Q): - raise RuntimeError('Not VLAN tagged Eth frame received:\n{0}'. - format(ether.__repr__())) - elif ether[Dot1Q].vlan != int(vlan1_rx): - raise RuntimeError('Ethernet frame with wrong VLAN tag ({}) ' - 'received ({} expected):\n{}'. - format(ether[Dot1Q].vlan, vlan1_rx, - ether.__repr__())) - elif encaps_rx == 'Dot1ad': - if not vlan1_rx: - vlan1_rx = vlan1 - if not vlan2_rx: - vlan2_rx = vlan2 - # TODO - raise RuntimeError('Encapsulation {0} not implemented yet.'. - format(encaps_rx)) - else: - raise RuntimeError('Unsupported encapsulation expected: {0}'. - format(encaps_rx)) - - if not ether.haslayer(ip_format): - raise RuntimeError('Not an IP/IPv6 packet received:\n{0}'. - format(ether.__repr__())) - - if not ether.haslayer(icmp_format): - raise RuntimeError('Not an ICMP/ICMPv6EchoRequest packet received:\n' - '{0}'.format(ether.__repr__())) - - sys.exit(0) - - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/send_ipv4_icmp_check_lw_4o6.py b/resources/traffic_scripts/send_ipv4_icmp_check_lw_4o6.py deleted file mode 100755 index 9e225428df..0000000000 --- a/resources/traffic_scripts/send_ipv4_icmp_check_lw_4o6.py +++ /dev/null @@ -1,139 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Traffic script that sends an IPv4 ICMP packet with ID field and checks if -IPv4 is correctly encapsulated into IPv6 packet. Doing for ICMP echo request -and ICMP echo response packet.""" - -import sys - -from scapy.layers.l2 import Ether -from scapy.layers.inet import IP, ICMP, icmptypes - -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def _is_ipv4_in_ipv6(pkt): - """If IPv6 next header type in the given pkt is IPv4, return True, - else return False. False is returned also if exception occurs.""" - ipv6_type = int('0x86dd', 16) # IPv6 - try: - if pkt.type == ipv6_type: - if pkt.payload.nh == 4: - return True - except: # pylint: disable=bare-except - return False - return False - - -def main(): # pylint: disable=too-many-statements, too-many-locals - """Main function of the script file.""" - args = TrafficScriptArg(['tx_dst_mac', 'tx_src_ipv4', 'tx_dst_ipv4', - 'tx_icmp_id', 'rx_dst_mac', 'rx_src_mac', - 'src_ipv6', 'dst_ipv6']) - rx_if = args.get_arg('rx_if') - tx_if = args.get_arg('tx_if') - tx_dst_mac = args.get_arg('tx_dst_mac') - tx_src_ipv4 = args.get_arg('tx_src_ipv4') - tx_dst_ipv4 = args.get_arg('tx_dst_ipv4') - tx_icmp_id = int(args.get_arg('tx_icmp_id')) - rx_dst_mac = args.get_arg('rx_dst_mac') - rx_src_mac = args.get_arg('rx_src_mac') - rx_src_ipv6 = args.get_arg('src_ipv6') - rx_dst_ipv6 = args.get_arg('dst_ipv6') - - rxq = RxQueue(rx_if) - txq = TxQueue(tx_if) - sent_packets = [] - - for icmp_type in ('echo-request', 'echo-reply'): - print '\nChecking ICMP type: {}'.format(icmp_type) - - # Create ICMP request - tx_pkt = (Ether(dst=tx_dst_mac) / - IP(src=tx_src_ipv4, dst=tx_dst_ipv4) / - ICMP(type=icmp_type, id=tx_icmp_id)) - - txq.send(tx_pkt) - sent_packets.append(tx_pkt) - - for _ in range(5): - pkt = rxq.recv(2) - if _is_ipv4_in_ipv6(pkt): - ether = pkt - break - else: - raise RuntimeError("IPv4 in IPv6 Rx error.") - - # check ethernet - if ether.dst != rx_dst_mac: - raise RuntimeError("Destination MAC error {} != {}.". - format(ether.dst, rx_dst_mac)) - print "Destination MAC: OK." - - if ether.src != rx_src_mac: - raise RuntimeError("Source MAC error {} != {}.". - format(ether.src, rx_src_mac)) - print "Source MAC: OK." - - ipv6 = ether.payload - - # check ipv6 - if ipv6.dst != rx_dst_ipv6: - raise RuntimeError("Destination IP error {} != {}.". - format(ipv6.dst, rx_dst_ipv6)) - print "Destination IPv6: OK." - - if ipv6.src != rx_src_ipv6: - raise RuntimeError("Source IP error {} != {}.". - format(ipv6.src, rx_src_ipv6)) - print "Source IPv6: OK." - - ipv4 = ipv6.payload - - # check ipv4 - if ipv4.dst != tx_dst_ipv4: - raise RuntimeError("Destination IP error {} != {}.". - format(ipv4.dst, tx_dst_ipv4)) - print "Destination IPv4: OK." - - if ipv4.src != tx_src_ipv4: - raise RuntimeError("Source IP error {} != {}.". - format(ipv4.src, tx_src_ipv4)) - print "Source IPv4: OK." - - # check icmp echo request - if ipv4.proto != 1: # ICMP - raise RuntimeError("IP protocol error {} != ICMP.". - format(ipv4.proto)) - print "IPv4 protocol: OK." - - icmp = ipv4.payload - - # check icmp - if icmptypes[icmp.type] != icmp_type: - raise RuntimeError("ICMP type error {} != echo request.". - format(icmp.type)) - print "ICMP type: OK." - - if icmp.id != tx_icmp_id: - raise RuntimeError("ICMP ID error {} != {}.". - format(icmp.id, tx_icmp_id)) - print "ICMP ID: OK." - - sys.exit(0) - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/send_ipv4_udp_check_lw_4o6.py b/resources/traffic_scripts/send_ipv4_udp_check_lw_4o6.py deleted file mode 100755 index 116edb206b..0000000000 --- a/resources/traffic_scripts/send_ipv4_udp_check_lw_4o6.py +++ /dev/null @@ -1,136 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Traffic script that sends an empty UDP datagram and checks if IPv4 is -correctly encapsulated into IPv6 packet.""" - -import sys - -from scapy.layers.l2 import Ether -from scapy.layers.inet import IP, UDP -from ipaddress import ip_address - -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def _is_ipv4_in_ipv6(pkt): - """If IPv6 next header type in the given pkt is IPv4, return True, - else return False. False is returned also if exception occurs.""" - ipv6_type = int('0x86dd', 16) # IPv6 - try: - if pkt.type == ipv6_type: - if pkt.payload.nh == 4: - return True - except: # pylint: disable=bare-except - return False - return False - - -def main(): # pylint: disable=too-many-statements, too-many-locals - """Main function of the script file.""" - args = TrafficScriptArg(['tx_dst_mac', 'tx_src_ipv4', 'tx_dst_ipv4', - 'tx_dst_udp_port', 'rx_dst_mac', 'rx_src_mac', - 'src_ipv6', 'dst_ipv6']) - rx_if = args.get_arg('rx_if') - tx_if = args.get_arg('tx_if') - tx_dst_mac = args.get_arg('tx_dst_mac') - tx_src_ipv4 = args.get_arg('tx_src_ipv4') - tx_dst_ipv4 = args.get_arg('tx_dst_ipv4') - tx_dst_udp_port = int(args.get_arg('tx_dst_udp_port')) - tx_src_udp_port = 20000 - rx_dst_mac = args.get_arg('rx_dst_mac') - rx_src_mac = args.get_arg('rx_src_mac') - rx_src_ipv6 = args.get_arg('src_ipv6') - rx_dst_ipv6 = args.get_arg('dst_ipv6') - - rxq = RxQueue(rx_if) - txq = TxQueue(tx_if) - sent_packets = [] - - # Create empty UDP datagram - udp = (Ether(dst=tx_dst_mac) / - IP(src=tx_src_ipv4, dst=tx_dst_ipv4) / - UDP(sport=tx_src_udp_port, dport=tx_dst_udp_port)) - - txq.send(udp) - sent_packets.append(udp) - - for _ in range(5): - pkt = rxq.recv(2) - if _is_ipv4_in_ipv6(pkt): - ether = pkt - break - else: - raise RuntimeError("IPv4 in IPv6 Rx error.") - - # check ethernet - if ether.dst != rx_dst_mac: - raise RuntimeError("Destination MAC error {} != {}.". - format(ether.dst, rx_dst_mac)) - print "Destination MAC: OK." - - if ether.src != rx_src_mac: - raise RuntimeError("Source MAC error {} != {}.". - format(ether.src, rx_src_mac)) - print "Source MAC: OK." - - ipv6 = ether.payload - - # check ipv6 - if ip_address(unicode(ipv6.dst)) != ip_address(unicode(rx_dst_ipv6)): - raise RuntimeError("Destination IP error {} != {}.". - format(ipv6.dst, rx_dst_ipv6)) - print "Destination IPv6: OK." - - if ip_address(unicode(ipv6.src)) != ip_address(unicode(rx_src_ipv6)): - raise RuntimeError("Source IP error {} != {}.". - format(ipv6.src, rx_src_ipv6)) - print "Source IPv6: OK." - - ipv4 = ipv6.payload - - # check ipv4 - if ipv4.dst != tx_dst_ipv4: - raise RuntimeError("Destination IP error {} != {}.". - format(ipv4.dst, tx_dst_ipv4)) - print "Destination IPv4: OK." - - if ipv4.src != tx_src_ipv4: - raise RuntimeError("Source IP error {} != {}.". - format(ipv4.src, tx_src_ipv4)) - print "Source IPv4: OK." - - if ipv4.proto != 17: # UDP - raise RuntimeError("IP protocol error {} != UDP.". - format(ipv4.proto)) - print "IPv4 protocol: OK." - - udp = ipv4.payload - - # check udp - if udp.dport != tx_dst_udp_port: - raise RuntimeError("UDP dport error {} != {}.". - format(udp.dport, tx_dst_udp_port)) - print "UDP dport: OK." - - if udp.sport != tx_src_udp_port: - raise RuntimeError("UDP sport error {} != {}.". - format(udp.sport, tx_src_udp_port)) - print "UDP sport: OK." - - sys.exit(0) - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/send_ipv4_udp_check_map_t.py b/resources/traffic_scripts/send_ipv4_udp_check_map_t.py deleted file mode 100755 index 835f667751..0000000000 --- a/resources/traffic_scripts/send_ipv4_udp_check_map_t.py +++ /dev/null @@ -1,132 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Traffic script that sends a UDP datagram and checks if IPv4 addresses -are correctly translate to IPv6 addresses.""" - -import sys - -from scapy.layers.l2 import Ether -from scapy.layers.inet import IP, UDP -from ipaddress import ip_address - -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def _check_udp_checksum(pkt): - """Check UDP checksum in IP packet. Return True if checksum is correct - else False.""" - new = pkt.__class__(str(pkt)) - del new['UDP'].chksum - new = new.__class__(str(new)) - return new['UDP'].chksum == pkt['UDP'].chksum - - -def _is_udp_in_ipv6(pkt): - """If IPv6 next header type in the given pkt is UDP, return True, - else return False. False is returned also if exception occurs.""" - ipv6_type = int('0x86dd', 16) # IPv6 - try: - if pkt.type == ipv6_type: - if pkt.payload.nh == 17: # UDP - return True - except AttributeError: - return False - return False - - -def main(): # pylint: disable=too-many-statements, too-many-locals - """Main function of the script file.""" - args = TrafficScriptArg(['tx_dst_mac', 'tx_src_ipv4', 'tx_dst_ipv4', - 'tx_dst_udp_port', 'rx_dst_mac', 'rx_src_mac', - 'rx_src_ipv6', 'rx_dst_ipv6']) - rx_if = args.get_arg('rx_if') - tx_if = args.get_arg('tx_if') - tx_dst_mac = args.get_arg('tx_dst_mac') - tx_src_ipv4 = args.get_arg('tx_src_ipv4') - tx_dst_ipv4 = args.get_arg('tx_dst_ipv4') - tx_dst_udp_port = int(args.get_arg('tx_dst_udp_port')) - tx_src_udp_port = 20000 - rx_dst_mac = args.get_arg('rx_dst_mac') - rx_src_mac = args.get_arg('rx_src_mac') - rx_src_ipv6 = args.get_arg('rx_src_ipv6') - rx_dst_ipv6 = args.get_arg('rx_dst_ipv6') - - rxq = RxQueue(rx_if) - txq = TxQueue(tx_if) - sent_packets = [] - - # Create empty UDP datagram - udp = (Ether(dst=tx_dst_mac) / - IP(src=tx_src_ipv4, dst=tx_dst_ipv4) / - UDP(sport=tx_src_udp_port, dport=tx_dst_udp_port) / - 'udp_payload') - - txq.send(udp) - sent_packets.append(udp) - - for _ in range(5): - pkt = rxq.recv(2) - if _is_udp_in_ipv6(pkt): - ether = pkt - break - else: - raise RuntimeError("UDP in IPv6 Rx error.") - - # check ethernet - if ether.dst != rx_dst_mac: - raise RuntimeError("Destination MAC error {} != {}.". - format(ether.dst, rx_dst_mac)) - print "Destination MAC: OK." - - if ether.src != rx_src_mac: - raise RuntimeError("Source MAC error {} != {}.". - format(ether.src, rx_src_mac)) - print "Source MAC: OK." - - ipv6 = ether.payload - - # check ipv6 - if ip_address(unicode(ipv6.dst)) != ip_address(unicode(rx_dst_ipv6)): - raise RuntimeError("Destination IP error {} != {}.". - format(ipv6.dst, rx_dst_ipv6)) - print "Destination IPv6: OK." - - if ip_address(unicode(ipv6.src)) != ip_address(unicode(rx_src_ipv6)): - raise RuntimeError("Source IP error {} != {}.". - format(ipv6.src, rx_src_ipv6)) - print "Source IPv6: OK." - - udp = ipv6.payload - - # check udp - if udp.dport != tx_dst_udp_port: - raise RuntimeError("UDP dport error {} != {}.". - format(udp.dport, tx_dst_udp_port)) - print "UDP dport: OK." - - if udp.sport != tx_src_udp_port: - raise RuntimeError("UDP sport error {} != {}.". - format(udp.sport, tx_src_udp_port)) - print "UDP sport: OK." - - if not _check_udp_checksum(ipv6): - raise RuntimeError("UDP checksum error.") - print "UDP checksum OK." - - sys.exit(0) - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/send_ipv6_udp_check_map_t.py b/resources/traffic_scripts/send_ipv6_udp_check_map_t.py deleted file mode 100755 index 2d9c291686..0000000000 --- a/resources/traffic_scripts/send_ipv6_udp_check_map_t.py +++ /dev/null @@ -1,135 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Traffic script that sends an empty UDP datagram and checks if IPv6 addresses -are correctly translate to IPv4 addresses.""" - -import sys - -from scapy.layers.l2 import Ether -from scapy.layers.inet6 import IPv6, UDP -from ipaddress import ip_address - -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def _check_udp_checksum(pkt): - """Check UDP checksum in IP packet. Return True if checksum is correct - else False.""" - new = pkt.__class__(str(pkt)) - del new['UDP'].chksum - new = new.__class__(str(new)) - return new['UDP'].chksum == pkt['UDP'].chksum - - -def _is_udp_in_ipv4(pkt): - """If IPv4 protocol type in the given pkt is UDP, return True, - else return False. False is returned also if exception occurs.""" - ipv4_type = int('0x0800', 16) # IPv4 - try: - if pkt.type == ipv4_type: - if pkt.payload.proto == 17: # UDP - return True - except AttributeError: - return False - return False - - -def main(): # pylint: disable=too-many-statements, too-many-locals - """Main function of the script file.""" - args = TrafficScriptArg(['tx_dst_mac', 'tx_src_mac', - 'tx_src_ipv6', 'tx_dst_ipv6', - 'tx_src_udp_port', 'rx_dst_mac', 'rx_src_mac', - 'rx_src_ipv4', 'rx_dst_ipv4']) - rx_if = args.get_arg('rx_if') - tx_if = args.get_arg('tx_if') - tx_dst_mac = args.get_arg('tx_dst_mac') - tx_src_mac = args.get_arg('tx_src_mac') - tx_src_ipv6 = args.get_arg('tx_src_ipv6') - tx_dst_ipv6 = args.get_arg('tx_dst_ipv6') - tx_src_udp_port = int(args.get_arg('tx_src_udp_port')) - tx_dst_udp_port = 20000 - rx_dst_mac = args.get_arg('rx_dst_mac') - rx_src_mac = args.get_arg('rx_src_mac') - rx_src_ipv4 = args.get_arg('rx_src_ipv4') - rx_dst_ipv4 = args.get_arg('rx_dst_ipv4') - - rxq = RxQueue(rx_if) - txq = TxQueue(tx_if) - sent_packets = [] - - # Create empty UDP datagram in IPv6 - - udp = (Ether(dst=tx_dst_mac, src=tx_src_mac) / - IPv6(src=tx_src_ipv6, dst=tx_dst_ipv6) / - UDP(sport=tx_src_udp_port, dport=tx_dst_udp_port) / - 'udp_payload') - - txq.send(udp) - sent_packets.append(udp) - - for _ in range(5): - pkt = rxq.recv(2) - if _is_udp_in_ipv4(pkt): - ether = pkt - break - else: - raise RuntimeError("UDP in IPv4 Rx error.") - - # check ethernet - if ether.dst != rx_dst_mac: - raise RuntimeError("Destination MAC error {} != {}.". - format(ether.dst, rx_dst_mac)) - print "Destination MAC: OK." - - if ether.src != rx_src_mac: - raise RuntimeError("Source MAC error {} != {}.". - format(ether.src, rx_src_mac)) - print "Source MAC: OK." - - ipv4 = ether.payload - - # check ipv4 - if ip_address(unicode(ipv4.dst)) != ip_address(unicode(rx_dst_ipv4)): - raise RuntimeError("Destination IPv4 error {} != {}.". - format(ipv4.dst, rx_dst_ipv4)) - print "Destination IPv4: OK." - - if ip_address(unicode(ipv4.src)) != ip_address(unicode(rx_src_ipv4)): - raise RuntimeError("Source IPv4 error {} != {}.". - format(ipv4.src, rx_src_ipv4)) - print "Source IPv4: OK." - - udp = ipv4.payload - - # check udp - if udp.dport != tx_dst_udp_port: - raise RuntimeError("UDP dport error {} != {}.". - format(udp.dport, tx_dst_udp_port)) - print "UDP dport: OK." - - if udp.sport != tx_src_udp_port: - raise RuntimeError("UDP sport error {} != {}.". - format(udp.sport, tx_src_udp_port)) - print "UDP sport: OK." - - if not _check_udp_checksum(ipv4): - raise RuntimeError("UDP checksum error.") - print "UDP checksum OK." - - sys.exit(0) - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/send_lw_4o6_check_hairpinning_udp.py b/resources/traffic_scripts/send_lw_4o6_check_hairpinning_udp.py deleted file mode 100755 index eb5c26ea61..0000000000 --- a/resources/traffic_scripts/send_lw_4o6_check_hairpinning_udp.py +++ /dev/null @@ -1,145 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Traffic script that sends an empty IPv4 UDP datagram encapsulated in IPv6 -and checks if is correctly re-encapsulated.""" - -import sys - -from scapy.layers.l2 import Ether -from scapy.layers.inet6 import IPv6 -from scapy.layers.inet import IP, UDP - -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def _is_ipv4_in_ipv6(pkt): - """If IPv6 next header type in the given pkt is IPv4, return True, - else return False. False is returned also if exception occurs.""" - ipv6_type = int('0x86dd', 16) # IPv6 - try: - if pkt.type == ipv6_type: - if pkt.payload.nh == 4: - return True - except: # pylint: disable=bare-except - return False - return False - - -def main(): # pylint: disable=too-many-statements, too-many-locals - """Main function of the script file.""" - args = TrafficScriptArg(['tx_dst_mac', - 'tx_dst_ipv6', 'tx_src_ipv6', - 'tx_dst_ipv4', 'tx_src_ipv4', - 'tx_dst_udp_port', 'tx_src_udp_port', - 'rx_dst_mac', 'rx_src_mac', - 'rx_dst_ipv6', 'rx_src_ipv6']) - rx_if = args.get_arg('rx_if') - tx_if = args.get_arg('tx_if') - tx_dst_mac = args.get_arg('tx_dst_mac') - tx_src_mac = '02:00:00:00:00:01' - - tx_dst_ipv6 = args.get_arg('tx_dst_ipv6') - tx_src_ipv6 = args.get_arg('tx_src_ipv6') - tx_dst_ipv4 = args.get_arg('tx_dst_ipv4') - tx_src_ipv4 = args.get_arg('tx_src_ipv4') - tx_dst_udp_port = int(args.get_arg('tx_dst_udp_port')) - tx_src_udp_port = int(args.get_arg('tx_src_udp_port')) - rx_dst_mac = args.get_arg('rx_dst_mac') - rx_src_mac = args.get_arg('rx_src_mac') - rx_dst_ipv6 = args.get_arg('rx_dst_ipv6') - rx_src_ipv6 = args.get_arg('rx_src_ipv6') - - rxq = RxQueue(rx_if) - txq = TxQueue(tx_if) - sent_packets = [] - - # Create empty UDP datagram in IPv4 and IPv6 - tx_pkt = Ether(dst=tx_dst_mac, src=tx_src_mac) - tx_pkt /= IPv6(src=tx_src_ipv6, dst=tx_dst_ipv6) - tx_pkt /= IP(src=tx_src_ipv4, dst=tx_dst_ipv4) - tx_pkt /= UDP(sport=tx_src_udp_port, dport=tx_dst_udp_port) - tx_pkt /= 'udp_payload' - - txq.send(tx_pkt) - sent_packets.append(tx_pkt) - - for _ in range(5): - pkt = rxq.recv(2, ignore=sent_packets) - if _is_ipv4_in_ipv6(pkt): - ether = pkt - break - else: - raise RuntimeError("IPv4 in IPv6 Rx error.") - - # check ethernet - if ether.dst != rx_dst_mac: - raise RuntimeError("Destination MAC error {} != {}.". - format(ether.dst, rx_dst_mac)) - print "Destination MAC: OK." - - if ether.src != rx_src_mac: - raise RuntimeError("Source MAC error {} != {}.". - format(ether.src, rx_src_mac)) - print "Source MAC: OK." - - ipv6 = ether.payload - - # check ipv6 - if ipv6.dst != rx_dst_ipv6: - raise RuntimeError("Destination IPv6 error {} != {}.". - format(ipv6.dst, rx_dst_ipv6)) - print "Destination IPv6: OK." - - if ipv6.src != rx_src_ipv6: - raise RuntimeError("Source IPv6 error {} != {}.". - format(ipv6.src, rx_src_ipv6)) - print "Source IPv6: OK." - - ipv4 = ipv6.payload - - # check ipv4 - if ipv4.dst != tx_dst_ipv4: - raise RuntimeError("Destination IPv4 error {} != {}.". - format(ipv4.dst, tx_dst_ipv4)) - print "Destination IPv4: OK." - - if ipv4.src != tx_src_ipv4: - raise RuntimeError("Source IPv4 error {} != {}.". - format(ipv4.src, tx_src_ipv4)) - print "Source IPv4: OK." - - if ipv4.proto != 17: # UDP - raise RuntimeError("IPv4 protocol error {} != UDP.". - format(ipv4.proto)) - print "IPv4 protocol: OK." - - udp = ipv4.payload - - # check udp - if udp.dport != tx_dst_udp_port: - raise RuntimeError("UDP dport error {} != {}.". - format(udp.dport, tx_dst_udp_port)) - print "UDP dport: OK." - - if udp.sport != tx_src_udp_port: - raise RuntimeError("UDP sport error {} != {}.". - format(udp.sport, tx_src_udp_port)) - print "UDP sport: OK." - - sys.exit(0) - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/send_lw_4o6_check_ipv4_udp.py b/resources/traffic_scripts/send_lw_4o6_check_ipv4_udp.py deleted file mode 100755 index fcb745a407..0000000000 --- a/resources/traffic_scripts/send_lw_4o6_check_ipv4_udp.py +++ /dev/null @@ -1,126 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Traffic script that sends an empty IPv4 UDP datagram encapsulated in IPv6 -and checks if is correctly decapsulated.""" - -import sys - -from scapy.layers.l2 import Ether -from scapy.layers.inet6 import IPv6 -from scapy.layers.inet import IP, UDP - -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def _is_udp_in_ipv4(pkt): - """If UDP is in IPv4 packet return True, - else return False. False is returned also if exception occurs.""" - ipv4_type = int('0x0800', 16) # IPv4 - try: - if pkt.type == ipv4_type: - if pkt.payload.proto == 17: # UDP - return True - except: # pylint: disable=bare-except - return False - return False - - -def main(): # pylint: disable=too-many-statements, too-many-locals - """Main function of the script file.""" - args = TrafficScriptArg(['tx_dst_mac', 'tx_src_mac', - 'tx_dst_ipv6', 'tx_src_ipv6', - 'tx_dst_ipv4', 'tx_src_ipv4', 'tx_src_udp_port', - 'rx_dst_mac', 'rx_src_mac']) - rx_if = args.get_arg('rx_if') - tx_if = args.get_arg('tx_if') - tx_src_mac = args.get_arg('tx_src_mac') - tx_dst_mac = args.get_arg('tx_dst_mac') - tx_dst_ipv6 = args.get_arg('tx_dst_ipv6') - tx_src_ipv6 = args.get_arg('tx_src_ipv6') - tx_dst_ipv4 = args.get_arg('tx_dst_ipv4') - tx_src_ipv4 = args.get_arg('tx_src_ipv4') - tx_src_udp_port = int(args.get_arg('tx_src_udp_port')) - tx_dst_udp_port = 20000 - rx_dst_mac = args.get_arg('rx_dst_mac') - rx_src_mac = args.get_arg('rx_src_mac') - - rxq = RxQueue(rx_if) - txq = TxQueue(tx_if) - sent_packets = [] - - # Create empty UDP datagram in IPv4 and IPv6 - tx_pkt = (Ether(dst=tx_dst_mac, src=tx_src_mac) / - IPv6(src=tx_src_ipv6, dst=tx_dst_ipv6) / - IP(src=tx_src_ipv4, dst=tx_dst_ipv4) / - UDP(sport=tx_src_udp_port, dport=tx_dst_udp_port)) - - txq.send(tx_pkt) - sent_packets.append(tx_pkt) - - for _ in range(5): - pkt = rxq.recv(2) - if _is_udp_in_ipv4(pkt): - ether = pkt - break - else: - raise RuntimeError("UDP in IPv4 Rx error.") - - # check ethernet - if ether.dst != rx_dst_mac: - raise RuntimeError("Destination MAC error {} != {}.". - format(ether.dst, rx_dst_mac)) - print "Destination MAC: OK." - - if ether.src != rx_src_mac: - raise RuntimeError("Source MAC error {} != {}.". - format(ether.src, rx_src_mac)) - print "Source MAC: OK." - - ipv4 = ether.payload - - # check ipv4 - if ipv4.dst != tx_dst_ipv4: - raise RuntimeError("Destination IPv4 error {} != {}.". - format(ipv4.dst, tx_dst_ipv4)) - print "Destination IPv4: OK." - - if ipv4.src != tx_src_ipv4: - raise RuntimeError("Source IPv4 error {} != {}.". - format(ipv4.src, tx_src_ipv4)) - print "Source IPv4: OK." - - if ipv4.proto != 17: # UDP - raise RuntimeError("IPv4 protocol error {} != UDP.". - format(ipv4.proto)) - print "IPv4 protocol: OK." - - udp = ipv4.payload - - # check udp - if udp.dport != tx_dst_udp_port: - raise RuntimeError("UDP dport error {} != {}.". - format(udp.dport, tx_dst_udp_port)) - print "UDP dport: OK." - - if udp.sport != tx_src_udp_port: - raise RuntimeError("UDP sport error {} != {}.". - format(udp.sport, tx_src_udp_port)) - print "UDP sport: OK." - - sys.exit(0) - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/send_rs_check_ra.py b/resources/traffic_scripts/send_rs_check_ra.py deleted file mode 100755 index c9ff46528a..0000000000 --- a/resources/traffic_scripts/send_rs_check_ra.py +++ /dev/null @@ -1,127 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Router solicitation check script.""" - -import sys -import ipaddress - -from scapy.layers.l2 import Ether -from scapy.layers.inet6 import IPv6, ICMPv6ND_RA, ICMPv6ND_RS, ICMPv6ND_NS - -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def mac_to_ipv6_linklocal(mac): - """Transfer MAC address into specific link-local IPv6 address. - - :param mac: MAC address to be transferred. - :type mac: str - :return: IPv6 link-local address. - :rtype: str - """ - # Remove the most common delimiters: dots, dashes, etc. - mac_value = int(mac.translate(None, ' .:-'), 16) - - # Split out the bytes that slot into the IPv6 address - # XOR the most significant byte with 0x02, inverting the - # Universal / Local bit - high2 = mac_value >> 32 & 0xffff ^ 0x0200 - high1 = mac_value >> 24 & 0xff - low1 = mac_value >> 16 & 0xff - low2 = mac_value & 0xffff - - return 'fe80::{:04x}:{:02x}ff:fe{:02x}:{:04x}'.format( - high2, high1, low1, low2) - - -def main(): - """Send Router Solicitation packet, check if the received response\ - is a Router Advertisement packet and verify.""" - - args = TrafficScriptArg( - ['src_mac', 'dst_mac', 'src_ip'] - ) - - router_mac = args.get_arg('dst_mac') - src_mac = args.get_arg('src_mac') - src_ip = args.get_arg('src_ip') - if not src_ip: - src_ip = mac_to_ipv6_linklocal(src_mac) - tx_if = args.get_arg('tx_if') - - txq = TxQueue(tx_if) - rxq = RxQueue(tx_if) - - pkt_raw = (Ether(src=src_mac, dst='33:33:00:00:00:02') / - IPv6(src=src_ip, dst='ff02::2') / - ICMPv6ND_RS()) - - sent_packets = [pkt_raw] - txq.send(pkt_raw) - - while True: - ether = rxq.recv(2, sent_packets) - if ether is None: - raise RuntimeError('ICMP echo Rx timeout') - - if ether.haslayer(ICMPv6ND_NS): - # read another packet in the queue if the current one is ICMPv6ND_NS - continue - else: - # otherwise process the current packet - break - - # Check whether received packet contains layer RA and check other values - if ether.src != router_mac: - raise RuntimeError('Packet source MAC ({0}) does not match router MAC ' - '({1}).'.format(ether.src, router_mac)) - if ether.dst != src_mac: - raise RuntimeError('Packet destination MAC ({0}) does not match RS ' - 'source MAC ({1}).'.format(ether.dst, src_mac)) - - if not ether.haslayer(ICMPv6ND_RA): - raise RuntimeError('Not an RA packet received {0}'. - format(ether.__repr__())) - - src_address = ipaddress.IPv6Address(unicode(ether['IPv6'].src)) - dst_address = ipaddress.IPv6Address(unicode(ether['IPv6'].dst)) - router_link_local = ipaddress.IPv6Address(unicode( - mac_to_ipv6_linklocal(router_mac))) - rs_src_address = ipaddress.IPv6Address(unicode(src_ip)) - - if src_address != router_link_local: - raise RuntimeError('Packet source address ({0}) does not match link ' - 'local address({1})'. - format(src_address, router_link_local)) - - if dst_address != rs_src_address: - raise RuntimeError('Packet destination address ({0}) does not match ' - 'RS source address ({1}).'. - format(dst_address, rs_src_address)) - - if ether['IPv6'].hlim != 255: - raise RuntimeError('Hop limit not correct: {0}!=255'. - format(ether['IPv6'].hlim)) - - ra_code = ether[ICMPv6ND_RA].code - if ra_code != 0: - raise RuntimeError('ICMP code: {0} not correct. '.format(ra_code)) - - sys.exit(0) - - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/send_tcp_udp.py b/resources/traffic_scripts/send_tcp_udp.py deleted file mode 100755 index 4cba73286a..0000000000 --- a/resources/traffic_scripts/send_tcp_udp.py +++ /dev/null @@ -1,127 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Traffic script that sends an TCP or UDP packet -from one interface to the other. -""" - -import sys -import ipaddress - -from scapy.all import Ether -from scapy.layers.inet import IP, UDP, TCP -from scapy.layers.inet6 import IPv6, ICMPv6ND_NS - -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def valid_ipv4(ip): - """Check if IP address has the correct IPv4 address format. - - :param ip: IP address. - :type ip: str - :return: True in case of correct IPv4 address format, - otherwise return False. - :rtype: bool - """ - try: - ipaddress.IPv4Address(unicode(ip)) - return True - except (AttributeError, ipaddress.AddressValueError): - return False - - -def valid_ipv6(ip): - """Check if IP address has the correct IPv6 address format. - - :param ip: IP address. - :type ip: str - :return: True in case of correct IPv6 address format, - otherwise return False. - :rtype: bool - """ - try: - ipaddress.IPv6Address(unicode(ip)) - return True - except (AttributeError, ipaddress.AddressValueError): - return False - - -def main(): - """Send TCP or UDP packet from one traffic generator interface to the other. - """ - args = TrafficScriptArg(['tx_mac', 'rx_mac', 'src_ip', 'dst_ip', 'protocol', - 'source_port', 'destination_port']) - - src_mac = args.get_arg('tx_mac') - dst_mac = args.get_arg('rx_mac') - src_ip = args.get_arg('src_ip') - dst_ip = args.get_arg('dst_ip') - tx_if = args.get_arg('tx_if') - rx_if = args.get_arg('rx_if') - - protocol = args.get_arg('protocol') - source_port = args.get_arg('source_port') - destination_port = args.get_arg('destination_port') - - if valid_ipv4(src_ip) and valid_ipv4(dst_ip): - ip_version = IP - elif valid_ipv6(src_ip) and valid_ipv6(dst_ip): - ip_version = IPv6 - else: - ValueError("Invalid IP version!") - - if protocol.upper() == 'TCP': - protocol = TCP - elif protocol.upper() == 'UDP': - protocol = UDP - else: - raise ValueError("Invalid protocol type!") - - rxq = RxQueue(rx_if) - txq = TxQueue(tx_if) - - pkt_raw = (Ether(src=src_mac, dst=dst_mac) / - ip_version(src=src_ip, dst=dst_ip) / - protocol(sport=int(source_port), dport=int(destination_port))) - - txq.send(pkt_raw) - - while True: - ether = rxq.recv(2) - if ether is None: - raise RuntimeError('TCP/UDP Rx timeout') - - if ether.haslayer(ICMPv6ND_NS): - # read another packet in the queue if the current one is ICMPv6ND_NS - continue - else: - # otherwise process the current packet - break - - if TCP in ether: - print ("TCP packet received.") - - elif UDP in ether: - print ("UDP packet received.") - else: - raise RuntimeError("Not an TCP or UDP packet received {0}". - format(ether.__repr__())) - - sys.exit(0) - - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/send_vxlan_check_vxlan.py b/resources/traffic_scripts/send_vxlan_check_vxlan.py index 3bf273d17a..162703d60c 100755 --- a/resources/traffic_scripts/send_vxlan_check_vxlan.py +++ b/resources/traffic_scripts/send_vxlan_check_vxlan.py @@ -1,5 +1,6 @@ -#!/usr/bin/env python -# Copyright (c) 2016 Cisco and/or its affiliates. +#!/usr/bin/env python3 + +# Copyright (c) 2019 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -18,42 +19,46 @@ the other one. Dot1q or Dot1ad tagging of the ethernet frame can be set. import sys -import vxlan - -from scapy.layers.inet import IP, UDP, Raw +from scapy.layers.inet import IP, UDP from scapy.layers.l2 import Ether +from scapy.packet import Raw + from resources.libraries.python.PacketVerifier import RxQueue, TxQueue from resources.libraries.python.TrafficScriptArg import TrafficScriptArg +from resources.traffic_scripts import vxlan def main(): """Send IP ICMPv4/ICMPv6 packet from one traffic generator interface to the other one. Dot1q or Dot1ad tagging of the ethernet frame can be set. """ - args = TrafficScriptArg(['tx_src_mac', 'tx_dst_mac', 'tx_src_ip', - 'tx_dst_ip', 'tx_vni', 'rx_src_ip', 'rx_dst_ip', - 'rx_vni']) - - tx_if = args.get_arg('tx_if') - rx_if = args.get_arg('rx_if') - tx_src_mac = args.get_arg('tx_src_mac') - tx_dst_mac = args.get_arg('tx_dst_mac') - tx_src_ip = args.get_arg('tx_src_ip') - tx_dst_ip = args.get_arg('tx_dst_ip') - tx_vni = args.get_arg('tx_vni') - rx_src_ip = args.get_arg('rx_src_ip') - rx_dst_ip = args.get_arg('rx_dst_ip') - rx_vni = args.get_arg('rx_vni') + args = TrafficScriptArg( + [ + u"tx_src_mac", u"tx_dst_mac", u"tx_src_ip", u"tx_dst_ip", u"tx_vni", + u"rx_src_ip", u"rx_dst_ip", u"rx_vni" + ] + ) + + tx_if = args.get_arg(u"tx_if") + rx_if = args.get_arg(u"rx_if") + tx_src_mac = args.get_arg(u"tx_src_mac") + tx_dst_mac = args.get_arg(u"tx_dst_mac") + tx_src_ip = args.get_arg(u"tx_src_ip") + tx_dst_ip = args.get_arg(u"tx_dst_ip") + tx_vni = args.get_arg(u"tx_vni") + rx_src_ip = args.get_arg(u"rx_src_ip") + rx_dst_ip = args.get_arg(u"rx_dst_ip") + rx_vni = args.get_arg(u"rx_vni") rxq = RxQueue(rx_if) txq = TxQueue(tx_if) sent_packets = [] - tx_pkt_p = (Ether(src='02:00:00:00:00:01', dst='02:00:00:00:00:02') / - IP(src='192.168.1.1', dst='192.168.1.2') / + tx_pkt_p = (Ether(src=u"02:00:00:00:00:01", dst=u"02:00:00:00:00:02") / + IP(src=u"192.168.1.1", dst=u"192.168.1.2") / UDP(sport=12345, dport=1234) / - Raw('rew data')) + Raw(u"raw data")) pkt_raw = (Ether(src=tx_src_mac, dst=tx_dst_mac) / IP(src=tx_src_ip, dst=tx_dst_ip) / @@ -61,6 +66,7 @@ def main(): vxlan.VXLAN(vni=int(tx_vni)) / tx_pkt_p) + pkt_raw /= Raw() # Send created packet on one interface and receive on the other sent_packets.append(pkt_raw) txq.send(pkt_raw) @@ -69,38 +75,45 @@ def main(): # Check whether received packet contains layers Ether, IP and VXLAN if ether is None: - raise RuntimeError('Packet Rx timeout') + raise RuntimeError(u"Packet Rx timeout") ip = ether.payload if ip.src != rx_src_ip: - raise RuntimeError('IP src mismatch {} != {}'.format(ip.src, rx_src_ip)) + raise RuntimeError(f"IP src mismatch {ip.src} != {rx_src_ip}") if ip.dst != rx_dst_ip: - raise RuntimeError('IP dst mismatch {} != {}'.format(ip.dst, rx_dst_ip)) + raise RuntimeError(f"IP dst mismatch {ip.dst} != {rx_dst_ip}") if ip.payload.dport != 4789: - raise RuntimeError('VXLAN UDP port mismatch {} != {}'. - format(ip.payload.dport, 4789)) + raise RuntimeError( + f"VXLAN UDP port mismatch {ip.payload.dport} != 4789" + ) vxlan_pkt = ip.payload.payload if int(vxlan_pkt.vni) != int(rx_vni): - raise RuntimeError('vxlan mismatch') + raise RuntimeError(u"vxlan mismatch") rx_pkt_p = vxlan_pkt.payload if rx_pkt_p.src != tx_pkt_p.src: - raise RuntimeError('RX encapsulated MAC src mismatch {} != {}'. - format(rx_pkt_p.src, tx_pkt_p.src)) + raise RuntimeError( + f"RX encapsulated MAC src mismatch {rx_pkt_p.src} != {tx_pkt_p.src}" + ) if rx_pkt_p.dst != tx_pkt_p.dst: - raise RuntimeError('RX encapsulated MAC dst mismatch {} != {}'. - format(rx_pkt_p.dst, tx_pkt_p.dst)) - if rx_pkt_p['IP'].src != tx_pkt_p['IP'].src: - raise RuntimeError('RX encapsulated IP src mismatch {} != {}'. - format(rx_pkt_p['IP'].src, tx_pkt_p['IP'].src)) - if rx_pkt_p['IP'].dst != tx_pkt_p['IP'].dst: - raise RuntimeError('RX encapsulated IP dst mismatch {} != {}'. - format(rx_pkt_p['IP'].dst, tx_pkt_p['IP'].dst)) + raise RuntimeError( + f"RX encapsulated MAC dst mismatch {rx_pkt_p.dst} != {tx_pkt_p.dst}" + ) + if rx_pkt_p[IP].src != tx_pkt_p[IP].src: + raise RuntimeError( + f"RX encapsulated IP src mismatch {rx_pkt_p[IP].src} != " + f"{tx_pkt_p[IP].src}" + ) + if rx_pkt_p[IP].dst != tx_pkt_p[IP].dst: + raise RuntimeError( + f"RX encapsulated IP dst mismatch {rx_pkt_p[IP].dst} != " + f"{tx_pkt_p[IP].dst}" + ) # TODO: verify inner Ether() sys.exit(0) -if __name__ == "__main__": +if __name__ == u"__main__": main() diff --git a/resources/traffic_scripts/vxlan.py b/resources/traffic_scripts/vxlan.py index bf86f179a8..eebfb9056e 100644 --- a/resources/traffic_scripts/vxlan.py +++ b/resources/traffic_scripts/vxlan.py @@ -1,17 +1,19 @@ from scapy.fields import BitField, XByteField, X3BytesField -from scapy.packet import Packet, bind_layers -from scapy.layers.l2 import Ether from scapy.layers.inet import UDP +from scapy.layers.l2 import Ether +from scapy.packet import Packet, bind_layers class VXLAN(Packet): - name = "VXLAN" - fields_desc = [BitField("flags", 0x08000000, 32), - X3BytesField("vni", 0), - XByteField("reserved", 0x00)] + name = u"VXLAN" + fields_desc = [ + BitField(u"flags", 0x08000000, 32), + X3BytesField(u"vni", 0), + XByteField(u"reserved", 0x00) + ] def mysummary(self): - return self.sprintf("VXLAN (vni=%VXLAN.vni%)") + return self.sprintf(f"VXLAN (vni={VXLAN.vni})") bind_layers(UDP, VXLAN, dport=4789) bind_layers(VXLAN, Ether) |