summaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
authorPim van Pelt <pim@ipng.nl>2024-10-06 17:49:00 +0200
committerBenoit Ganne <bganne@cisco.com>2025-01-20 17:21:43 +0000
commite40f8a90bb0c39986c198fca8ad9b0b3c1658401 (patch)
tree6a6e55f5bff8a650f2171b60fdcee207e39f2f5b /test
parent26cb7184e1b2050ee86152be6fb6363cd7f0cc72 (diff)
sflow: initial checkin
This is an sFlow dataplane plugin that can sample 1-in-N packets from device-input, copying them to a FIFO queue and servicing that queue from a main process which formats them as Netlink PSAMPLEs, to be picked up by a popular sidecar agent called host-sflow. Type: feature Change-Id: Ic03456472e53309678f182dc8f74d3c81fb619e6 Signed-off-by: neil.mckee@inmon.com Signed-off-by: pim@ipng.nl
Diffstat (limited to 'test')
-rw-r--r--test/test_sflow.py212
1 files changed, 212 insertions, 0 deletions
diff --git a/test/test_sflow.py b/test/test_sflow.py
new file mode 100644
index 00000000000..d16c0e6a804
--- /dev/null
+++ b/test/test_sflow.py
@@ -0,0 +1,212 @@
+#!/usr/bin/env python3
+
+import unittest
+from framework import VppTestCase
+from asfframework import VppTestRunner
+from scapy.layers.l2 import Ether
+from scapy.packet import Raw
+from scapy.layers.inet import IP, UDP
+from random import randint
+import re # for finding counters in "sh errors" output
+
+
+class SFlowTestCase(VppTestCase):
+ """sFlow test case"""
+
+ @classmethod
+ def setUpClass(self):
+ super(SFlowTestCase, self).setUpClass()
+
+ @classmethod
+ def teadDownClass(cls):
+ super(SFlowTestCase, cls).tearDownClass()
+
+ def setUp(self):
+ self.create_pg_interfaces(range(2)) # create pg0 and pg1
+ for i in self.pg_interfaces:
+ i.admin_up() # put the interface up
+ i.config_ip4() # configure IPv4 address on the interface
+ i.resolve_arp() # resolve ARP, so that we know VPP MAC
+
+ def tearDown(self):
+ for i in self.pg_interfaces:
+ i.admin_down()
+ i.unconfig()
+ i.set_table_ip4(0)
+ i.set_table_ip6(0)
+
+ def is_hw_interface_in_dump(self, dump, hw_if_index):
+ for i in dump:
+ if i.hw_if_index == hw_if_index:
+ return True
+ else:
+ return False
+
+ def enable_sflow_via_api(self):
+ ## TEST: Enable one interface
+ ret = self.vapi.sflow_enable_disable(hw_if_index=1, enable_disable=True)
+ self.assertEqual(ret.retval, 0)
+
+ ## TEST: interface dump all
+ ret = self.vapi.sflow_interface_dump()
+ self.assertTrue(self.is_hw_interface_in_dump(ret, 1))
+
+ ## TEST: Disable one interface
+ ret = self.vapi.sflow_enable_disable(hw_if_index=1, enable_disable=False)
+ self.assertEqual(ret.retval, 0)
+
+ ## TEST: interface dump all after enable + disable
+ ret = self.vapi.sflow_interface_dump()
+ self.assertEqual(len(ret), 0)
+
+ ## TEST: Enable both interfaces
+ ret = self.vapi.sflow_enable_disable(hw_if_index=1, enable_disable=True)
+ self.assertEqual(ret.retval, 0)
+ ret = self.vapi.sflow_enable_disable(hw_if_index=2, enable_disable=True)
+ self.assertEqual(ret.retval, 0)
+
+ ## TEST: interface dump all
+ ret = self.vapi.sflow_interface_dump()
+ self.assertTrue(self.is_hw_interface_in_dump(ret, 1))
+ self.assertTrue(self.is_hw_interface_in_dump(ret, 2))
+
+ ## TEST: the default sampling rate
+ ret = self.vapi.sflow_sampling_rate_get()
+ self.assert_equal(ret.sampling_N, 10000)
+
+ ## TEST: sflow_sampling_rate_set()
+ self.vapi.sflow_sampling_rate_set(sampling_N=1)
+ ret = self.vapi.sflow_sampling_rate_get()
+ self.assert_equal(ret.sampling_N, 1)
+
+ ## TEST: the default polling interval
+ ret = self.vapi.sflow_polling_interval_get()
+ self.assert_equal(ret.polling_S, 20)
+
+ ## TEST: sflow_polling_interval_set()
+ self.vapi.sflow_polling_interval_set(polling_S=10)
+ ret = self.vapi.sflow_polling_interval_get()
+ self.assert_equal(ret.polling_S, 10)
+
+ ## TEST: the default header bytes
+ ret = self.vapi.sflow_header_bytes_get()
+ self.assert_equal(ret.header_B, 128)
+
+ ## TEST: sflow_header_bytes_set()
+ self.vapi.sflow_header_bytes_set(header_B=96)
+ ret = self.vapi.sflow_header_bytes_get()
+ self.assert_equal(ret.header_B, 96)
+
+ def create_stream(self, src_if, dst_if, count):
+ packets = []
+ for i in range(count):
+ # create packet info stored in the test case instance
+ info = self.create_packet_info(src_if, dst_if)
+ # convert the info into packet payload
+ payload = self.info_to_payload(info)
+ # create the packet itself
+ p = (
+ Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+ / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
+ / UDP(sport=randint(1000, 2000), dport=5678)
+ / Raw(payload)
+ )
+ # store a copy of the packet in the packet info
+ info.data = p.copy()
+ # append the packet to the list
+ packets.append(p)
+ # return the created packet list
+ return packets
+
+ def verify_capture(self, src_if, dst_if, capture):
+ packet_info = None
+ for packet in capture:
+ try:
+ ip = packet[IP]
+ udp = packet[UDP]
+ # convert the payload to packet info object
+ payload_info = self.payload_to_info(packet[Raw])
+ # make sure the indexes match
+ self.assert_equal(
+ payload_info.src, src_if.sw_if_index, "source sw_if_index"
+ )
+ self.assert_equal(
+ payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
+ )
+ packet_info = self.get_next_packet_info_for_interface2(
+ src_if.sw_if_index, dst_if.sw_if_index, packet_info
+ )
+ # make sure we didn't run out of saved packets
+ self.assertIsNotNone(packet_info)
+ self.assert_equal(
+ payload_info.index, packet_info.index, "packet info index"
+ )
+ saved_packet = packet_info.data # fetch the saved packet
+ # assert the values match
+ self.assert_equal(ip.src, saved_packet[IP].src, "IP source address")
+ self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
+ except:
+ self.logger.error("Unexpected or invalid packet:", packet)
+ raise
+ remaining_packet = self.get_next_packet_info_for_interface2(
+ src_if.sw_if_index, dst_if.sw_if_index, packet_info
+ )
+ self.assertIsNone(
+ remaining_packet,
+ "Interface %s: Packet expected from interface "
+ "%s didn't arrive" % (dst_if.name, src_if.name),
+ )
+
+ def get_sflow_counter(self, counter):
+ counters = self.vapi.cli("sh errors").split("\n")
+ for i in range(1, len(counters) - 1):
+ results = counters[i].split()
+ if results[1] == "sflow":
+ if re.search(counter, counters[i]) is not None:
+ return int(results[0])
+ return None
+
+ def verify_sflow(self, count):
+ ctr_processed = "sflow packets processed"
+ ctr_sampled = "sflow packets sampled"
+ ctr_dropped = "sflow packets dropped"
+ ctr_ps_sent = "sflow PSAMPLE sent"
+ ctr_ps_fail = "sflow PSAMPLE send failed"
+ processed = self.get_sflow_counter(ctr_processed)
+ sampled = self.get_sflow_counter(ctr_sampled)
+ dropped = self.get_sflow_counter(ctr_dropped)
+ ps_sent = self.get_sflow_counter(ctr_ps_sent)
+ ps_fail = self.get_sflow_counter(ctr_ps_fail)
+ self.assert_equal(processed, count, ctr_processed)
+ self.assert_equal(sampled, count, ctr_sampled)
+ self.assert_equal(dropped, None, ctr_dropped)
+ # TODO decide how to warn if PSAMPLE is not working
+ # It requires a prior "sudo modprobe psample", but
+ # that should probably be done at system boot time
+ # or maybe in a systemctl startup script, so we
+ # should only warn here.
+ self.logger.info(ctr_ps_sent + "=" + str(ps_sent))
+ self.logger.info(ctr_ps_fail + "=" + str(ps_fail))
+
+ def test_basic(self):
+ self.enable_sflow_via_api()
+ count = 7
+ # create the packet stream
+ packets = self.create_stream(self.pg0, self.pg1, count)
+ # add the stream to the source interface
+ self.pg0.add_stream(packets)
+ # enable capture on both interfaces
+ self.pg0.enable_capture()
+ self.pg1.enable_capture()
+ # start the packet generator
+ self.pg_start()
+ # get capture - the proper count of packets was saved by
+ # create_packet_info() based on dst_if parameter
+ capture = self.pg1.get_capture()
+ # assert nothing captured on pg0 (always do this last, so that
+ # some time has already passed since pg_start())
+ self.pg0.assert_nothing_captured()
+ # verify capture
+ self.verify_capture(self.pg0, self.pg1, capture)
+ # verify sflow counters
+ self.verify_sflow(count)