summaryrefslogtreecommitdiffstats
path: root/test/test_flowprobe.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_flowprobe.py')
-rw-r--r--test/test_flowprobe.py1082
1 files changed, 0 insertions, 1082 deletions
diff --git a/test/test_flowprobe.py b/test/test_flowprobe.py
deleted file mode 100644
index 9ffe84b8c2c..00000000000
--- a/test/test_flowprobe.py
+++ /dev/null
@@ -1,1082 +0,0 @@
-#!/usr/bin/env python
-from __future__ import print_function
-import binascii
-import random
-import socket
-import unittest
-import time
-import re
-
-from scapy.packet import Raw
-from scapy.layers.l2 import Ether
-from scapy.layers.inet import IP, TCP, UDP
-from scapy.layers.inet6 import IPv6
-
-from framework import VppTestCase, VppTestRunner, running_extended_tests
-from vpp_object import VppObject
-from vpp_pg_interface import CaptureTimeoutError
-from util import ppp
-from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
-from vpp_ip_route import VppIpRoute, VppRoutePath
-
-
-class VppCFLOW(VppObject):
- """CFLOW object for IPFIX exporter and Flowprobe feature"""
-
- def __init__(self, test, intf='pg2', active=0, passive=0, timeout=100,
- mtu=1024, datapath='l2', layer='l2 l3 l4'):
- self._test = test
- self._intf = intf
- self._active = active
- if passive == 0 or passive < active:
- self._passive = active+1
- else:
- self._passive = passive
- self._datapath = datapath # l2 ip4 ip6
- self._collect = layer # l2 l3 l4
- self._timeout = timeout
- self._mtu = mtu
- self._configured = False
-
- def add_vpp_config(self):
- self.enable_exporter()
- self._test.vapi.flowprobe_params(
- record_l2=1 if 'l2' in self._collect.lower() else 0,
- record_l3=1 if 'l3' in self._collect.lower() else 0,
- record_l4=1 if 'l4' in self._collect.lower() else 0,
- active_timer=self._active, passive_timer=self._passive)
- self.enable_flowprobe_feature()
- self._test.vapi.cli("ipfix flush")
- self._configured = True
-
- def remove_vpp_config(self):
- self.disable_exporter()
- self.disable_flowprobe_feature()
- self._test.vapi.cli("ipfix flush")
- self._configured = False
-
- def enable_exporter(self):
- self._test.vapi.set_ipfix_exporter(
- collector_address=self._test.pg0.remote_ip4n,
- src_address=self._test.pg0.local_ip4n,
- path_mtu=self._mtu,
- template_interval=self._timeout)
-
- def enable_flowprobe_feature(self):
- self._test.vapi.ppcli("flowprobe feature add-del %s %s" %
- (self._intf, self._datapath))
-
- def disable_exporter(self):
- self._test.vapi.cli("set ipfix exporter collector 0.0.0.0")
-
- def disable_flowprobe_feature(self):
- self._test.vapi.cli("flowprobe feature add-del %s %s disable" %
- (self._intf, self._datapath))
-
- def object_id(self):
- return "ipfix-collector-%s-%s" % (self._src, self.dst)
-
- def query_vpp_config(self):
- return self._configured
-
- def verify_templates(self, decoder=None, timeout=1, count=3):
- templates = []
- p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout)
- self._test.assertTrue(p.haslayer(IPFIX))
- if decoder is not None and p.haslayer(Template):
- templates.append(p[Template].templateID)
- decoder.add_template(p.getlayer(Template))
- if count > 1:
- p = self._test.wait_for_cflow_packet(self._test.collector, 2)
- self._test.assertTrue(p.haslayer(IPFIX))
- if decoder is not None and p.haslayer(Template):
- templates.append(p[Template].templateID)
- decoder.add_template(p.getlayer(Template))
- if count > 2:
- p = self._test.wait_for_cflow_packet(self._test.collector, 2)
- self._test.assertTrue(p.haslayer(IPFIX))
- if decoder is not None and p.haslayer(Template):
- templates.append(p[Template].templateID)
- decoder.add_template(p.getlayer(Template))
- return templates
-
-
-class MethodHolder(VppTestCase):
- """ Flow-per-packet plugin: test L2, IP4, IP6 reporting """
-
- # Test variables
- debug_print = False
- max_number_of_packets = 10
- pkts = []
-
- @classmethod
- def setUpClass(cls):
- """
- Perform standard class setup (defined by class method setUpClass in
- class VppTestCase) before running the test case, set test case related
- variables and configure VPP.
- """
- super(MethodHolder, cls).setUpClass()
- try:
- # Create pg interfaces
- cls.create_pg_interfaces(range(9))
-
- # Packet sizes
- cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
-
- # Create BD with MAC learning and unknown unicast flooding disabled
- # and put interfaces to this BD
- cls.vapi.bridge_domain_add_del(bd_id=1, uu_flood=1, learn=1)
- cls.vapi.sw_interface_set_l2_bridge(
- rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1)
- cls.vapi.sw_interface_set_l2_bridge(
- rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1)
-
- # Set up all interfaces
- for i in cls.pg_interfaces:
- i.admin_up()
-
- cls.pg0.config_ip4()
- cls.pg0.configure_ipv4_neighbors()
- cls.collector = cls.pg0
-
- cls.pg1.config_ip4()
- cls.pg1.resolve_arp()
- cls.pg2.config_ip4()
- cls.pg2.resolve_arp()
- cls.pg3.config_ip4()
- cls.pg3.resolve_arp()
- cls.pg4.config_ip4()
- cls.pg4.resolve_arp()
- cls.pg7.config_ip4()
- cls.pg8.config_ip4()
- cls.pg8.configure_ipv4_neighbors()
-
- cls.pg5.config_ip6()
- cls.pg5.resolve_ndp()
- cls.pg5.disable_ipv6_ra()
- cls.pg6.config_ip6()
- cls.pg6.resolve_ndp()
- cls.pg6.disable_ipv6_ra()
- except Exception:
- super(MethodHolder, cls).tearDownClass()
- raise
-
- @classmethod
- def tearDownClass(cls):
- super(MethodHolder, cls).tearDownClass()
-
- def create_stream(self, src_if=None, dst_if=None, packets=None,
- size=None, ip_ver='v4'):
- """Create a packet stream to tickle the plugin
-
- :param VppInterface src_if: Source interface for packet stream
- :param VppInterface src_if: Dst interface for packet stream
- """
- if src_if is None:
- src_if = self.pg1
- if dst_if is None:
- dst_if = self.pg2
- self.pkts = []
- if packets is None:
- packets = random.randint(1, self.max_number_of_packets)
- pkt_size = size
- for p in range(0, packets):
- if size is None:
- pkt_size = random.choice(self.pg_if_packet_sizes)
- info = self.create_packet_info(src_if, dst_if)
- payload = self.info_to_payload(info)
- p = Ether(src=src_if.remote_mac, dst=src_if.local_mac)
- if ip_ver == 'v4':
- p /= IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
- else:
- p /= IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
- p /= UDP(sport=1234, dport=4321)
- p /= Raw(payload)
- info.data = p.copy()
- self.extend_packet(p, pkt_size)
- self.pkts.append(p)
-
- def verify_cflow_data(self, decoder, capture, cflow):
- octets = 0
- packets = 0
- for p in capture:
- octets += p[IP].len
- packets += 1
- if cflow.haslayer(Data):
- data = decoder.decode_data_set(cflow.getlayer(Set))
- for record in data:
- self.assertEqual(int(binascii.hexlify(record[1]), 16), octets)
- self.assertEqual(int(binascii.hexlify(record[2]), 16), packets)
-
- def send_packets(self, src_if=None, dst_if=None):
- if src_if is None:
- src_if = self.pg1
- if dst_if is None:
- dst_if = self.pg2
- self.pg_enable_capture([dst_if])
- src_if.add_stream(self.pkts)
- self.pg_start()
- return dst_if.get_capture(len(self.pkts))
-
- def verify_cflow_data_detail(self, decoder, capture, cflow,
- data_set={1: 'octets', 2: 'packets'},
- ip_ver='v4'):
- if self.debug_print:
- print(capture[0].show())
- if cflow.haslayer(Data):
- data = decoder.decode_data_set(cflow.getlayer(Set))
- if self.debug_print:
- print(data)
- if ip_ver == 'v4':
- ip_layer = capture[0][IP]
- else:
- ip_layer = capture[0][IPv6]
- if data_set is not None:
- for record in data:
- # skip flow if ingress/egress interface is 0
- if int(binascii.hexlify(record[10]), 16) == 0:
- continue
- if int(binascii.hexlify(record[14]), 16) == 0:
- continue
-
- for field in data_set:
- if field not in record.keys():
- continue
- value = data_set[field]
- if value == 'octets':
- value = ip_layer.len
- if ip_ver == 'v6':
- value += 40 # ??? is this correct
- elif value == 'packets':
- value = 1
- elif value == 'src_ip':
- if ip_ver == 'v4':
- ip = socket.inet_pton(socket.AF_INET,
- ip_layer.src)
- else:
- ip = socket.inet_pton(socket.AF_INET6,
- ip_layer.src)
- value = int(binascii.hexlify(ip), 16)
- elif value == 'dst_ip':
- if ip_ver == 'v4':
- ip = socket.inet_pton(socket.AF_INET,
- ip_layer.dst)
- else:
- ip = socket.inet_pton(socket.AF_INET6,
- ip_layer.dst)
- value = int(binascii.hexlify(ip), 16)
- elif value == 'sport':
- value = int(capture[0][UDP].sport)
- elif value == 'dport':
- value = int(capture[0][UDP].dport)
- self.assertEqual(int(binascii.hexlify(
- record[field]), 16),
- value)
-
- def verify_cflow_data_notimer(self, decoder, capture, cflows):
- idx = 0
- for cflow in cflows:
- if cflow.haslayer(Data):
- data = decoder.decode_data_set(cflow.getlayer(Set))
- else:
- raise Exception("No CFLOW data")
-
- for rec in data:
- p = capture[idx]
- idx += 1
- self.assertEqual(p[IP].len, int(
- binascii.hexlify(rec[1]), 16))
- self.assertEqual(1, int(
- binascii.hexlify(rec[2]), 16))
- self.assertEqual(len(capture), idx)
-
- def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1,
- expected=True):
- """ wait for CFLOW packet and verify its correctness
-
- :param timeout: how long to wait
-
- :returns: tuple (packet, time spent waiting for packet)
- """
- self.logger.info("IPFIX: Waiting for CFLOW packet")
- deadline = time.time() + timeout
- counter = 0
- # self.logger.debug(self.vapi.ppcli("show flow table"))
- while True:
- counter += 1
- # sanity check
- self.assert_in_range(counter, 0, 100, "number of packets ignored")
- time_left = deadline - time.time()
- try:
- if time_left < 0 and expected:
- # self.logger.debug(self.vapi.ppcli("show flow table"))
- raise CaptureTimeoutError(
- "Packet did not arrive within timeout")
- p = collector_intf.wait_for_packet(timeout=time_left)
- except CaptureTimeoutError:
- if expected:
- # self.logger.debug(self.vapi.ppcli("show flow table"))
- raise CaptureTimeoutError(
- "Packet did not arrive within timeout")
- else:
- return
- if not expected:
- raise CaptureTimeoutError("Packet arrived even not expected")
- self.assertEqual(p[Set].setID, set_id)
- # self.logger.debug(self.vapi.ppcli("show flow table"))
- self.logger.debug(ppp("IPFIX: Got packet:", p))
- break
- return p
-
-
-class Flowprobe(MethodHolder):
- """Template verification, timer tests"""
-
- @classmethod
- def setUpClass(cls):
- super(Flowprobe, cls).setUpClass()
-
- @classmethod
- def tearDownClass(cls):
- super(Flowprobe, cls).tearDownClass()
-
- def test_0001(self):
- """ timer less than template timeout"""
- self.logger.info("FFP_TEST_START_0001")
- self.pg_enable_capture(self.pg_interfaces)
- self.pkts = []
-
- ipfix = VppCFLOW(test=self, active=2)
- ipfix.add_vpp_config()
-
- ipfix_decoder = IPFIXDecoder()
- # template packet should arrive immediately
- templates = ipfix.verify_templates(ipfix_decoder)
-
- self.create_stream(packets=1)
- self.send_packets()
- capture = self.pg2.get_capture(1)
-
- # make sure the one packet we expect actually showed up
- cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
- self.verify_cflow_data(ipfix_decoder, capture, cflow)
-
- ipfix.remove_vpp_config()
- self.logger.info("FFP_TEST_FINISH_0001")
-
- def test_0002(self):
- """ timer greater than template timeout"""
- self.logger.info("FFP_TEST_START_0002")
- self.pg_enable_capture(self.pg_interfaces)
- self.pkts = []
-
- ipfix = VppCFLOW(test=self, timeout=3, active=4)
- ipfix.add_vpp_config()
-
- ipfix_decoder = IPFIXDecoder()
- # template packet should arrive immediately
- ipfix.verify_templates()
-
- self.create_stream(packets=2)
- self.send_packets()
- capture = self.pg2.get_capture(2)
-
- # next set of template packet should arrive after 20 seconds
- # template packet should arrive within 20 s
- templates = ipfix.verify_templates(ipfix_decoder, timeout=5)
-
- # make sure the one packet we expect actually showed up
- cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
- self.verify_cflow_data(ipfix_decoder, capture, cflow)
-
- ipfix.remove_vpp_config()
- self.logger.info("FFP_TEST_FINISH_0002")
-
- def test_cflow_packet(self):
- """verify cflow packet fields"""
- self.logger.info("FFP_TEST_START_0000")
- self.pg_enable_capture(self.pg_interfaces)
- self.pkts = []
-
- ipfix = VppCFLOW(test=self, intf='pg8', datapath="ip4",
- layer='l2 l3 l4', active=2)
- ipfix.add_vpp_config()
-
- route_9001 = VppIpRoute(self, "9.0.0.0", 24,
- [VppRoutePath(self.pg8._remote_hosts[0].ip4,
- self.pg8.sw_if_index)])
- route_9001.add_vpp_config()
-
- ipfix_decoder = IPFIXDecoder()
- templates = ipfix.verify_templates(ipfix_decoder, count=1)
-
- self.pkts = [(Ether(dst=self.pg7.local_mac,
- src=self.pg7.remote_mac) /
- IP(src=self.pg7.remote_ip4, dst="9.0.0.100") /
- TCP(sport=1234, dport=4321, flags=80) /
- Raw('\xa5' * 100))]
-
- nowUTC = int(time.time())
- nowUNIX = nowUTC+2208988800
- self.send_packets(src_if=self.pg7, dst_if=self.pg8)
-
- cflow = self.wait_for_cflow_packet(self.collector, templates[0], 10)
- self.collector.get_capture(2)
-
- if cflow[0].haslayer(IPFIX):
- self.assertEqual(cflow[IPFIX].version, 10)
- self.assertEqual(cflow[IPFIX].observationDomainID, 1)
- self.assertEqual(cflow[IPFIX].sequenceNumber, 0)
- self.assertAlmostEqual(cflow[IPFIX].exportTime, nowUTC, delta=5)
- if cflow.haslayer(Data):
- record = ipfix_decoder.decode_data_set(cflow[0].getlayer(Set))[0]
- # ingress interface
- self.assertEqual(int(binascii.hexlify(record[10]), 16), 8)
- # egress interface
- self.assertEqual(int(binascii.hexlify(record[14]), 16), 9)
- # packets
- self.assertEqual(int(binascii.hexlify(record[2]), 16), 1)
- # src mac
- self.assertEqual(':'.join(re.findall('..', record[56].encode(
- 'hex'))), self.pg8.local_mac)
- # dst mac
- self.assertEqual(':'.join(re.findall('..', record[80].encode(
- 'hex'))), self.pg8.remote_mac)
- flowTimestamp = int(binascii.hexlify(record[156]), 16) >> 32
- # flow start timestamp
- self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
- flowTimestamp = int(binascii.hexlify(record[157]), 16) >> 32
- # flow end timestamp
- self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
- # ethernet type
- self.assertEqual(int(binascii.hexlify(record[256]), 16), 8)
- # src ip
- self.assertEqual('.'.join(re.findall('..', record[8].encode(
- 'hex'))),
- '.'.join('{:02x}'.format(int(n)) for n in
- self.pg7.remote_ip4.split('.')))
- # dst ip
- self.assertEqual('.'.join(re.findall('..', record[12].encode(
- 'hex'))),
- '.'.join('{:02x}'.format(int(n)) for n in
- "9.0.0.100".split('.')))
- # protocol (TCP)
- self.assertEqual(int(binascii.hexlify(record[4]), 16), 6)
- # src port
- self.assertEqual(int(binascii.hexlify(record[7]), 16), 1234)
- # dst port
- self.assertEqual(int(binascii.hexlify(record[11]), 16), 4321)
- # tcp flags
- self.assertEqual(int(binascii.hexlify(record[6]), 16), 80)
-
- ipfix.remove_vpp_config()
- self.logger.info("FFP_TEST_FINISH_0000")
-
-
-class Datapath(MethodHolder):
- """collect information on Ethernet, IP4 and IP6 datapath (no timers)"""
-
- @classmethod
- def setUpClass(cls):
- super(Datapath, cls).setUpClass()
-
- @classmethod
- def tearDownClass(cls):
- super(Datapath, cls).tearDownClass()
-
- def test_templatesL2(self):
- """ verify template on L2 datapath"""
- self.logger.info("FFP_TEST_START_0000")
- self.pg_enable_capture(self.pg_interfaces)
-
- ipfix = VppCFLOW(test=self, layer='l2')
- ipfix.add_vpp_config()
-
- # template packet should arrive immediately
- self.vapi.ipfix_flush()
- ipfix.verify_templates(timeout=3, count=1)
- self.collector.get_capture(1)
-
- ipfix.remove_vpp_config()
- self.logger.info("FFP_TEST_FINISH_0000")
-
- def test_L2onL2(self):
- """ L2 data on L2 datapath"""
- self.logger.info("FFP_TEST_START_0001")
- self.pg_enable_capture(self.pg_interfaces)
- self.pkts = []
-
- ipfix = VppCFLOW(test=self, layer='l2')
- ipfix.add_vpp_config()
-
- ipfix_decoder = IPFIXDecoder()
- # template packet should arrive immediately
- templates = ipfix.verify_templates(ipfix_decoder, count=1)
-
- self.create_stream(packets=1)
- capture = self.send_packets()
-
- # make sure the one packet we expect actually showed up
- self.vapi.ipfix_flush()
- cflow = self.wait_for_cflow_packet(self.collector, templates[0])
- self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
- {2: 'packets', 256: 8})
- self.collector.get_capture(2)
-
- ipfix.remove_vpp_config()
- self.logger.info("FFP_TEST_FINISH_0001")
-
- def test_L3onL2(self):
- """ L3 data on L2 datapath"""
- self.logger.info("FFP_TEST_START_0002")
- self.pg_enable_capture(self.pg_interfaces)
- self.pkts = []
-
- ipfix = VppCFLOW(test=self, layer='l3')
- ipfix.add_vpp_config()
-
- ipfix_decoder = IPFIXDecoder()
- # template packet should arrive immediately
- templates = ipfix.verify_templates(ipfix_decoder, count=2)
-
- self.create_stream(packets=1)
- capture = self.send_packets()
-
- # make sure the one packet we expect actually showed up
- self.vapi.ipfix_flush()
- cflow = self.wait_for_cflow_packet(self.collector, templates[0])
- self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
- {2: 'packets', 4: 17,
- 8: 'src_ip', 12: 'dst_ip'})
-
- self.collector.get_capture(3)
-
- ipfix.remove_vpp_config()
- self.logger.info("FFP_TEST_FINISH_0002")
-
- def test_L4onL2(self):
- """ L4 data on L2 datapath"""
- self.logger.info("FFP_TEST_START_0003")
- self.pg_enable_capture(self.pg_interfaces)
- self.pkts = []
-
- ipfix = VppCFLOW(test=self, layer='l4')
- ipfix.add_vpp_config()
-
- ipfix_decoder = IPFIXDecoder()
- # template packet should arrive immediately
- templates = ipfix.verify_templates(ipfix_decoder, count=2)
-
- self.create_stream(packets=1)
- capture = self.send_packets()
-
- # make sure the one packet we expect actually showed up
- self.vapi.ipfix_flush()
- cflow = self.wait_for_cflow_packet(self.collector, templates[0])
- self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
- {2: 'packets', 7: 'sport', 11: 'dport'})
-
- self.collector.get_capture(3)
-
- ipfix.remove_vpp_config()
- self.logger.info("FFP_TEST_FINISH_0003")
-
- def test_templatesIp4(self):
- """ verify templates on IP4 datapath"""
- self.logger.info("FFP_TEST_START_0000")
-
- self.pg_enable_capture(self.pg_interfaces)
-
- ipfix = VppCFLOW(test=self, datapath='ip4')
- ipfix.add_vpp_config()
-
- # template packet should arrive immediately
- self.vapi.ipfix_flush()
- ipfix.verify_templates(timeout=3, count=1)
- self.collector.get_capture(1)
-
- ipfix.remove_vpp_config()
-
- self.logger.info("FFP_TEST_FINISH_0000")
-
- def test_L2onIP4(self):
- """ L2 data on IP4 datapath"""
- self.logger.info("FFP_TEST_START_0001")
- self.pg_enable_capture(self.pg_interfaces)
- self.pkts = []
-
- ipfix = VppCFLOW(test=self, intf='pg4', layer='l2', datapath='ip4')
- ipfix.add_vpp_config()
-
- ipfix_decoder = IPFIXDecoder()
- # template packet should arrive immediately
- templates = ipfix.verify_templates(ipfix_decoder, count=1)
-
- self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
- capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
-
- # make sure the one packet we expect actually showed up
- self.vapi.ipfix_flush()
- cflow = self.wait_for_cflow_packet(self.collector, templates[0])
- self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
- {2: 'packets', 256: 8})
-
- # expected two templates and one cflow packet
- self.collector.get_capture(2)
-
- ipfix.remove_vpp_config()
- self.logger.info("FFP_TEST_FINISH_0001")
-
- def test_L3onIP4(self):
- """ L3 data on IP4 datapath"""
- self.logger.info("FFP_TEST_START_0002")
- self.pg_enable_capture(self.pg_interfaces)
- self.pkts = []
-
- ipfix = VppCFLOW(test=self, intf='pg4', layer='l3', datapath='ip4')
- ipfix.add_vpp_config()
-
- ipfix_decoder = IPFIXDecoder()
- # template packet should arrive immediately
- templates = ipfix.verify_templates(ipfix_decoder, count=1)
-
- self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
- capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
-
- # make sure the one packet we expect actually showed up
- self.vapi.ipfix_flush()
- cflow = self.wait_for_cflow_packet(self.collector, templates[0])
- self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
- {1: 'octets', 2: 'packets',
- 8: 'src_ip', 12: 'dst_ip'})
-
- # expected two templates and one cflow packet
- self.collector.get_capture(2)
-
- ipfix.remove_vpp_config()
- self.logger.info("FFP_TEST_FINISH_0002")
-
- def test_L4onIP4(self):
- """ L4 data on IP4 datapath"""
- self.logger.info("FFP_TEST_START_0003")
- self.pg_enable_capture(self.pg_interfaces)
- self.pkts = []
-
- ipfix = VppCFLOW(test=self, intf='pg4', layer='l4', datapath='ip4')
- ipfix.add_vpp_config()
-
- ipfix_decoder = IPFIXDecoder()
- # template packet should arrive immediately
- templates = ipfix.verify_templates(ipfix_decoder, count=1)
-
- self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
- capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
-
- # make sure the one packet we expect actually showed up
- self.vapi.ipfix_flush()
- cflow = self.wait_for_cflow_packet(self.collector, templates[0])
- self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
- {2: 'packets', 7: 'sport', 11: 'dport'})
-
- # expected two templates and one cflow packet
- self.collector.get_capture(2)
-
- ipfix.remove_vpp_config()
- self.logger.info("FFP_TEST_FINISH_0003")
-
- def test_templatesIP6(self):
- """ verify templates on IP6 datapath"""
- self.logger.info("FFP_TEST_START_0000")
- self.pg_enable_capture(self.pg_interfaces)
-
- ipfix = VppCFLOW(test=self, datapath='ip6')
- ipfix.add_vpp_config()
-
- # template packet should arrive immediately
- ipfix.verify_templates(count=1)
- self.collector.get_capture(1)
-
- ipfix.remove_vpp_config()
-
- self.logger.info("FFP_TEST_FINISH_0000")
-
- def test_L2onIP6(self):
- """ L2 data on IP6 datapath"""
- self.logger.info("FFP_TEST_START_0001")
- self.pg_enable_capture(self.pg_interfaces)
- self.pkts = []
-
- ipfix = VppCFLOW(test=self, intf='pg6', layer='l2', datapath='ip6')
- ipfix.add_vpp_config()
-
- ipfix_decoder = IPFIXDecoder()
- # template packet should arrive immediately
- templates = ipfix.verify_templates(ipfix_decoder, count=1)
-
- self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1,
- ip_ver='IPv6')
- capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
-
- # make sure the one packet we expect actually showed up
- self.vapi.ipfix_flush()
- cflow = self.wait_for_cflow_packet(self.collector, templates[0])
- self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
- {2: 'packets', 256: 56710},
- ip_ver='v6')
-
- # expected two templates and one cflow packet
- self.collector.get_capture(2)
-
- ipfix.remove_vpp_config()
- self.logger.info("FFP_TEST_FINISH_0001")
-
- def test_L3onIP6(self):
- """ L3 data on IP6 datapath"""
- self.logger.info("FFP_TEST_START_0002")
- self.pg_enable_capture(self.pg_interfaces)
- self.pkts = []
-
- ipfix = VppCFLOW(test=self, intf='pg6', layer='l3', datapath='ip6')
- ipfix.add_vpp_config()
-
- ipfix_decoder = IPFIXDecoder()
- # template packet should arrive immediately
- templates = ipfix.verify_templates(ipfix_decoder, count=1)
-
- self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1,
- ip_ver='IPv6')
- capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
-
- # make sure the one packet we expect actually showed up
- self.vapi.ipfix_flush()
- cflow = self.wait_for_cflow_packet(self.collector, templates[0])
- self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
- {2: 'packets',
- 27: 'src_ip', 28: 'dst_ip'},
- ip_ver='v6')
-
- # expected two templates and one cflow packet
- self.collector.get_capture(2)
-
- ipfix.remove_vpp_config()
- self.logger.info("FFP_TEST_FINISH_0002")
-
- def test_L4onIP6(self):
- """ L4 data on IP6 datapath"""
- self.logger.info("FFP_TEST_START_0003")
- self.pg_enable_capture(self.pg_interfaces)
- self.pkts = []
-
- ipfix = VppCFLOW(test=self, intf='pg6', layer='l4', datapath='ip6')
- ipfix.add_vpp_config()
-
- ipfix_decoder = IPFIXDecoder()
- # template packet should arrive immediately
- templates = ipfix.verify_templates(ipfix_decoder, count=1)
-
- self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1,
- ip_ver='IPv6')
- capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
-
- # make sure the one packet we expect actually showed up
- self.vapi.ipfix_flush()
- cflow = self.wait_for_cflow_packet(self.collector, templates[0])
- self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
- {2: 'packets', 7: 'sport', 11: 'dport'},
- ip_ver='v6')
-
- # expected two templates and one cflow packet
- self.collector.get_capture(2)
-
- ipfix.remove_vpp_config()
- self.logger.info("FFP_TEST_FINISH_0003")
-
- def test_0001(self):
- """ no timers, one CFLOW packet, 9 Flows inside"""
- self.logger.info("FFP_TEST_START_0001")
- self.pg_enable_capture(self.pg_interfaces)
- self.pkts = []
-
- ipfix = VppCFLOW(test=self)
- ipfix.add_vpp_config()
-
- ipfix_decoder = IPFIXDecoder()
- # template packet should arrive immediately
- templates = ipfix.verify_templates(ipfix_decoder)
-
- self.create_stream(packets=9)
- capture = self.send_packets()
-
- # make sure the one packet we expect actually showed up
- self.vapi.ipfix_flush()
- cflow = self.wait_for_cflow_packet(self.collector, templates[1])
- self.verify_cflow_data_notimer(ipfix_decoder, capture, [cflow])
- self.collector.get_capture(4)
-
- ipfix.remove_vpp_config()
- self.logger.info("FFP_TEST_FINISH_0001")
-
- def test_0002(self):
- """ no timers, two CFLOW packets (mtu=256), 3 Flows in each"""
- self.logger.info("FFP_TEST_START_0002")
- self.pg_enable_capture(self.pg_interfaces)
- self.pkts = []
-
- ipfix = VppCFLOW(test=self, mtu=256)
- ipfix.add_vpp_config()
-
- ipfix_decoder = IPFIXDecoder()
- # template packet should arrive immediately
- self.vapi.ipfix_flush()
- templates = ipfix.verify_templates(ipfix_decoder)
-
- self.create_stream(packets=6)
- capture = self.send_packets()
-
- # make sure the one packet we expect actually showed up
- cflows = []
- self.vapi.ipfix_flush()
- cflows.append(self.wait_for_cflow_packet(self.collector,
- templates[1]))
- cflows.append(self.wait_for_cflow_packet(self.collector,
- templates[1]))
- self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows)
- self.collector.get_capture(5)
-
- ipfix.remove_vpp_config()
- self.logger.info("FFP_TEST_FINISH_0002")
-
-
-@unittest.skipUnless(running_extended_tests, "part of extended tests")
-class DisableIPFIX(MethodHolder):
- """Disable IPFIX"""
-
- @classmethod
- def setUpClass(cls):
- super(DisableIPFIX, cls).setUpClass()
-
- @classmethod
- def tearDownClass(cls):
- super(DisableIPFIX, cls).tearDownClass()
-
- def test_0001(self):
- """ disable IPFIX after first packets"""
- self.logger.info("FFP_TEST_START_0001")
- self.pg_enable_capture(self.pg_interfaces)
- self.pkts = []
-
- ipfix = VppCFLOW(test=self)
- ipfix.add_vpp_config()
-
- ipfix_decoder = IPFIXDecoder()
- # template packet should arrive immediately
- templates = ipfix.verify_templates(ipfix_decoder)
-
- self.create_stream()
- self.send_packets()
-
- # make sure the one packet we expect actually showed up
- self.vapi.ipfix_flush()
- self.wait_for_cflow_packet(self.collector, templates[1])
- self.collector.get_capture(4)
-
- # disable IPFIX
- ipfix.disable_exporter()
- self.pg_enable_capture([self.collector])
-
- self.send_packets()
-
- # make sure no one packet arrived in 1 minute
- self.vapi.ipfix_flush()
- self.wait_for_cflow_packet(self.collector, templates[1],
- expected=False)
- self.collector.get_capture(0)
-
- ipfix.remove_vpp_config()
- self.logger.info("FFP_TEST_FINISH_0001")
-
-
-@unittest.skipUnless(running_extended_tests, "part of extended tests")
-class ReenableIPFIX(MethodHolder):
- """Re-enable IPFIX"""
-
- @classmethod
- def setUpClass(cls):
- super(ReenableIPFIX, cls).setUpClass()
-
- @classmethod
- def tearDownClass(cls):
- super(ReenableIPFIX, cls).tearDownClass()
-
- def test_0011(self):
- """ disable IPFIX after first packets and re-enable after few packets
- """
- self.logger.info("FFP_TEST_START_0001")
- self.pg_enable_capture(self.pg_interfaces)
- self.pkts = []
-
- ipfix = VppCFLOW(test=self)
- ipfix.add_vpp_config()
-
- ipfix_decoder = IPFIXDecoder()
- # template packet should arrive immediately
- templates = ipfix.verify_templates(ipfix_decoder)
-
- self.create_stream(packets=5)
- self.send_packets()
-
- # make sure the one packet we expect actually showed up
- self.vapi.ipfix_flush()
- self.wait_for_cflow_packet(self.collector, templates[1])
- self.collector.get_capture(4)
-
- # disable IPFIX
- ipfix.disable_exporter()
- self.vapi.ipfix_flush()
- self.pg_enable_capture([self.collector])
-
- self.send_packets()
-
- # make sure no one packet arrived in active timer span
- self.vapi.ipfix_flush()
- self.wait_for_cflow_packet(self.collector, templates[1],
- expected=False)
- self.collector.get_capture(0)
- self.pg2.get_capture(5)
-
- # enable IPFIX
- ipfix.enable_exporter()
-
- capture = self.collector.get_capture(4)
- nr_templates = 0
- nr_data = 0
- for p in capture:
- self.assertTrue(p.haslayer(IPFIX))
- if p.haslayer(Template):
- nr_templates += 1
- self.assertTrue(nr_templates, 3)
- for p in capture:
- self.assertTrue(p.haslayer(IPFIX))
- if p.haslayer(Data):
- nr_data += 1
- self.assertTrue(nr_templates, 1)
-
- ipfix.remove_vpp_config()
- self.logger.info("FFP_TEST_FINISH_0001")
-
-
-@unittest.skipUnless(running_extended_tests, "part of extended tests")
-class DisableFP(MethodHolder):
- """Disable Flowprobe feature"""
-
- @classmethod
- def setUpClass(cls):
- super(DisableFP, cls).setUpClass()
-
- @classmethod
- def tearDownClass(cls):
- super(DisableFP, cls).tearDownClass()
-
- def test_0001(self):
- """ disable flowprobe feature after first packets"""
- self.logger.info("FFP_TEST_START_0001")
- self.pg_enable_capture(self.pg_interfaces)
- self.pkts = []
- ipfix = VppCFLOW(test=self)
- ipfix.add_vpp_config()
-
- ipfix_decoder = IPFIXDecoder()
- # template packet should arrive immediately
- templates = ipfix.verify_templates(ipfix_decoder)
-
- self.create_stream()
- self.send_packets()
-
- # make sure the one packet we expect actually showed up
- self.vapi.ipfix_flush()
- self.wait_for_cflow_packet(self.collector, templates[1])
- self.collector.get_capture(4)
-
- # disable IPFIX
- ipfix.disable_flowprobe_feature()
- self.pg_enable_capture([self.collector])
-
- self.send_packets()
-
- # make sure no one packet arrived in active timer span
- self.vapi.ipfix_flush()
- self.wait_for_cflow_packet(self.collector, templates[1],
- expected=False)
- self.collector.get_capture(0)
-
- ipfix.remove_vpp_config()
- self.logger.info("FFP_TEST_FINISH_0001")
-
-
-@unittest.skipUnless(running_extended_tests, "part of extended tests")
-class ReenableFP(MethodHolder):
- """Re-enable Flowprobe feature"""
-
- @classmethod
- def setUpClass(cls):
- super(ReenableFP, cls).setUpClass()
-
- @classmethod
- def tearDownClass(cls):
- super(ReenableFP, cls).tearDownClass()
-
- def test_0001(self):
- """ disable flowprobe feature after first packets and re-enable
- after few packets """
- self.logger.info("FFP_TEST_START_0001")
- self.pg_enable_capture(self.pg_interfaces)
- self.pkts = []
-
- ipfix = VppCFLOW(test=self)
- ipfix.add_vpp_config()
-
- ipfix_decoder = IPFIXDecoder()
- # template packet should arrive immediately
- self.vapi.ipfix_flush()
- templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
-
- self.create_stream()
- self.send_packets()
-
- # make sure the one packet we expect actually showed up
- self.vapi.ipfix_flush()
- self.wait_for_cflow_packet(self.collector, templates[1], 5)
- self.collector.get_capture(4)
-
- # disable FPP feature
- ipfix.disable_flowprobe_feature()
- self.pg_enable_capture([self.collector])
-
- self.send_packets()
-
- # make sure no one packet arrived in active timer span
- self.vapi.ipfix_flush()
- self.wait_for_cflow_packet(self.collector, templates[1], 5,
- expected=False)
- self.collector.get_capture(0)
-
- # enable FPP feature
- ipfix.enable_flowprobe_feature()
- self.vapi.ipfix_flush()
- templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
-
- self.send_packets()
-
- # make sure the next packets (templates and data) we expect actually
- # showed up
- self.vapi.ipfix_flush()
- self.wait_for_cflow_packet(self.collector, templates[1], 5)
- self.collector.get_capture(4)
-
- ipfix.remove_vpp_config()
- self.logger.info("FFP_TEST_FINISH_0001")
-
-
-if __name__ == '__main__':
- unittest.main(testRunner=VppTestRunner)