diff options
Diffstat (limited to 'resources')
-rw-r--r-- | resources/libraries/python/MacSwap.py | 61 | ||||
-rw-r--r-- | resources/templates/vat/macswap.vat | 1 | ||||
-rw-r--r-- | resources/templates/vat/macswap_exec.vat | 1 | ||||
-rwxr-xr-x | resources/traffic_scripts/macswap_check.py | 66 | ||||
-rwxr-xr-x | resources/traffic_scripts/span_check.py | 215 |
5 files changed, 0 insertions, 344 deletions
diff --git a/resources/libraries/python/MacSwap.py b/resources/libraries/python/MacSwap.py deleted file mode 100644 index ebcf95eae2..0000000000 --- a/resources/libraries/python/MacSwap.py +++ /dev/null @@ -1,61 +0,0 @@ -# Copyright (c) 2018 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Macswap sample_plugin util library""" - -from resources.libraries.python.topology import NodeType, Topology -from resources.libraries.python.VatExecutor import VatExecutor - -class MacSwap(object): - """Macswap sample plugin API""" - - @staticmethod - def enable_disable_macswap_vat(node, interface): - """Enable/Disable macswap on interface. - - Function can be used on a VPP node with macswap plugin. - - :param node: Node where the interface is. - :param interface: Interface id. - :type node: dict - :type interface: str or int - :returns: nothing - """ - if node['type'] == NodeType.DUT: - sw_if_index = Topology.get_interface_sw_index(node, interface) - - VatExecutor.cmd_from_template(node, 'macswap.vat', - sw_if_index=sw_if_index) - else: - raise ValueError('Node {} has not DUT NodeType: "{}"'. - format(node['host'], node['type'])) - - @staticmethod - def enable_disable_macswap_vat_exec(node, interface_name): - """Enable/Disable macswap on interface. - - Function can be used on a VPP node with macswap plugin. - - :param node: Node where the interface is. - :param interface_name: Interface name. - :type node: dict - :type interface_name: str or int - :returns: nothing - """ - if node['type'] == NodeType.DUT: - - VatExecutor.cmd_from_template(node, 'macswap_exec.vat', - if_name=interface_name) - else: - raise ValueError('Node {} has not DUT NodeType: "{}"'. - format(node['host'], node['type'])) diff --git a/resources/templates/vat/macswap.vat b/resources/templates/vat/macswap.vat deleted file mode 100644 index 39a6bc5ec3..0000000000 --- a/resources/templates/vat/macswap.vat +++ /dev/null @@ -1 +0,0 @@ -sample_macswap_enable_disable sw_if_index {sw_if_index} diff --git a/resources/templates/vat/macswap_exec.vat b/resources/templates/vat/macswap_exec.vat deleted file mode 100644 index 7b12cc6365..0000000000 --- a/resources/templates/vat/macswap_exec.vat +++ /dev/null @@ -1 +0,0 @@ -exec sample macswap {if_name} diff --git a/resources/traffic_scripts/macswap_check.py b/resources/traffic_scripts/macswap_check.py deleted file mode 100755 index 402bb7bac1..0000000000 --- a/resources/traffic_scripts/macswap_check.py +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Macswap test.""" - -from scapy.all import Ether, IP - -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - -def check_macswap(pkt_send, pkt_recv): - """Compare MAC addresses of sent and received packet. - - :param pkt_send: Sent packet. - :param pkt_recv: Received packet. - :type pkt_send: Ether - :type pkt_recv: Ether - """ - print "Comparing following packets:" - pkt_send.show2() - pkt_recv.show2() - - if pkt_send.dst != pkt_recv.src or pkt_send.src != pkt_recv.dst: - raise RuntimeError("MAC addresses were not swapped.") - - -def main(): - """Main function.""" - args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip']) - - rxq = RxQueue(args.get_arg('rx_if')) - txq = TxQueue(args.get_arg('tx_if')) - - src_mac = args.get_arg('src_mac') - dst_mac = args.get_arg('dst_mac') - src_ip = args.get_arg('src_ip') - dst_ip = args.get_arg('dst_ip') - - pkt_send = (Ether(src=src_mac, dst=dst_mac) / - IP(src=src_ip, dst=dst_ip, proto=61)) - - sent_packets = [] - sent_packets.append(pkt_send) - txq.send(pkt_send) - - pkt_recv = rxq.recv(ignore=sent_packets) - - if pkt_recv is None: - raise RuntimeError('Timeout waiting for packet') - - check_macswap(pkt_send, pkt_recv) - -if __name__ == "__main__": - main() diff --git a/resources/traffic_scripts/span_check.py b/resources/traffic_scripts/span_check.py deleted file mode 100755 index cbf65d3aaf..0000000000 --- a/resources/traffic_scripts/span_check.py +++ /dev/null @@ -1,215 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Traffic script that sends an IP ICMPv4/ICMPv6 packet from one interface -to the other. Source and destination IP addresses and source and destination -MAC addresses are checked in received packet. -""" - -import sys -import ipaddress - -from scapy.layers.inet import IP, ICMP, ARP -from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS -from scapy.layers.l2 import Ether - -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue, auto_pad -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def valid_ipv4(address): - """Check if IP address has the correct IPv4 address format. - - :param address: IP address. - :type address: str - :return: True in case of correct IPv4 address format, - otherwise return false. - :rtype: bool - """ - try: - ipaddress.IPv4Address(unicode(address)) - return True - except (AttributeError, ipaddress.AddressValueError): - return False - - -def valid_ipv6(address): - """Check if IP address has the correct IPv6 address format. - - :param address: IP address. - :type address: str - :return: True in case of correct IPv6 address format, - otherwise return false. - :rtype: bool - """ - try: - ipaddress.IPv6Address(unicode(address)) - return True - except (AttributeError, ipaddress.AddressValueError): - return False - - -def main(): - """Send a simple L2 or ICMP packet from one TG interface to DUT, then - receive a copy of the packet on the second TG interface, and a copy of - the ICMP reply.""" - args = TrafficScriptArg(['tg_src_mac', 'src_ip', 'dst_ip', 'dut_if1_mac', - 'ptype']) - - src_mac = args.get_arg('tg_src_mac') - dst_mac = args.get_arg('dut_if1_mac') - src_ip = args.get_arg('src_ip') - dst_ip = args.get_arg('dst_ip') - tx_if = args.get_arg('tx_if') - rx_if = args.get_arg('rx_if') - ptype = args.get_arg('ptype') - - rxq_mirrored = RxQueue(rx_if) - rxq_tx = RxQueue(tx_if) - txq = TxQueue(tx_if) - - sent = [] - - if ptype == "ARP": - pkt_raw = (Ether(src=src_mac, dst=dst_mac) / - ARP(hwsrc=src_mac, hwdst="00:00:00:00:00:00", - psrc=src_ip, pdst=dst_ip, op="who-has")) - elif ptype == "ICMP": - if valid_ipv4(src_ip) and valid_ipv4(dst_ip): - pkt_raw = (Ether(src=src_mac, dst=dst_mac) / - IP(src=src_ip, dst=dst_ip) / - ICMP(type="echo-request")) - else: - raise ValueError("IP addresses not in correct format") - elif ptype == "ICMPv6": - if valid_ipv6(src_ip) and valid_ipv6(dst_ip): - pkt_raw = (Ether(src=src_mac, dst=dst_mac) / - IPv6(src=src_ip, dst=dst_ip) / - ICMPv6EchoRequest()) - else: - raise ValueError("IPv6 addresses not in correct format") - else: - raise RuntimeError("Unexpected payload type.") - - txq.send(pkt_raw) - sent.append(auto_pad(pkt_raw)) - - # Receive copy of Rx packet. - while True: - ether = rxq_mirrored.recv(2) - if ether is None: - raise RuntimeError("Rx timeout of mirrored Rx packet") - - if ether.haslayer(ICMPv6ND_NS): - # read another packet in the queue if the current one is ICMPv6ND_NS - continue - else: - # otherwise process the current packet - break - - pkt = auto_pad(pkt_raw) - if str(ether) != str(pkt): - print("Mirrored Rx packet doesn't match the original Rx packet.") - if ether.src != src_mac or ether.dst != dst_mac: - raise RuntimeError("MAC mismatch in mirrored Rx packet.") - if ptype == "ARP": - if not ether.haslayer(ARP): - raise RuntimeError("Mirrored Rx packet is not an ARP packet.") - if ether['ARP'].op != 1: # 1=who-has - raise RuntimeError("Mirrored Rx packet is not an ARP request.") - if ether['ARP'].hwsrc != src_mac or ether['ARP'].hwdst != dst_mac: - raise RuntimeError("MAC mismatch in mirrored Rx ARP packet.") - if ether['ARP'].psrc != src_ip or ether['ARP'].pdst != dst_ip: - raise RuntimeError("IP address mismatch in mirrored " - "Rx ARP packet.") - elif ptype == "ICMP": - if not ether.haslayer(IP): - raise RuntimeError("Mirrored Rx packet is not an IPv4 packet.") - if ether['IP'].src != src_ip or ether['IP'].dst != dst_ip: - raise RuntimeError("IP address mismatch in mirrored " - "Rx IPv4 packet.") - if not ether.haslayer(ICMP): - raise RuntimeError("Mirrored Rx packet is not an ICMP packet.") - if ether['ICMP'].type != 8: # 8=echo-request - raise RuntimeError("Mirrored Rx packet is not an ICMP " - "echo request.") - elif ptype == "ICMPv6": - if not ether.haslayer(IPv6): - raise RuntimeError("Mirrored Rx packet is not an IPv6 packet.") - if ether['IPv6'].src != src_ip or ether['IPv6'].dst != dst_ip: - raise RuntimeError("IP address mismatch in mirrored " - "Rx IPv6 packet.") - if not ether.haslayer(ICMPv6EchoRequest): - raise RuntimeError("Mirrored Rx packet is not an ICMPv6 " - "echo request.") - print("Mirrored Rx packet check OK.\n") - - # Receive reply on TG Tx port. - ether_repl = rxq_tx.recv(2, sent) - - if ether_repl is None: - raise RuntimeError("Reply not received on TG Tx port.") - - print("Reply received on TG Tx port.\n") - - # Receive copy of Tx packet. - ether = rxq_mirrored.recv(2) - if ether is None: - raise RuntimeError("Rx timeout of mirrored Tx packet") - - if str(ether) != str(ether_repl): - print("Mirrored Tx packet doesn't match the received Tx packet.") - if ether.src != ether_repl.src or ether.dst != ether_repl.dst: - raise RuntimeError("MAC mismatch in mirrored Tx packet.") - if ptype == "ARP": - if not ether.haslayer(ARP): - raise RuntimeError("Mirrored Tx packet is not an ARP packet.") - if ether['ARP'].op != ether_repl['ARP'].op: # 2=is_at - raise RuntimeError("ARP operational code mismatch " - "in mirrored Tx packet.") - if ether['ARP'].hwsrc != ether_repl['ARP'].hwsrc\ - or ether['ARP'].hwdst != ether_repl['ARP'].hwdst: - raise RuntimeError("MAC mismatch in mirrored Tx ARP packet.") - if ether['ARP'].psrc != ether_repl['ARP'].psrc\ - or ether['ARP'].pdst != ether_repl['ARP'].pdst: - raise RuntimeError("IP address mismatch in mirrored " - "Tx ARP packet.") - elif ptype == "ICMP": - if not ether.haslayer(IP): - raise RuntimeError("Mirrored Tx packet is not an IPv4 packet.") - if ether['IP'].src != ether_repl['IP'].src\ - or ether['IP'].dst != ether_repl['IP'].dst: - raise RuntimeError("IP address mismatch in mirrored " - "Tx IPv4 packet.") - if not ether.haslayer(ICMP): - raise RuntimeError("Mirrored Tx packet is not an ICMP packet.") - if ether['ICMP'].type != ether_repl['ICMP'].type: # 0=echo-reply - raise RuntimeError("ICMP packet type mismatch " - "in mirrored Tx packet.") - elif ptype == "ICMPv6": - if not ether.haslayer(IPv6): - raise RuntimeError("Mirrored Tx packet is not an IPv6 packet.") - if ether['IPv6'].src != ether_repl['IPv6'].src\ - or ether['IPv6'].dst != ether_repl['IPv6'].dst: - raise RuntimeError("IP address mismatch in mirrored " - "Tx IPv6 packet.") - if ether[2].name != ether_repl[2].name: - raise RuntimeError("ICMPv6 message type mismatch " - "in mirrored Tx packet.") - print("Mirrored Tx packet check OK.\n") - sys.exit(0) - - -if __name__ == "__main__": - main() |