summaryrefslogtreecommitdiffstats
path: root/test/test_flowprobe.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_flowprobe.py')
-rw-r--r--test/test_flowprobe.py1094
1 files changed, 1094 insertions, 0 deletions
diff --git a/test/test_flowprobe.py b/test/test_flowprobe.py
new file mode 100644
index 00000000000..517729d8591
--- /dev/null
+++ b/test/test_flowprobe.py
@@ -0,0 +1,1094 @@
+#!/usr/bin/env python3
+from __future__ import print_function
+import binascii
+import random
+import socket
+import unittest
+import time
+import re
+
+from scapy.packet import Raw
+from scapy.layers.l2 import Ether
+from scapy.layers.inet import IP, TCP, UDP
+from scapy.layers.inet6 import IPv6
+
+from framework import tag_fixme_vpp_workers
+from framework import VppTestCase, VppTestRunner, running_extended_tests
+from framework import tag_run_solo
+from vpp_object import VppObject
+from vpp_pg_interface import CaptureTimeoutError
+from util import ppp
+from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
+from vpp_ip_route import VppIpRoute, VppRoutePath
+from vpp_papi.macaddress import mac_ntop
+from socket import inet_ntop
+from vpp_papi import VppEnum
+
+
+class VppCFLOW(VppObject):
+ """CFLOW object for IPFIX exporter and Flowprobe feature"""
+
+ def __init__(self, test, intf='pg2', active=0, passive=0, timeout=100,
+ mtu=1024, datapath='l2', layer='l2 l3 l4'):
+ self._test = test
+ self._intf = intf
+ self._active = active
+ if passive == 0 or passive < active:
+ self._passive = active+1
+ else:
+ self._passive = passive
+ self._datapath = datapath # l2 ip4 ip6
+ self._collect = layer # l2 l3 l4
+ self._timeout = timeout
+ self._mtu = mtu
+ self._configured = False
+
+ def add_vpp_config(self):
+ self.enable_exporter()
+ l2_flag = 0
+ l3_flag = 0
+ l4_flag = 0
+ if 'l2' in self._collect.lower():
+ l2_flag = (VppEnum.vl_api_flowprobe_record_flags_t.
+ FLOWPROBE_RECORD_FLAG_L2)
+ if 'l3' in self._collect.lower():
+ l3_flag = (VppEnum.vl_api_flowprobe_record_flags_t.
+ FLOWPROBE_RECORD_FLAG_L3)
+ if 'l4' in self._collect.lower():
+ l4_flag = (VppEnum.vl_api_flowprobe_record_flags_t.
+ FLOWPROBE_RECORD_FLAG_L4)
+ self._test.vapi.flowprobe_params(
+ record_flags=(l2_flag | l3_flag | l4_flag),
+ active_timer=self._active, passive_timer=self._passive)
+ self.enable_flowprobe_feature()
+ self._test.vapi.cli("ipfix flush")
+ self._configured = True
+
+ def remove_vpp_config(self):
+ self.disable_exporter()
+ self.disable_flowprobe_feature()
+ self._test.vapi.cli("ipfix flush")
+ self._configured = False
+
+ def enable_exporter(self):
+ self._test.vapi.set_ipfix_exporter(
+ collector_address=self._test.pg0.remote_ip4,
+ src_address=self._test.pg0.local_ip4,
+ path_mtu=self._mtu,
+ template_interval=self._timeout)
+
+ def enable_flowprobe_feature(self):
+ self._test.vapi.ppcli("flowprobe feature add-del %s %s" %
+ (self._intf, self._datapath))
+
+ def disable_exporter(self):
+ self._test.vapi.cli("set ipfix exporter collector 0.0.0.0")
+
+ def disable_flowprobe_feature(self):
+ self._test.vapi.cli("flowprobe feature add-del %s %s disable" %
+ (self._intf, self._datapath))
+
+ def object_id(self):
+ return "ipfix-collector-%s-%s" % (self._src, self.dst)
+
+ def query_vpp_config(self):
+ return self._configured
+
+ def verify_templates(self, decoder=None, timeout=1, count=3):
+ templates = []
+ p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout)
+ self._test.assertTrue(p.haslayer(IPFIX))
+ if decoder is not None and p.haslayer(Template):
+ templates.append(p[Template].templateID)
+ decoder.add_template(p.getlayer(Template))
+ if count > 1:
+ p = self._test.wait_for_cflow_packet(self._test.collector, 2)
+ self._test.assertTrue(p.haslayer(IPFIX))
+ if decoder is not None and p.haslayer(Template):
+ templates.append(p[Template].templateID)
+ decoder.add_template(p.getlayer(Template))
+ if count > 2:
+ p = self._test.wait_for_cflow_packet(self._test.collector, 2)
+ self._test.assertTrue(p.haslayer(IPFIX))
+ if decoder is not None and p.haslayer(Template):
+ templates.append(p[Template].templateID)
+ decoder.add_template(p.getlayer(Template))
+ return templates
+
+
+class MethodHolder(VppTestCase):
+ """ Flow-per-packet plugin: test L2, IP4, IP6 reporting """
+
+ # Test variables
+ debug_print = False
+ max_number_of_packets = 10
+ pkts = []
+
+ @classmethod
+ def setUpClass(cls):
+ """
+ Perform standard class setup (defined by class method setUpClass in
+ class VppTestCase) before running the test case, set test case related
+ variables and configure VPP.
+ """
+ super(MethodHolder, cls).setUpClass()
+ try:
+ # Create pg interfaces
+ cls.create_pg_interfaces(range(9))
+
+ # Packet sizes
+ cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
+
+ # Create BD with MAC learning and unknown unicast flooding disabled
+ # and put interfaces to this BD
+ cls.vapi.bridge_domain_add_del(bd_id=1, uu_flood=1, learn=1)
+ cls.vapi.sw_interface_set_l2_bridge(
+ rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1)
+ cls.vapi.sw_interface_set_l2_bridge(
+ rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1)
+
+ # Set up all interfaces
+ for i in cls.pg_interfaces:
+ i.admin_up()
+
+ cls.pg0.config_ip4()
+ cls.pg0.configure_ipv4_neighbors()
+ cls.collector = cls.pg0
+
+ cls.pg1.config_ip4()
+ cls.pg1.resolve_arp()
+ cls.pg2.config_ip4()
+ cls.pg2.resolve_arp()
+ cls.pg3.config_ip4()
+ cls.pg3.resolve_arp()
+ cls.pg4.config_ip4()
+ cls.pg4.resolve_arp()
+ cls.pg7.config_ip4()
+ cls.pg8.config_ip4()
+ cls.pg8.configure_ipv4_neighbors()
+
+ cls.pg5.config_ip6()
+ cls.pg5.resolve_ndp()
+ cls.pg5.disable_ipv6_ra()
+ cls.pg6.config_ip6()
+ cls.pg6.resolve_ndp()
+ cls.pg6.disable_ipv6_ra()
+ except Exception:
+ super(MethodHolder, cls).tearDownClass()
+ raise
+
+ @classmethod
+ def tearDownClass(cls):
+ super(MethodHolder, cls).tearDownClass()
+
+ def create_stream(self, src_if=None, dst_if=None, packets=None,
+ size=None, ip_ver='v4'):
+ """Create a packet stream to tickle the plugin
+
+ :param VppInterface src_if: Source interface for packet stream
+ :param VppInterface src_if: Dst interface for packet stream
+ """
+ if src_if is None:
+ src_if = self.pg1
+ if dst_if is None:
+ dst_if = self.pg2
+ self.pkts = []
+ if packets is None:
+ packets = random.randint(1, self.max_number_of_packets)
+ pkt_size = size
+ for p in range(0, packets):
+ if size is None:
+ pkt_size = random.choice(self.pg_if_packet_sizes)
+ info = self.create_packet_info(src_if, dst_if)
+ payload = self.info_to_payload(info)
+ p = Ether(src=src_if.remote_mac, dst=src_if.local_mac)
+ if ip_ver == 'v4':
+ p /= IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
+ else:
+ p /= IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
+ p /= UDP(sport=1234, dport=4321)
+ p /= Raw(payload)
+ info.data = p.copy()
+ self.extend_packet(p, pkt_size)
+ self.pkts.append(p)
+
+ def verify_cflow_data(self, decoder, capture, cflow):
+ octets = 0
+ packets = 0
+ for p in capture:
+ octets += p[IP].len
+ packets += 1
+ if cflow.haslayer(Data):
+ data = decoder.decode_data_set(cflow.getlayer(Set))
+ for record in data:
+ self.assertEqual(int(binascii.hexlify(record[1]), 16), octets)
+ self.assertEqual(int(binascii.hexlify(record[2]), 16), packets)
+
+ def send_packets(self, src_if=None, dst_if=None):
+ if src_if is None:
+ src_if = self.pg1
+ if dst_if is None:
+ dst_if = self.pg2
+ self.pg_enable_capture([dst_if])
+ src_if.add_stream(self.pkts)
+ self.pg_start()
+ return dst_if.get_capture(len(self.pkts))
+
+ def verify_cflow_data_detail(self, decoder, capture, cflow,
+ data_set={1: 'octets', 2: 'packets'},
+ ip_ver='v4'):
+ if self.debug_print:
+ print(capture[0].show())
+ if cflow.haslayer(Data):
+ data = decoder.decode_data_set(cflow.getlayer(Set))
+ if self.debug_print:
+ print(data)
+ if ip_ver == 'v4':
+ ip_layer = capture[0][IP]
+ else:
+ ip_layer = capture[0][IPv6]
+ if data_set is not None:
+ for record in data:
+ # skip flow if ingress/egress interface is 0
+ if int(binascii.hexlify(record[10]), 16) == 0:
+ continue
+ if int(binascii.hexlify(record[14]), 16) == 0:
+ continue
+
+ for field in data_set:
+ if field not in record.keys():
+ continue
+ value = data_set[field]
+ if value == 'octets':
+ value = ip_layer.len
+ if ip_ver == 'v6':
+ value += 40 # ??? is this correct
+ elif value == 'packets':
+ value = 1
+ elif value == 'src_ip':
+ if ip_ver == 'v4':
+ ip = socket.inet_pton(socket.AF_INET,
+ ip_layer.src)
+ else:
+ ip = socket.inet_pton(socket.AF_INET6,
+ ip_layer.src)
+ value = int(binascii.hexlify(ip), 16)
+ elif value == 'dst_ip':
+ if ip_ver == 'v4':
+ ip = socket.inet_pton(socket.AF_INET,
+ ip_layer.dst)
+ else:
+ ip = socket.inet_pton(socket.AF_INET6,
+ ip_layer.dst)
+ value = int(binascii.hexlify(ip), 16)
+ elif value == 'sport':
+ value = int(capture[0][UDP].sport)
+ elif value == 'dport':
+ value = int(capture[0][UDP].dport)
+ self.assertEqual(int(binascii.hexlify(
+ record[field]), 16),
+ value)
+
+ def verify_cflow_data_notimer(self, decoder, capture, cflows):
+ idx = 0
+ for cflow in cflows:
+ if cflow.haslayer(Data):
+ data = decoder.decode_data_set(cflow.getlayer(Set))
+ else:
+ raise Exception("No CFLOW data")
+
+ for rec in data:
+ p = capture[idx]
+ idx += 1
+ self.assertEqual(p[IP].len, int(
+ binascii.hexlify(rec[1]), 16))
+ self.assertEqual(1, int(
+ binascii.hexlify(rec[2]), 16))
+ self.assertEqual(len(capture), idx)
+
+ def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1,
+ expected=True):
+ """ wait for CFLOW packet and verify its correctness
+
+ :param timeout: how long to wait
+
+ :returns: tuple (packet, time spent waiting for packet)
+ """
+ self.logger.info("IPFIX: Waiting for CFLOW packet")
+ deadline = time.time() + timeout
+ counter = 0
+ # self.logger.debug(self.vapi.ppcli("show flow table"))
+ while True:
+ counter += 1
+ # sanity check
+ self.assert_in_range(counter, 0, 100, "number of packets ignored")
+ time_left = deadline - time.time()
+ try:
+ if time_left < 0 and expected:
+ # self.logger.debug(self.vapi.ppcli("show flow table"))
+ raise CaptureTimeoutError(
+ "Packet did not arrive within timeout")
+ p = collector_intf.wait_for_packet(timeout=time_left)
+ except CaptureTimeoutError:
+ if expected:
+ # self.logger.debug(self.vapi.ppcli("show flow table"))
+ raise CaptureTimeoutError(
+ "Packet did not arrive within timeout")
+ else:
+ return
+ if not expected:
+ raise CaptureTimeoutError("Packet arrived even not expected")
+ self.assertEqual(p[Set].setID, set_id)
+ # self.logger.debug(self.vapi.ppcli("show flow table"))
+ self.logger.debug(ppp("IPFIX: Got packet:", p))
+ break
+ return p
+
+
+@tag_run_solo
+@tag_fixme_vpp_workers
+class Flowprobe(MethodHolder):
+ """Template verification, timer tests"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(Flowprobe, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(Flowprobe, cls).tearDownClass()
+
+ def test_0001(self):
+ """ timer less than template timeout"""
+ self.logger.info("FFP_TEST_START_0001")
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pkts = []
+
+ ipfix = VppCFLOW(test=self, active=2)
+ ipfix.add_vpp_config()
+
+ ipfix_decoder = IPFIXDecoder()
+ # template packet should arrive immediately
+ templates = ipfix.verify_templates(ipfix_decoder)
+
+ self.create_stream(packets=1)
+ self.send_packets()
+ capture = self.pg2.get_capture(1)
+
+ # make sure the one packet we expect actually showed up
+ cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
+ self.verify_cflow_data(ipfix_decoder, capture, cflow)
+
+ ipfix.remove_vpp_config()
+ self.logger.info("FFP_TEST_FINISH_0001")
+
+ def test_0002(self):
+ """ timer greater than template timeout"""
+ self.logger.info("FFP_TEST_START_0002")
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pkts = []
+
+ ipfix = VppCFLOW(test=self, timeout=3, active=4)
+ ipfix.add_vpp_config()
+
+ ipfix_decoder = IPFIXDecoder()
+ # template packet should arrive immediately
+ ipfix.verify_templates()
+
+ self.create_stream(packets=2)
+ self.send_packets()
+ capture = self.pg2.get_capture(2)
+
+ # next set of template packet should arrive after 20 seconds
+ # template packet should arrive within 20 s
+ templates = ipfix.verify_templates(ipfix_decoder, timeout=5)
+
+ # make sure the one packet we expect actually showed up
+ cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
+ self.verify_cflow_data(ipfix_decoder, capture, cflow)
+
+ ipfix.remove_vpp_config()
+ self.logger.info("FFP_TEST_FINISH_0002")
+
+ def test_cflow_packet(self):
+ """verify cflow packet fields"""
+ self.logger.info("FFP_TEST_START_0000")
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pkts = []
+
+ ipfix = VppCFLOW(test=self, intf='pg8', datapath="ip4",
+ layer='l2 l3 l4', active=2)
+ ipfix.add_vpp_config()
+
+ route_9001 = VppIpRoute(self, "9.0.0.0", 24,
+ [VppRoutePath(self.pg8._remote_hosts[0].ip4,
+ self.pg8.sw_if_index)])
+ route_9001.add_vpp_config()
+
+ ipfix_decoder = IPFIXDecoder()
+ templates = ipfix.verify_templates(ipfix_decoder, count=1)
+
+ self.pkts = [(Ether(dst=self.pg7.local_mac,
+ src=self.pg7.remote_mac) /
+ IP(src=self.pg7.remote_ip4, dst="9.0.0.100") /
+ TCP(sport=1234, dport=4321, flags=80) /
+ Raw(b'\xa5' * 100))]
+
+ nowUTC = int(time.time())
+ nowUNIX = nowUTC+2208988800
+ self.send_packets(src_if=self.pg7, dst_if=self.pg8)
+
+ cflow = self.wait_for_cflow_packet(self.collector, templates[0], 10)
+ self.collector.get_capture(2)
+
+ if cflow[0].haslayer(IPFIX):
+ self.assertEqual(cflow[IPFIX].version, 10)
+ self.assertEqual(cflow[IPFIX].observationDomainID, 1)
+ self.assertEqual(cflow[IPFIX].sequenceNumber, 0)
+ self.assertAlmostEqual(cflow[IPFIX].exportTime, nowUTC, delta=5)
+ if cflow.haslayer(Data):
+ record = ipfix_decoder.decode_data_set(cflow[0].getlayer(Set))[0]
+ # ingress interface
+ self.assertEqual(int(binascii.hexlify(record[10]), 16), 8)
+ # egress interface
+ self.assertEqual(int(binascii.hexlify(record[14]), 16), 9)
+ # packets
+ self.assertEqual(int(binascii.hexlify(record[2]), 16), 1)
+ # src mac
+ self.assertEqual(mac_ntop(record[56]), self.pg8.local_mac)
+ # dst mac
+ self.assertEqual(mac_ntop(record[80]), self.pg8.remote_mac)
+ flowTimestamp = int(binascii.hexlify(record[156]), 16) >> 32
+ # flow start timestamp
+ self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
+ flowTimestamp = int(binascii.hexlify(record[157]), 16) >> 32
+ # flow end timestamp
+ self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
+ # ethernet type
+ self.assertEqual(int(binascii.hexlify(record[256]), 16), 8)
+ # src ip
+ self.assertEqual(inet_ntop(socket.AF_INET, record[8]),
+ self.pg7.remote_ip4)
+ # dst ip
+ self.assertEqual(inet_ntop(socket.AF_INET, record[12]),
+ "9.0.0.100")
+ # protocol (TCP)
+ self.assertEqual(int(binascii.hexlify(record[4]), 16), 6)
+ # src port
+ self.assertEqual(int(binascii.hexlify(record[7]), 16), 1234)
+ # dst port
+ self.assertEqual(int(binascii.hexlify(record[11]), 16), 4321)
+ # tcp flags
+ self.assertEqual(int(binascii.hexlify(record[6]), 16), 80)
+
+ ipfix.remove_vpp_config()
+ self.logger.info("FFP_TEST_FINISH_0000")
+
+
+@tag_fixme_vpp_workers
+class Datapath(MethodHolder):
+ """collect information on Ethernet, IP4 and IP6 datapath (no timers)"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(Datapath, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(Datapath, cls).tearDownClass()
+
+ def test_templatesL2(self):
+ """ verify template on L2 datapath"""
+ self.logger.info("FFP_TEST_START_0000")
+ self.pg_enable_capture(self.pg_interfaces)
+
+ ipfix = VppCFLOW(test=self, layer='l2')
+ ipfix.add_vpp_config()
+
+ # template packet should arrive immediately
+ self.vapi.ipfix_flush()
+ ipfix.verify_templates(timeout=3, count=1)
+ self.collector.get_capture(1)
+
+ ipfix.remove_vpp_config()
+ self.logger.info("FFP_TEST_FINISH_0000")
+
+ def test_L2onL2(self):
+ """ L2 data on L2 datapath"""
+ self.logger.info("FFP_TEST_START_0001")
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pkts = []
+
+ ipfix = VppCFLOW(test=self, layer='l2')
+ ipfix.add_vpp_config()
+
+ ipfix_decoder = IPFIXDecoder()
+ # template packet should arrive immediately
+ templates = ipfix.verify_templates(ipfix_decoder, count=1)
+
+ self.create_stream(packets=1)
+ capture = self.send_packets()
+
+ # make sure the one packet we expect actually showed up
+ self.vapi.ipfix_flush()
+ cflow = self.wait_for_cflow_packet(self.collector, templates[0])
+ self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
+ {2: 'packets', 256: 8})
+ self.collector.get_capture(2)
+
+ ipfix.remove_vpp_config()
+ self.logger.info("FFP_TEST_FINISH_0001")
+
+ def test_L3onL2(self):
+ """ L3 data on L2 datapath"""
+ self.logger.info("FFP_TEST_START_0002")
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pkts = []
+
+ ipfix = VppCFLOW(test=self, layer='l3')
+ ipfix.add_vpp_config()
+
+ ipfix_decoder = IPFIXDecoder()
+ # template packet should arrive immediately
+ templates = ipfix.verify_templates(ipfix_decoder, count=2)
+
+ self.create_stream(packets=1)
+ capture = self.send_packets()
+
+ # make sure the one packet we expect actually showed up
+ self.vapi.ipfix_flush()
+ cflow = self.wait_for_cflow_packet(self.collector, templates[0])
+ self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
+ {2: 'packets', 4: 17,
+ 8: 'src_ip', 12: 'dst_ip'})
+
+ self.collector.get_capture(3)
+
+ ipfix.remove_vpp_config()
+ self.logger.info("FFP_TEST_FINISH_0002")
+
+ def test_L4onL2(self):
+ """ L4 data on L2 datapath"""
+ self.logger.info("FFP_TEST_START_0003")
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pkts = []
+
+ ipfix = VppCFLOW(test=self, layer='l4')
+ ipfix.add_vpp_config()
+
+ ipfix_decoder = IPFIXDecoder()
+ # template packet should arrive immediately
+ templates = ipfix.verify_templates(ipfix_decoder, count=2)
+
+ self.create_stream(packets=1)
+ capture = self.send_packets()
+
+ # make sure the one packet we expect actually showed up
+ self.vapi.ipfix_flush()
+ cflow = self.wait_for_cflow_packet(self.collector, templates[0])
+ self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
+ {2: 'packets', 7: 'sport', 11: 'dport'})
+
+ self.collector.get_capture(3)
+
+ ipfix.remove_vpp_config()
+ self.logger.info("FFP_TEST_FINISH_0003")
+
+ def test_templatesIp4(self):
+ """ verify templates on IP4 datapath"""
+ self.logger.info("FFP_TEST_START_0000")
+
+ self.pg_enable_capture(self.pg_interfaces)
+
+ ipfix = VppCFLOW(test=self, datapath='ip4')
+ ipfix.add_vpp_config()
+
+ # template packet should arrive immediately
+ self.vapi.ipfix_flush()
+ ipfix.verify_templates(timeout=3, count=1)
+ self.collector.get_capture(1)
+
+ ipfix.remove_vpp_config()
+
+ self.logger.info("FFP_TEST_FINISH_0000")
+
+ def test_L2onIP4(self):
+ """ L2 data on IP4 datapath"""
+ self.logger.info("FFP_TEST_START_0001")
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pkts = []
+
+ ipfix = VppCFLOW(test=self, intf='pg4', layer='l2', datapath='ip4')
+ ipfix.add_vpp_config()
+
+ ipfix_decoder = IPFIXDecoder()
+ # template packet should arrive immediately
+ templates = ipfix.verify_templates(ipfix_decoder, count=1)
+
+ self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
+ capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
+
+ # make sure the one packet we expect actually showed up
+ self.vapi.ipfix_flush()
+ cflow = self.wait_for_cflow_packet(self.collector, templates[0])
+ self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
+ {2: 'packets', 256: 8})
+
+ # expected two templates and one cflow packet
+ self.collector.get_capture(2)
+
+ ipfix.remove_vpp_config()
+ self.logger.info("FFP_TEST_FINISH_0001")
+
+ def test_L3onIP4(self):
+ """ L3 data on IP4 datapath"""
+ self.logger.info("FFP_TEST_START_0002")
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pkts = []
+
+ ipfix = VppCFLOW(test=self, intf='pg4', layer='l3', datapath='ip4')
+ ipfix.add_vpp_config()
+
+ ipfix_decoder = IPFIXDecoder()
+ # template packet should arrive immediately
+ templates = ipfix.verify_templates(ipfix_decoder, count=1)
+
+ self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
+ capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
+
+ # make sure the one packet we expect actually showed up
+ self.vapi.ipfix_flush()
+ cflow = self.wait_for_cflow_packet(self.collector, templates[0])
+ self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
+ {1: 'octets', 2: 'packets',
+ 8: 'src_ip', 12: 'dst_ip'})
+
+ # expected two templates and one cflow packet
+ self.collector.get_capture(2)
+
+ ipfix.remove_vpp_config()
+ self.logger.info("FFP_TEST_FINISH_0002")
+
+ def test_L4onIP4(self):
+ """ L4 data on IP4 datapath"""
+ self.logger.info("FFP_TEST_START_0003")
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pkts = []
+
+ ipfix = VppCFLOW(test=self, intf='pg4', layer='l4', datapath='ip4')
+ ipfix.add_vpp_config()
+
+ ipfix_decoder = IPFIXDecoder()
+ # template packet should arrive immediately
+ templates = ipfix.verify_templates(ipfix_decoder, count=1)
+
+ self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
+ capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
+
+ # make sure the one packet we expect actually showed up
+ self.vapi.ipfix_flush()
+ cflow = self.wait_for_cflow_packet(self.collector, templates[0])
+ self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
+ {2: 'packets', 7: 'sport', 11: 'dport'})
+
+ # expected two templates and one cflow packet
+ self.collector.get_capture(2)
+
+ ipfix.remove_vpp_config()
+ self.logger.info("FFP_TEST_FINISH_0003")
+
+ def test_templatesIP6(self):
+ """ verify templates on IP6 datapath"""
+ self.logger.info("FFP_TEST_START_0000")
+ self.pg_enable_capture(self.pg_interfaces)
+
+ ipfix = VppCFLOW(test=self, datapath='ip6')
+ ipfix.add_vpp_config()
+
+ # template packet should arrive immediately
+ ipfix.verify_templates(count=1)
+ self.collector.get_capture(1)
+
+ ipfix.remove_vpp_config()
+
+ self.logger.info("FFP_TEST_FINISH_0000")
+
+ def test_L2onIP6(self):
+ """ L2 data on IP6 datapath"""
+ self.logger.info("FFP_TEST_START_0001")
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pkts = []
+
+ ipfix = VppCFLOW(test=self, intf='pg6', layer='l2', datapath='ip6')
+ ipfix.add_vpp_config()
+
+ ipfix_decoder = IPFIXDecoder()
+ # template packet should arrive immediately
+ templates = ipfix.verify_templates(ipfix_decoder, count=1)
+
+ self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1,
+ ip_ver='IPv6')
+ capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
+
+ # make sure the one packet we expect actually showed up
+ self.vapi.ipfix_flush()
+ cflow = self.wait_for_cflow_packet(self.collector, templates[0])
+ self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
+ {2: 'packets', 256: 56710},
+ ip_ver='v6')
+
+ # expected two templates and one cflow packet
+ self.collector.get_capture(2)
+
+ ipfix.remove_vpp_config()
+ self.logger.info("FFP_TEST_FINISH_0001")
+
+ def test_L3onIP6(self):
+ """ L3 data on IP6 datapath"""
+ self.logger.info("FFP_TEST_START_0002")
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pkts = []
+
+ ipfix = VppCFLOW(test=self, intf='pg6', layer='l3', datapath='ip6')
+ ipfix.add_vpp_config()
+
+ ipfix_decoder = IPFIXDecoder()
+ # template packet should arrive immediately
+ templates = ipfix.verify_templates(ipfix_decoder, count=1)
+
+ self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1,
+ ip_ver='IPv6')
+ capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
+
+ # make sure the one packet we expect actually showed up
+ self.vapi.ipfix_flush()
+ cflow = self.wait_for_cflow_packet(self.collector, templates[0])
+ self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
+ {2: 'packets',
+ 27: 'src_ip', 28: 'dst_ip'},
+ ip_ver='v6')
+
+ # expected two templates and one cflow packet
+ self.collector.get_capture(2)
+
+ ipfix.remove_vpp_config()
+ self.logger.info("FFP_TEST_FINISH_0002")
+
+ def test_L4onIP6(self):
+ """ L4 data on IP6 datapath"""
+ self.logger.info("FFP_TEST_START_0003")
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pkts = []
+
+ ipfix = VppCFLOW(test=self, intf='pg6', layer='l4', datapath='ip6')
+ ipfix.add_vpp_config()
+
+ ipfix_decoder = IPFIXDecoder()
+ # template packet should arrive immediately
+ templates = ipfix.verify_templates(ipfix_decoder, count=1)
+
+ self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1,
+ ip_ver='IPv6')
+ capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
+
+ # make sure the one packet we expect actually showed up
+ self.vapi.ipfix_flush()
+ cflow = self.wait_for_cflow_packet(self.collector, templates[0])
+ self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
+ {2: 'packets', 7: 'sport', 11: 'dport'},
+ ip_ver='v6')
+
+ # expected two templates and one cflow packet
+ self.collector.get_capture(2)
+
+ ipfix.remove_vpp_config()
+ self.logger.info("FFP_TEST_FINISH_0003")
+
+ def test_0001(self):
+ """ no timers, one CFLOW packet, 9 Flows inside"""
+ self.logger.info("FFP_TEST_START_0001")
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pkts = []
+
+ ipfix = VppCFLOW(test=self)
+ ipfix.add_vpp_config()
+
+ ipfix_decoder = IPFIXDecoder()
+ # template packet should arrive immediately
+ templates = ipfix.verify_templates(ipfix_decoder)
+
+ self.create_stream(packets=9)
+ capture = self.send_packets()
+
+ # make sure the one packet we expect actually showed up
+ self.vapi.ipfix_flush()
+ cflow = self.wait_for_cflow_packet(self.collector, templates[1])
+ self.verify_cflow_data_notimer(ipfix_decoder, capture, [cflow])
+ self.collector.get_capture(4)
+
+ ipfix.remove_vpp_config()
+ self.logger.info("FFP_TEST_FINISH_0001")
+
+ def test_0002(self):
+ """ no timers, two CFLOW packets (mtu=256), 3 Flows in each"""
+ self.logger.info("FFP_TEST_START_0002")
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pkts = []
+
+ ipfix = VppCFLOW(test=self, mtu=256)
+ ipfix.add_vpp_config()
+
+ ipfix_decoder = IPFIXDecoder()
+ # template packet should arrive immediately
+ self.vapi.ipfix_flush()
+ templates = ipfix.verify_templates(ipfix_decoder)
+
+ self.create_stream(packets=6)
+ capture = self.send_packets()
+
+ # make sure the one packet we expect actually showed up
+ cflows = []
+ self.vapi.ipfix_flush()
+ cflows.append(self.wait_for_cflow_packet(self.collector,
+ templates[1]))
+ cflows.append(self.wait_for_cflow_packet(self.collector,
+ templates[1]))
+ self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows)
+ self.collector.get_capture(5)
+
+ ipfix.remove_vpp_config()
+ self.logger.info("FFP_TEST_FINISH_0002")
+
+
+@unittest.skipUnless(running_extended_tests, "part of extended tests")
+class DisableIPFIX(MethodHolder):
+ """Disable IPFIX"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(DisableIPFIX, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(DisableIPFIX, cls).tearDownClass()
+
+ def test_0001(self):
+ """ disable IPFIX after first packets"""
+ self.logger.info("FFP_TEST_START_0001")
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pkts = []
+
+ ipfix = VppCFLOW(test=self)
+ ipfix.add_vpp_config()
+
+ ipfix_decoder = IPFIXDecoder()
+ # template packet should arrive immediately
+ templates = ipfix.verify_templates(ipfix_decoder)
+
+ self.create_stream()
+ self.send_packets()
+
+ # make sure the one packet we expect actually showed up
+ self.vapi.ipfix_flush()
+ self.wait_for_cflow_packet(self.collector, templates[1])
+ self.collector.get_capture(4)
+
+ # disable IPFIX
+ ipfix.disable_exporter()
+ self.pg_enable_capture([self.collector])
+
+ self.send_packets()
+
+ # make sure no one packet arrived in 1 minute
+ self.vapi.ipfix_flush()
+ self.wait_for_cflow_packet(self.collector, templates[1],
+ expected=False)
+ self.collector.get_capture(0)
+
+ ipfix.remove_vpp_config()
+ self.logger.info("FFP_TEST_FINISH_0001")
+
+
+@unittest.skipUnless(running_extended_tests, "part of extended tests")
+class ReenableIPFIX(MethodHolder):
+ """Re-enable IPFIX"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(ReenableIPFIX, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(ReenableIPFIX, cls).tearDownClass()
+
+ def test_0011(self):
+ """ disable IPFIX after first packets and re-enable after few packets
+ """
+ self.logger.info("FFP_TEST_START_0001")
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pkts = []
+
+ ipfix = VppCFLOW(test=self)
+ ipfix.add_vpp_config()
+
+ ipfix_decoder = IPFIXDecoder()
+ # template packet should arrive immediately
+ templates = ipfix.verify_templates(ipfix_decoder)
+
+ self.create_stream(packets=5)
+ self.send_packets()
+
+ # make sure the one packet we expect actually showed up
+ self.vapi.ipfix_flush()
+ self.wait_for_cflow_packet(self.collector, templates[1])
+ self.collector.get_capture(4)
+
+ # disable IPFIX
+ ipfix.disable_exporter()
+ self.vapi.ipfix_flush()
+ self.pg_enable_capture([self.collector])
+
+ self.send_packets()
+
+ # make sure no one packet arrived in active timer span
+ self.vapi.ipfix_flush()
+ self.wait_for_cflow_packet(self.collector, templates[1],
+ expected=False)
+ self.collector.get_capture(0)
+ self.pg2.get_capture(5)
+
+ # enable IPFIX
+ ipfix.enable_exporter()
+
+ capture = self.collector.get_capture(4)
+ nr_templates = 0
+ nr_data = 0
+ for p in capture:
+ self.assertTrue(p.haslayer(IPFIX))
+ if p.haslayer(Template):
+ nr_templates += 1
+ self.assertTrue(nr_templates, 3)
+ for p in capture:
+ self.assertTrue(p.haslayer(IPFIX))
+ if p.haslayer(Data):
+ nr_data += 1
+ self.assertTrue(nr_templates, 1)
+
+ ipfix.remove_vpp_config()
+ self.logger.info("FFP_TEST_FINISH_0001")
+
+
+@unittest.skipUnless(running_extended_tests, "part of extended tests")
+class DisableFP(MethodHolder):
+ """Disable Flowprobe feature"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(DisableFP, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(DisableFP, cls).tearDownClass()
+
+ def test_0001(self):
+ """ disable flowprobe feature after first packets"""
+ self.logger.info("FFP_TEST_START_0001")
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pkts = []
+ ipfix = VppCFLOW(test=self)
+ ipfix.add_vpp_config()
+
+ ipfix_decoder = IPFIXDecoder()
+ # template packet should arrive immediately
+ templates = ipfix.verify_templates(ipfix_decoder)
+
+ self.create_stream()
+ self.send_packets()
+
+ # make sure the one packet we expect actually showed up
+ self.vapi.ipfix_flush()
+ self.wait_for_cflow_packet(self.collector, templates[1])
+ self.collector.get_capture(4)
+
+ # disable IPFIX
+ ipfix.disable_flowprobe_feature()
+ self.pg_enable_capture([self.collector])
+
+ self.send_packets()
+
+ # make sure no one packet arrived in active timer span
+ self.vapi.ipfix_flush()
+ self.wait_for_cflow_packet(self.collector, templates[1],
+ expected=False)
+ self.collector.get_capture(0)
+
+ ipfix.remove_vpp_config()
+ self.logger.info("FFP_TEST_FINISH_0001")
+
+
+@unittest.skipUnless(running_extended_tests, "part of extended tests")
+class ReenableFP(MethodHolder):
+ """Re-enable Flowprobe feature"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(ReenableFP, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(ReenableFP, cls).tearDownClass()
+
+ def test_0001(self):
+ """ disable flowprobe feature after first packets and re-enable
+ after few packets """
+ self.logger.info("FFP_TEST_START_0001")
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pkts = []
+
+ ipfix = VppCFLOW(test=self)
+ ipfix.add_vpp_config()
+
+ ipfix_decoder = IPFIXDecoder()
+ # template packet should arrive immediately
+ self.vapi.ipfix_flush()
+ templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
+
+ self.create_stream()
+ self.send_packets()
+
+ # make sure the one packet we expect actually showed up
+ self.vapi.ipfix_flush()
+ self.wait_for_cflow_packet(self.collector, templates[1], 5)
+ self.collector.get_capture(4)
+
+ # disable FPP feature
+ ipfix.disable_flowprobe_feature()
+ self.pg_enable_capture([self.collector])
+
+ self.send_packets()
+
+ # make sure no one packet arrived in active timer span
+ self.vapi.ipfix_flush()
+ self.wait_for_cflow_packet(self.collector, templates[1], 5,
+ expected=False)
+ self.collector.get_capture(0)
+
+ # enable FPP feature
+ ipfix.enable_flowprobe_feature()
+ self.vapi.ipfix_flush()
+ templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
+
+ self.send_packets()
+
+ # make sure the next packets (templates and data) we expect actually
+ # showed up
+ self.vapi.ipfix_flush()
+ self.wait_for_cflow_packet(self.collector, templates[1], 5)
+ self.collector.get_capture(4)
+
+ ipfix.remove_vpp_config()
+ self.logger.info("FFP_TEST_FINISH_0001")
+
+
+if __name__ == '__main__':
+ unittest.main(testRunner=VppTestRunner)