#!/usr/bin/env python import socket import unittest import struct from framework import VppTestCase, VppTestRunner, running_extended_tests from scapy.layers.inet import IP, TCP, UDP, ICMP from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6 from scapy.layers.l2 import Ether, ARP, GRE from scapy.data import IP_PROTOS from scapy.packet import bind_layers from util import ppp from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder from time import sleep class MethodHolder(VppTestCase): """ NAT create capture and verify method holder """ @classmethod def setUpClass(cls): super(MethodHolder, cls).setUpClass() def tearDown(self): super(MethodHolder, self).tearDown() def check_ip_checksum(self, pkt): """ Check IP checksum of the packet :param pkt: Packet to check IP checksum """ new = pkt.__class__(str(pkt)) del new['IP'].chksum new = new.__class__(str(new)) self.assertEqual(new['IP'].chksum, pkt['IP'].chksum) def check_tcp_checksum(self, pkt): """ Check TCP checksum in IP packet :param pkt: Packet to check TCP checksum """ new = pkt.__class__(str(pkt)) del new['TCP'].chksum new = new.__class__(str(new)) self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum) def check_udp_checksum(self, pkt): """ Check UDP checksum in IP packet :param pkt: Packet to check UDP checksum """ new = pkt.__class__(str(pkt)) del new['UDP'].chksum new = new.__class__(str(new)) self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum) def check_icmp_errror_embedded(self, pkt): """ Check ICMP error embeded packet checksum :param pkt: Packet to check ICMP error embeded packet checksum """ if pkt.haslayer(IPerror): new = pkt.__class__(str(pkt)) del new['IPerror'].chksum new = new.__class__(str(new)) self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum) if pkt.haslayer(TCPerror): new = pkt.__class__(str(pkt)) del new['TCPerror'].chksum new = 
new.__class__(str(new)) self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum) if pkt.haslayer(UDPerror): if pkt['UDPerror'].chksum != 0: new = pkt.__class__(str(pkt)) del new['UDPerror'].chksum new = new.__class__(str(new)) self.assertEqual(new['UDPerror'].chksum, pkt['UDPerror'].chksum) if pkt.haslayer(ICMPerror): del new['ICMPerror'].chksum new = new.__class__(str(new)) self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum) def check_icmp_checksum(self, pkt): """ Check ICMP checksum in IPv4 packet :param pkt: Packet to check ICMP checksum """ new = pkt.__class__(str(pkt)) del new['ICMP'].chksum new = new.__class__(str(new)) self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum) if pkt.haslayer(IPerror): self.check_icmp_errror_embedded(pkt) def check_icmpv6_checksum(self, pkt): """ Check ICMPv6 checksum in IPv4 packet :param pkt: Packet to check ICMPv6 checksum """ new = pkt.__class__(str(pkt)) if pkt.haslayer(ICMPv6DestUnreach): del new['ICMPv6DestUnreach'].cksum new = new.__class__(str(new)) self.assertEqual(new['ICMPv6DestUnreach'].cksum, pkt['ICMPv6DestUnreach'].cksum) self.check_icmp_errror_embedded(pkt) if pkt.haslayer(ICMPv6EchoRequest): del new['ICMPv6EchoRequest'].cksum new = new.__class__(str(new)) self.assertEqual(new['ICMPv6EchoRequest'].cksum, pkt['ICMPv6EchoRequest'].cksum) if pkt.haslayer(ICMPv6EchoReply): del new['ICMPv6EchoReply'].cksum new = new.__class__(str(new)) self.assertEqual(new['ICMPv6EchoReply'].cksum, pkt['ICMPv6EchoReply'].cksum) def create_stream_in(self, in_if, out_if, ttl=64): """ Create packet stream for inside network :param in_if: Inside interface :param out_if: Outside interface :param ttl: TTL of generated packets """ pkts = [] # TCP p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / TCP(sport=self.tcp_port_in, dport=20)) pkts.append(p) # UDP p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / IP(src=in_if.remote_ip4, 
dst=out_if.remote_ip4, ttl=ttl) / UDP(sport=self.udp_port_in, dport=20)) pkts.append(p) # ICMP p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / ICMP(id=self.icmp_id_in, type='echo-request')) pkts.append(p) return pkts def compose_ip6(self, ip4, pref, plen): """ Compose IPv4-embedded IPv6 addresses :param ip4: IPv4 address :param pref: IPv6 prefix :param plen: IPv6 prefix length :returns: IPv4-embedded IPv6 addresses """ pref_n = list(socket.inet_pton(socket.AF_INET6, pref)) ip4_n = list(socket.inet_pton(socket.AF_INET, ip4)) if plen == 32: pref_n[4] = ip4_n[0] pref_n[5] = ip4_n[1] pref_n[6] = ip4_n[2] pref_n[7] = ip4_n[3] elif plen == 40: pref_n[5] = ip4_n[0] pref_n[6] = ip4_n[1] pref_n[7] = ip4_n[2] pref_n[9] = ip4_n[3] elif plen == 48: pref_n[6] = ip4_n[0] pref_n[7] = ip4_n[1] pref_n[9] = ip4_n[2] pref_n[10] = ip4_n[3] elif plen == 56: pref_n[7] = ip4
# Copyright (c) 2018 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Build rules for the "marvell" plugin. The plugin is only configured when
# the target processor is AArch64 and the Marvell MUSDK header and static
# library can both be located; otherwise it is skipped.

# Stop processing this file unless CMAKE_SYSTEM_PROCESSOR begins with
# "aarch64" or "AARCH64".
if(NOT CMAKE_SYSTEM_PROCESSOR MATCHES "^(aarch64.*|AARCH64.*)")
return()
endif()
# Locate the directory containing mv_std.h and the static archive
# libmusdk.a. Each result variable is set to <VAR>-NOTFOUND (false in an
# if() test) when the search fails.
find_path(MUSDK_INCLUDE_DIR NAMES mv_std.h)
find_library(MUSDK_LIB NAMES libmusdk.a)
if(MUSDK_INCLUDE_DIR AND MUSDK_LIB)
# Directory part of the path at which libmusdk.a was found.
get_filename_component(MUSDK_LIB_DIR ${MUSDK_LIB} DIRECTORY)
# Wrap the archive in --whole-archive / --no-whole-archive so the linker
# pulls in every object from libmusdk.a, not only those that resolve an
# undefined symbol.
set(MUSDK_LINK_FLAGS "-Wl,--whole-archive,${MUSDK_LIB_DIR}/libmusdk.a,--no-whole-archive")
# add_vpp_plugin is defined elsewhere in the project (not in this file).
# NOTE(review): presumably it creates the plugin target from SOURCES,
# processes API_FILES / API_TEST_SOURCES and applies LINK_FLAGS - confirm
# against the macro definition.
add_vpp_plugin(marvell
SOURCES
plugin.c
pp2/cli.c
pp2/format.c
pp2/input.c
pp2/output.c
pp2/pp2.c
pp2/pp2_api.c
API_FILES
pp2/pp2.api
API_TEST_SOURCES
pp2/pp2_test.c
LINK_FLAGS
${MUSDK_LINK_FLAGS}
)
# Add the MUSDK header directory to this directory's include path.
include_directories(${MUSDK_INCLUDE_DIR})
message(STATUS "Found Marvell MUSDK in ${MUSDK_INCLUDE_DIR}")
else()
# Header or library missing: emit a warning and define no plugin target.
message(WARNING "Marvell MUSDK not found - marvell_plugin disabled")
endif()