diff options
Diffstat (limited to 'test/asf/test_ipfix_export.py')
-rw-r--r-- | test/asf/test_ipfix_export.py | 196 |
1 files changed, 0 insertions, 196 deletions
diff --git a/test/asf/test_ipfix_export.py b/test/asf/test_ipfix_export.py deleted file mode 100644 index be4239edbec..00000000000 --- a/test/asf/test_ipfix_export.py +++ /dev/null @@ -1,196 +0,0 @@ -#!/usr/bin/env python3 -from __future__ import print_function -import binascii -import random -import socket -import unittest -import time -import re - -from asfframework import VppTestCase -from vpp_object import VppObject -from vpp_pg_interface import CaptureTimeoutError -from vpp_ip_route import VppIpRoute, VppRoutePath -from ipaddress import ip_address, IPv4Address, IPv6Address -from socket import AF_INET, AF_INET6 - - -class TestIpfixExporter(VppTestCase): - """Ipfix Exporter Tests""" - - def setUp(self): - super(TestIpfixExporter, self).setUp() - self.create_pg_interfaces(range(4)) - for i in self.pg_interfaces: - i.admin_up() - i.config_ip4() - i.resolve_arp() - i.config_ip6() - i.resolve_ndp() - i.disable_ipv6_ra() - - def tearDown(self): - super(TestIpfixExporter, self).tearDown() - for i in self.pg_interfaces: - i.unconfig_ip4() - i.unconfig_ip6() - i.admin_down() - - def find_exp_by_collector_addr(self, exporters, addr): - """Find the exporter in the list of exportes with the given addr""" - - for exp in exporters: - if exp.collector_address == IPv4Address(addr): - return exp - return None - - def verify_exporter_detail( - self, exp, collector_addr, src_addr, collector_port=4739, mtu=1400, interval=20 - ): - self.assertTrue(exp is not None) - self.assert_equal(exp.collector_address, collector_addr) - self.assert_equal(exp.src_address, src_addr) - self.assert_equal(exp.collector_port, collector_port) - self.assert_equal(exp.path_mtu, mtu) - self.assert_equal(exp.template_interval, interval) - - def test_create_multipe_exporters(self): - """test that we can create and dump multiple exporters""" - - mtu = 1400 - interval = 20 - port = 4739 - - # Old API - always gives us pool index 0. - self.vapi.set_ipfix_exporter( - collector_address=self.pg1.remote_ip4, - src_address=self.pg0.local_ip4, - collector_port=4739, - path_mtu=mtu, - template_interval=interval, - ) - - exporters = self.vapi.ipfix_exporter_dump() - exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4) - self.verify_exporter_detail( - exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4) - ) - - exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get)) - exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4) - self.verify_exporter_detail( - exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4) - ) - - # create a 2nd exporter - self.vapi.ipfix_exporter_create_delete( - collector_address=self.pg2.remote_ip4, - src_address=self.pg0.local_ip4, - collector_port=4739, - path_mtu=mtu, - template_interval=interval, - is_create=True, - ) - - exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get)) - self.assertTrue(len(exporters) == 2) - exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4) - self.verify_exporter_detail( - exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4) - ) - exp = self.find_exp_by_collector_addr(exporters, self.pg2.remote_ip4) - self.verify_exporter_detail( - exp, IPv4Address(self.pg2.remote_ip4), IPv4Address(self.pg0.local_ip4) - ) - - # Create a 3rd exporter - self.vapi.ipfix_exporter_create_delete( - collector_address=self.pg3.remote_ip4, - src_address=self.pg0.local_ip4, - collector_port=4739, - path_mtu=mtu, - template_interval=interval, - is_create=True, - ) - - exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get)) - self.assertTrue(len(exporters) == 3) - exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4) - self.verify_exporter_detail( - exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4) - ) - exp = self.find_exp_by_collector_addr(exporters, self.pg2.remote_ip4) - self.verify_exporter_detail( - exp, IPv4Address(self.pg2.remote_ip4), IPv4Address(self.pg0.local_ip4) - ) - exp = self.find_exp_by_collector_addr(exporters, self.pg3.remote_ip4) - self.verify_exporter_detail( - exp, IPv4Address(self.pg3.remote_ip4), IPv4Address(self.pg0.local_ip4) - ) - - # Modify the 2nd exporter. - self.vapi.ipfix_exporter_create_delete( - collector_address=self.pg2.remote_ip4, - src_address=self.pg0.local_ip4, - collector_port=4739, - path_mtu=mtu + 1, - template_interval=interval + 1, - is_create=True, - ) - - exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get)) - self.assertTrue(len(exporters) == 3) - exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4) - self.verify_exporter_detail( - exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4) - ) - exp = self.find_exp_by_collector_addr(exporters, self.pg2.remote_ip4) - self.verify_exporter_detail( - exp, - IPv4Address(self.pg2.remote_ip4), - IPv4Address(self.pg0.local_ip4), - mtu=mtu + 1, - interval=interval + 1, - ) - exp = self.find_exp_by_collector_addr(exporters, self.pg3.remote_ip4) - self.verify_exporter_detail( - exp, IPv4Address(self.pg3.remote_ip4), IPv4Address(self.pg0.local_ip4) - ) - - # Delete 2nd exporter - self.vapi.ipfix_exporter_create_delete( - collector_address=self.pg2.remote_ip4, - src_address=self.pg0.local_ip4, - collector_port=4739, - path_mtu=mtu, - template_interval=interval, - is_create=False, - ) - - exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get)) - self.assertTrue(len(exporters) == 2) - exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4) - self.verify_exporter_detail( - exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4) - ) - exp = self.find_exp_by_collector_addr(exporters, self.pg3.remote_ip4) - self.verify_exporter_detail( - exp, IPv4Address(self.pg3.remote_ip4), IPv4Address(self.pg0.local_ip4) - ) - - # Delete final exporter (exporter in slot 0 can not be deleted) - self.vapi.ipfix_exporter_create_delete( - collector_address=self.pg3.remote_ip4, - src_address=self.pg0.local_ip4, - collector_port=4739, - path_mtu=mtu, - template_interval=interval, - is_create=False, - ) - - exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get)) - self.assertTrue(len(exporters) == 1) - exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4) - self.verify_exporter_detail( - exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4) - ) |