diff options
author | 2024-10-06 17:49:00 +0200 | |
---|---|---|
committer | 2025-01-20 17:21:43 +0000 | |
commit | e40f8a90bb0c39986c198fca8ad9b0b3c1658401 (patch) | |
tree | 6a6e55f5bff8a650f2171b60fdcee207e39f2f5b /test | |
parent | 26cb7184e1b2050ee86152be6fb6363cd7f0cc72 (diff) |
sflow: initial checkin
This is an sFlow dataplane plugin that can sample
1-in-N packets from device-input, copying them to
a FIFO queue and servicing that queue from a main
process which formats them as Netlink PSAMPLEs,
to be picked up by a popular sidecar agent called
host-sflow.
Type: feature
Change-Id: Ic03456472e53309678f182dc8f74d3c81fb619e6
Signed-off-by: neil.mckee@inmon.com
Signed-off-by: pim@ipng.nl
Diffstat (limited to 'test')
-rw-r--r-- | test/test_sflow.py | 212 |
1 files changed, 212 insertions, 0 deletions
diff --git a/test/test_sflow.py b/test/test_sflow.py new file mode 100644 index 00000000000..d16c0e6a804 --- /dev/null +++ b/test/test_sflow.py @@ -0,0 +1,212 @@ +#!/usr/bin/env python3 + +import unittest +from framework import VppTestCase +from asfframework import VppTestRunner +from scapy.layers.l2 import Ether +from scapy.packet import Raw +from scapy.layers.inet import IP, UDP +from random import randint +import re # for finding counters in "sh errors" output + + +class SFlowTestCase(VppTestCase): + """sFlow test case""" + + @classmethod + def setUpClass(self): + super(SFlowTestCase, self).setUpClass() + + @classmethod + def teadDownClass(cls): + super(SFlowTestCase, cls).tearDownClass() + + def setUp(self): + self.create_pg_interfaces(range(2)) # create pg0 and pg1 + for i in self.pg_interfaces: + i.admin_up() # put the interface up + i.config_ip4() # configure IPv4 address on the interface + i.resolve_arp() # resolve ARP, so that we know VPP MAC + + def tearDown(self): + for i in self.pg_interfaces: + i.admin_down() + i.unconfig() + i.set_table_ip4(0) + i.set_table_ip6(0) + + def is_hw_interface_in_dump(self, dump, hw_if_index): + for i in dump: + if i.hw_if_index == hw_if_index: + return True + else: + return False + + def enable_sflow_via_api(self): + ## TEST: Enable one interface + ret = self.vapi.sflow_enable_disable(hw_if_index=1, enable_disable=True) + self.assertEqual(ret.retval, 0) + + ## TEST: interface dump all + ret = self.vapi.sflow_interface_dump() + self.assertTrue(self.is_hw_interface_in_dump(ret, 1)) + + ## TEST: Disable one interface + ret = self.vapi.sflow_enable_disable(hw_if_index=1, enable_disable=False) + self.assertEqual(ret.retval, 0) + + ## TEST: interface dump all after enable + disable + ret = self.vapi.sflow_interface_dump() + self.assertEqual(len(ret), 0) + + ## TEST: Enable both interfaces + ret = self.vapi.sflow_enable_disable(hw_if_index=1, enable_disable=True) + self.assertEqual(ret.retval, 0) + ret = self.vapi.sflow_enable_disable(hw_if_index=2, enable_disable=True) + self.assertEqual(ret.retval, 0) + + ## TEST: interface dump all + ret = self.vapi.sflow_interface_dump() + self.assertTrue(self.is_hw_interface_in_dump(ret, 1)) + self.assertTrue(self.is_hw_interface_in_dump(ret, 2)) + + ## TEST: the default sampling rate + ret = self.vapi.sflow_sampling_rate_get() + self.assert_equal(ret.sampling_N, 10000) + + ## TEST: sflow_sampling_rate_set() + self.vapi.sflow_sampling_rate_set(sampling_N=1) + ret = self.vapi.sflow_sampling_rate_get() + self.assert_equal(ret.sampling_N, 1) + + ## TEST: the default polling interval + ret = self.vapi.sflow_polling_interval_get() + self.assert_equal(ret.polling_S, 20) + + ## TEST: sflow_polling_interval_set() + self.vapi.sflow_polling_interval_set(polling_S=10) + ret = self.vapi.sflow_polling_interval_get() + self.assert_equal(ret.polling_S, 10) + + ## TEST: the default header bytes + ret = self.vapi.sflow_header_bytes_get() + self.assert_equal(ret.header_B, 128) + + ## TEST: sflow_header_bytes_set() + self.vapi.sflow_header_bytes_set(header_B=96) + ret = self.vapi.sflow_header_bytes_get() + self.assert_equal(ret.header_B, 96) + + def create_stream(self, src_if, dst_if, count): + packets = [] + for i in range(count): + # create packet info stored in the test case instance + info = self.create_packet_info(src_if, dst_if) + # convert the info into packet payload + payload = self.info_to_payload(info) + # create the packet itself + p = ( + Ether(dst=src_if.local_mac, src=src_if.remote_mac) + / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) + / UDP(sport=randint(1000, 2000), dport=5678) + / Raw(payload) + ) + # store a copy of the packet in the packet info + info.data = p.copy() + # append the packet to the list + packets.append(p) + # return the created packet list + return packets + + def verify_capture(self, src_if, dst_if, capture): + packet_info = None + for packet in capture: + try: + ip = packet[IP] + udp = packet[UDP] + # convert the payload to packet info object + payload_info = self.payload_to_info(packet[Raw]) + # make sure the indexes match + self.assert_equal( + payload_info.src, src_if.sw_if_index, "source sw_if_index" + ) + self.assert_equal( + payload_info.dst, dst_if.sw_if_index, "destination sw_if_index" + ) + packet_info = self.get_next_packet_info_for_interface2( + src_if.sw_if_index, dst_if.sw_if_index, packet_info + ) + # make sure we didn't run out of saved packets + self.assertIsNotNone(packet_info) + self.assert_equal( + payload_info.index, packet_info.index, "packet info index" + ) + saved_packet = packet_info.data # fetch the saved packet + # assert the values match + self.assert_equal(ip.src, saved_packet[IP].src, "IP source address") + self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port") + except: + self.logger.error("Unexpected or invalid packet:", packet) + raise + remaining_packet = self.get_next_packet_info_for_interface2( + src_if.sw_if_index, dst_if.sw_if_index, packet_info + ) + self.assertIsNone( + remaining_packet, + "Interface %s: Packet expected from interface " + "%s didn't arrive" % (dst_if.name, src_if.name), + ) + + def get_sflow_counter(self, counter): + counters = self.vapi.cli("sh errors").split("\n") + for i in range(1, len(counters) - 1): + results = counters[i].split() + if results[1] == "sflow": + if re.search(counter, counters[i]) is not None: + return int(results[0]) + return None + + def verify_sflow(self, count): + ctr_processed = "sflow packets processed" + ctr_sampled = "sflow packets sampled" + ctr_dropped = "sflow packets dropped" + ctr_ps_sent = "sflow PSAMPLE sent" + ctr_ps_fail = "sflow PSAMPLE send failed" + processed = self.get_sflow_counter(ctr_processed) + sampled = self.get_sflow_counter(ctr_sampled) + dropped = self.get_sflow_counter(ctr_dropped) + ps_sent = self.get_sflow_counter(ctr_ps_sent) + ps_fail = self.get_sflow_counter(ctr_ps_fail) + self.assert_equal(processed, count, ctr_processed) + self.assert_equal(sampled, count, ctr_sampled) + self.assert_equal(dropped, None, ctr_dropped) + # TODO decide how to warn if PSAMPLE is not working + # It requires a prior "sudo modprobe psample", but + # that should probably be done at system boot time + # or maybe in a systemctl startup script, so we + # should only warn here. + self.logger.info(ctr_ps_sent + "=" + str(ps_sent)) + self.logger.info(ctr_ps_fail + "=" + str(ps_fail)) + + def test_basic(self): + self.enable_sflow_via_api() + count = 7 + # create the packet stream + packets = self.create_stream(self.pg0, self.pg1, count) + # add the stream to the source interface + self.pg0.add_stream(packets) + # enable capture on both interfaces + self.pg0.enable_capture() + self.pg1.enable_capture() + # start the packet generator + self.pg_start() + # get capture - the proper count of packets was saved by + # create_packet_info() based on dst_if parameter + capture = self.pg1.get_capture() + # assert nothing captured on pg0 (always do this last, so that + # some time has already passed since pg_start()) + self.pg0.assert_nothing_captured() + # verify capture + self.verify_capture(self.pg0, self.pg1, capture) + # verify sflow counters + self.verify_sflow(count) |