From bf55e9931ce203049385fbf55dde291ead556679 Mon Sep 17 00:00:00 2001 From: Neale Ranns Date: Thu, 4 Oct 2018 06:40:30 -0700 Subject: mss_clamp: TCP MSS clamping plugin Type: feature Configure TCP MSS clamping on an interface as follows: set interface tcp-mss-clamp [rx|tx] ip4 [enable|disable|rx|tx] ip4-mss ip6 [enable|disable|rx|tx] ip6-mss Change-Id: I45b04e50a0b70a33e14a9066f981c651292ebffb Signed-off-by: Neale Ranns Signed-off-by: Paul Vinciguerra Signed-off-by: Miklos Tirpak Signed-off-by: Matthew Smith --- src/plugins/mss_clamp/test/test_mss_clamp.py | 295 +++++++++++++++++++++++++++ 1 file changed, 295 insertions(+) create mode 100644 src/plugins/mss_clamp/test/test_mss_clamp.py (limited to 'src/plugins/mss_clamp/test/test_mss_clamp.py') diff --git a/src/plugins/mss_clamp/test/test_mss_clamp.py b/src/plugins/mss_clamp/test/test_mss_clamp.py new file mode 100644 index 00000000000..23495b6050b --- /dev/null +++ b/src/plugins/mss_clamp/test/test_mss_clamp.py @@ -0,0 +1,295 @@ +#!/usr/bin/env python3 + +import unittest + +from framework import VppTestCase, VppTestRunner + +from scapy.layers.inet import IP, TCP +from scapy.layers.inet6 import IPv6 +from scapy.layers.l2 import Ether +from scapy.packet import Raw + + +class TestMSSClamp(VppTestCase): + """ TCP MSS Clamping Test Case """ + + def setUp(self): + super(TestMSSClamp, self).setUp() + + # create 2 pg interfaces + self.create_pg_interfaces(range(2)) + + for i in self.pg_interfaces: + i.admin_up() + i.config_ip4() + i.resolve_arp() + i.config_ip6() + i.resolve_ndp() + + def tearDown(self): + for i in self.pg_interfaces: + i.unconfig_ip4() + i.unconfig_ip6() + i.admin_down() + super(TestMSSClamp, self).tearDown() + + def verify_pkt(self, rx, expected_mss): + # check that the MSS size equals the expected value + # and the IP and TCP checksums are correct + tcp = rx[TCP] + tcp_csum = tcp.chksum + del tcp.chksum + ip_csum = 0 + if (rx.haslayer(IP)): + ip_csum = rx[IP].chksum + del rx[IP].chksum + + opt = tcp.options + self.assertEqual(opt[0][0], 'MSS') + self.assertEqual(opt[0][1], expected_mss) + # recalculate checksums + rx = rx.__class__(bytes(rx)) + tcp = rx[TCP] + self.assertEqual(tcp_csum, tcp.chksum) + if (rx.haslayer(IP)): + self.assertEqual(ip_csum, rx[IP].chksum) + + def send_and_verify_ip4(self, src_pg, dst_pg, mss, expected_mss): + # IPv4 TCP packet with the requested MSS option. + # from a host on src_pg to a host on dst_pg. + p = (Ether(dst=src_pg.local_mac, + src=src_pg.remote_mac) / + IP(src=src_pg.remote_ip4, + dst=dst_pg.remote_ip4) / + TCP(sport=1234, dport=1234, + flags="S", + options=[('MSS', (mss)), ('EOL', None)]) / + Raw('\xa5' * 100)) + + rxs = self.send_and_expect(src_pg, p * 65, dst_pg) + + for rx in rxs: + self.verify_pkt(rx, expected_mss) + + def send_and_verify_ip6(self, src_pg, dst_pg, mss, expected_mss): + # + # IPv6 TCP packet with the requested MSS option. + # from a host on src_pg to a host on dst_pg. + # + p = (Ether(dst=src_pg.local_mac, + src=src_pg.remote_mac) / + IPv6(src=src_pg.remote_ip6, + dst=dst_pg.remote_ip6) / + TCP(sport=1234, dport=1234, + flags="S", + options=[('MSS', (mss)), ('EOL', None)]) / + Raw('\xa5' * 100)) + + rxs = self.send_and_expect(src_pg, p * 65, dst_pg) + + for rx in rxs: + self.verify_pkt(rx, expected_mss) + + def test_tcp_mss_clamping_ip4_tx(self): + """ IP4 TCP MSS Clamping TX """ + + # enable the TCP MSS clamping feature to lower the MSS to 1424. + self.vapi.mss_clamp_enable_disable(self.pg1.sw_if_index, + ipv4_mss=1424, ipv6_mss=0, + ipv4_direction=3, ipv6_direction=0) + + # Verify that the feature is enabled. + rv, reply = self.vapi.mss_clamp_get(sw_if_index=self.pg1.sw_if_index) + self.assertEqual(reply[0].ipv4_mss, 1424) + self.assertEqual(reply[0].ipv4_direction, 3) + + # Send syn packets and verify that the MSS value is lowered. + self.send_and_verify_ip4(self.pg0, self.pg1, 1460, 1424) + + # check the stats + stats = self.statistics.get_counter( + '/err/tcp-mss-clamping-ip4-out/clamped') + self.assertEqual(sum(stats), 65) + + # Send syn packets with small enough MSS values and verify they are + # unchanged. + self.send_and_verify_ip4(self.pg0, self.pg1, 1400, 1400) + + # enable the the feature only in TX direction + # and change the max MSS value + self.vapi.mss_clamp_enable_disable(self.pg1.sw_if_index, + ipv4_mss=1420, ipv6_mss=0, + ipv4_direction=2, ipv6_direction=0) + + # Send syn packets and verify that the MSS value is lowered. + self.send_and_verify_ip4(self.pg0, self.pg1, 1460, 1420) + + # enable the the feature only in RX direction + self.vapi.mss_clamp_enable_disable(self.pg1.sw_if_index, + ipv4_mss=1424, ipv6_mss=0, + ipv4_direction=1, ipv6_direction=0) + + # Send the packets again and ensure they are unchanged. + self.send_and_verify_ip4(self.pg0, self.pg1, 1460, 1460) + + # disable the feature + self.vapi.mss_clamp_enable_disable(self.pg1.sw_if_index, + ipv4_mss=0, ipv6_mss=0, + ipv4_direction=0, ipv6_direction=0) + + # Send the packets again and ensure they are unchanged. + self.send_and_verify_ip4(self.pg0, self.pg1, 1460, 1460) + + def test_tcp_mss_clamping_ip4_rx(self): + """ IP4 TCP MSS Clamping RX """ + + # enable the TCP MSS clamping feature to lower the MSS to 1424. + self.vapi.mss_clamp_enable_disable(self.pg1.sw_if_index, + ipv4_mss=1424, ipv6_mss=0, + ipv4_direction=3, ipv6_direction=0) + + # Verify that the feature is enabled. + rv, reply = self.vapi.mss_clamp_get(sw_if_index=self.pg1.sw_if_index) + self.assertEqual(reply[0].ipv4_mss, 1424) + self.assertEqual(reply[0].ipv4_direction, 3) + + # Send syn packets and verify that the MSS value is lowered. + self.send_and_verify_ip4(self.pg1, self.pg0, 1460, 1424) + + # check the stats + stats = self.statistics.get_counter( + '/err/tcp-mss-clamping-ip4-in/clamped') + self.assertEqual(sum(stats), 65) + + # Send syn packets with small enough MSS values and verify they are + # unchanged. + self.send_and_verify_ip4(self.pg1, self.pg0, 1400, 1400) + + # enable the the feature only in RX direction + # and change the max MSS value + self.vapi.mss_clamp_enable_disable(self.pg1.sw_if_index, + ipv4_mss=1420, ipv6_mss=0, + ipv4_direction=1, ipv6_direction=0) + + # Send syn packets and verify that the MSS value is lowered. + self.send_and_verify_ip4(self.pg1, self.pg0, 1460, 1420) + + # enable the the feature only in TX direction + self.vapi.mss_clamp_enable_disable(self.pg1.sw_if_index, + ipv4_mss=1424, ipv6_mss=0, + ipv4_direction=2, ipv6_direction=0) + + # Send the packets again and ensure they are unchanged. + self.send_and_verify_ip4(self.pg1, self.pg0, 1460, 1460) + + # disable the feature + self.vapi.mss_clamp_enable_disable(self.pg1.sw_if_index, + ipv4_mss=0, ipv6_mss=0, + ipv4_direction=0, ipv6_direction=0) + + # Send the packets again and ensure they are unchanged. + self.send_and_verify_ip4(self.pg1, self.pg0, 1460, 1460) + + def test_tcp_mss_clamping_ip6_tx(self): + """ IP6 TCP MSS Clamping TX """ + + # enable the TCP MSS clamping feature to lower the MSS to 1424. + self.vapi.mss_clamp_enable_disable(self.pg1.sw_if_index, + ipv4_mss=0, ipv6_mss=1424, + ipv4_direction=0, ipv6_direction=3) + + # Verify that the feature is enabled. + rv, reply = self.vapi.mss_clamp_get(sw_if_index=self.pg1.sw_if_index) + self.assertEqual(reply[0].ipv6_mss, 1424) + self.assertEqual(reply[0].ipv6_direction, 3) + + # Send syn packets and verify that the MSS value is lowered. + self.send_and_verify_ip6(self.pg0, self.pg1, 1460, 1424) + + # check the stats + stats = self.statistics.get_counter( + '/err/tcp-mss-clamping-ip6-out/clamped') + self.assertEqual(sum(stats), 65) + + # Send syn packets with small enough MSS values and verify they are + # unchanged. + self.send_and_verify_ip6(self.pg0, self.pg1, 1400, 1400) + + # enable the the feature only in TX direction + # and change the max MSS value + self.vapi.mss_clamp_enable_disable(self.pg1.sw_if_index, + ipv4_mss=0, ipv6_mss=1420, + ipv4_direction=0, ipv6_direction=2) + + # Send syn packets and verify that the MSS value is lowered. + self.send_and_verify_ip6(self.pg0, self.pg1, 1460, 1420) + + # enable the the feature only in RX direction + self.vapi.mss_clamp_enable_disable(self.pg1.sw_if_index, + ipv4_mss=0, ipv6_mss=1424, + ipv4_direction=0, ipv6_direction=1) + + # Send the packets again and ensure they are unchanged. + self.send_and_verify_ip6(self.pg0, self.pg1, 1460, 1460) + + # disable the feature + self.vapi.mss_clamp_enable_disable(self.pg1.sw_if_index, + ipv4_mss=0, ipv6_mss=0, + ipv4_direction=0, ipv6_direction=0) + + # Send the packets again and ensure they are unchanged. + self.send_and_verify_ip6(self.pg0, self.pg1, 1460, 1460) + + def test_tcp_mss_clamping_ip6_rx(self): + """ IP6 TCP MSS Clamping RX """ + + # enable the TCP MSS clamping feature to lower the MSS to 1424. + self.vapi.mss_clamp_enable_disable(self.pg1.sw_if_index, + ipv4_mss=0, ipv6_mss=1424, + ipv4_direction=0, ipv6_direction=3) + + # Verify that the feature is enabled. + rv, reply = self.vapi.mss_clamp_get(sw_if_index=self.pg1.sw_if_index) + self.assertEqual(reply[0].ipv6_mss, 1424) + self.assertEqual(reply[0].ipv6_direction, 3) + + # Send syn packets and verify that the MSS value is lowered. + self.send_and_verify_ip6(self.pg1, self.pg0, 1460, 1424) + + # check the stats + stats = self.statistics.get_counter( + '/err/tcp-mss-clamping-ip6-in/clamped') + self.assertEqual(sum(stats), 65) + + # Send syn packets with small enough MSS values and verify they are + # unchanged. + self.send_and_verify_ip6(self.pg1, self.pg0, 1400, 1400) + + # enable the the feature only in RX direction + # and change the max MSS value + self.vapi.mss_clamp_enable_disable(self.pg1.sw_if_index, + ipv4_mss=0, ipv6_mss=1420, + ipv4_direction=0, ipv6_direction=1) + + # Send syn packets and verify that the MSS value is lowered. + self.send_and_verify_ip6(self.pg1, self.pg0, 1460, 1420) + + # enable the the feature only in TX direction + self.vapi.mss_clamp_enable_disable(self.pg1.sw_if_index, + ipv4_mss=0, ipv6_mss=1424, + ipv4_direction=0, ipv6_direction=2) + + # Send the packets again and ensure they are unchanged. + self.send_and_verify_ip6(self.pg1, self.pg0, 1460, 1460) + + # disable the feature + self.vapi.mss_clamp_enable_disable(self.pg1.sw_if_index, + ipv4_mss=0, ipv6_mss=0, + ipv4_direction=0, ipv6_direction=0) + + # Send the packets again and ensure they are unchanged. + self.send_and_verify_ip6(self.pg1, self.pg0, 1460, 1460) + + +if __name__ == '__main__': + unittest.main(testRunner=VppTestRunner) -- cgit 1.2.3-korg