#!/usr/bin/env python3 import unittest from framework import VppTestCase from asfframework import VppTestRunner from scapy.layers.inet import IP, TCP from scapy.layers.inet6 import IPv6 from scapy.layers.l2 import Ether from scapy.packet import Raw class TestMSSClamp(VppTestCase): """TCP MSS Clamping Test Case""" def setUp(self): super(TestMSSClamp, self).setUp() # create 2 pg interfaces self.create_pg_interfaces(range(2)) for i in self.pg_interfaces: i.admin_up() i.config_ip4() i.resolve_arp() i.config_ip6() i.resolve_ndp() def tearDown(self): for i in self.pg_interfaces: i.unconfig_ip4() i.unconfig_ip6() i.admin_down() super(TestMSSClamp, self).tearDown() def verify_pkt(self, rx, expected_mss): # check that the MSS size equals the expected value # and the IP and TCP checksums are correct tcp = rx[TCP] tcp_csum = tcp.chksum del tcp.chksum ip_csum = 0 if rx.haslayer(IP): ip_csum = rx[IP].chksum del rx[IP].chksum opt = tcp.options self.assertEqual(opt[0][0], "MSS") self.assertEqual(opt[0][1], expected_mss) # recalculate checksums rx = rx.__class__(bytes(rx)) tcp = rx[TCP] self.assertEqual(tcp_csum, tcp.chksum) if rx.haslayer(IP): self.assertEqual(ip_csum, rx[IP].chksum) def send_and_verify_ip4(self, src_pg, dst_pg, mss, expected_mss): # IPv4 TCP packet with the requested MSS option. # from a host on src_pg to a host on dst_pg. p = ( Ether(dst=src_pg.local_mac, src=src_pg.remote_mac) / IP(src=src_pg.remote_ip4, dst=dst_pg.remote_ip4) / TCP( sport=1234, dport=1234, flags="S", options=[("MSS", (mss)), ("EOL", None)], ) / Raw("\xa5" * 100) ) rxs = self.send_and_expect(src_pg, p * 65, dst_pg) for rx in rxs: self.verify_pkt(rx, expected_mss) def send_and_verify_ip6(self, src_pg, dst_pg, mss, expected_mss): # # IPv6 TCP packet with the requested MSS option. # from a host on src_pg to a host on dst_pg. # p = ( Ether(dst=src_pg.local_mac, src=src_pg.remote_mac) / IPv6(src=src_pg.remote_ip6, dst=dst_pg.remote_ip6) / TCP( sport=1234, dport=1234, flags="S", options=[("MSS", (mss)), ("EOL", None)], ) / Raw("\xa5" * 100) ) rxs = self.send_and_expect(src_pg, p * 65, dst_pg) for rx in rxs: self.verify_pkt(rx, expected_mss) def test_tcp_mss_clamping_ip4_tx(self): """IP4 TCP MSS Clamping TX""" # enable the TCP MSS clamping feature to lower the MSS to 1424. self.vapi.mss_clamp_enable_disable( self.pg1.sw_if_index, ipv4_mss=1424, ipv6_mss=0, ipv4_direction=3, ipv6_direction=0, ) # Verify that the feature is enabled. rv, reply = self.vapi.mss_clamp_get(sw_if_index=self.pg1.sw_if_index) self.assertEqual(reply[0].ipv4_mss, 1424) self.assertEqual(reply[0].ipv4_direction, 3) # Send syn packets and verify that the MSS value is lowered. self.send_and_verify_ip4(self.pg0, self.pg1, 1460, 1424) # check the stats stats = self.statistics.get_counter("/err/tcp-mss-clamping-ip4-out/clamped") self.assertEqual(sum(stats), 65) # Send syn packets with small enough MSS values and verify they are # unchanged. self.send_and_verify_ip4(self.pg0, self.pg1, 1400, 1400) # enable the the feature only in TX direction # and change the max MSS value self.vapi.mss_clamp_enable_disable( self.pg1.sw_if_index, ipv4_mss=1420, ipv6_mss=0, ipv4_direction=2, ipv6_direction=0, ) # Send syn packets and verify that the MSS value is lowered. self.send_and_verify_ip4(self.pg0, self.pg1, 1460, 1420) # enable the the feature only in RX direction self.vapi.mss_clamp_enable_disable( self.pg1.sw_if_index, ipv4_mss=1424, ipv6_mss=0, ipv4_direction=1, ipv6_direction=0, ) # Send the packets again and ensure they are unchanged. self.send_and_verify_ip4(self.pg0, self.pg1, 1460, 1460) # disable the feature self.vapi.mss_clamp_enable_disable( self.pg1.sw_if_index, ipv4_mss=0, ipv6_mss=0, ipv4_direction=0, ipv6_direction=0, ) # Send the packets again and ensure they are unchanged. self.send_and_verify_ip4(self.pg0, self.pg1, 1460, 1460) def test_tcp_mss_clamping_ip4_rx(self): """IP4 TCP MSS Clamping RX""" # enable the TCP MSS clamping feature to lower the MSS to 1424. self.vapi.mss_clamp_enable_disable( self.pg1.sw_if_index, ipv4_mss=1424, ipv6_mss=0, ipv4_direction=3, ipv6_direction=0, ) # Verify that the feature is enabled. rv, reply = self.vapi.mss_clamp_get(sw_if_index=self.pg1.sw_if_index) self.assertEqual(reply[0].ipv4_mss, 1424) self.assertEqual(reply[0].ipv4_direction, 3) # Send syn packets and verify that the MSS value is lowered. self.send_and_verify_ip4(self.pg1, self.pg0, 1460, 1424) # check the stats stats = self.statistics.get_counter("/err/tcp-mss-clamping-ip4-in/clamped") self.assertEqual(sum(stats), 65) # Send syn packets with small enough MSS values and verify they are # unchanged. self.send_and_verify_ip4(self.pg1, self.pg0, 1400, 1400) # enable the the feature only in RX direction # and change the max MSS value self.vapi.mss_clamp_enable_disable( self.pg1.sw_if_index, ipv4_mss=1420, ipv6_mss=0, ipv4_direction=1, ipv6_direction=0, ) # Send syn packets and verify that the MSS value is lowered. self.send_and_verify_ip4(self.pg1, self.pg0, 1460, 1420) # enable the the feature only in TX direction self.vapi.mss_clamp_enable_disable( self.pg1.sw_if_index, ipv4_mss=1424, ipv6_mss=0, ipv4_direction=2, ipv6_direction=0, ) # Send the packets again and ensure they are unchanged. self.send_and_verify_ip4(self.pg1, self.pg0, 1460, 1460) # disable the feature self.vapi.mss_clamp_enable_disable( self.pg1.sw_if_index, ipv4_mss=0, ipv6_mss=0, ipv4_direction=0, ipv6_direction=0, ) # Send the packets again and ensure they are unchanged. self.send_and_verify_ip4(self.pg1, self.pg0, 1460, 1460) def test_tcp_mss_clamping_ip6_tx(self): """IP6 TCP MSS Clamping TX""" # enable the TCP MSS clamping feature to lower the MSS to 1424. self.vapi.mss_clamp_enable_disable( self.pg1.sw_if_index, ipv4_mss=0, ipv6_mss=1424, ipv4_direction=0, ipv6_direction=3, ) # Verify that the feature is enabled. rv, reply = self.vapi.mss_clamp_get(sw_if_index=self.pg1.sw_if_index) self.assertEqual(reply[0].ipv6_mss, 1424) self.assertEqual(reply[0].ipv6_direction, 3) # Send syn packets and verify that the MSS value is lowered. self.send_and_verify_ip6(self.pg0, self.pg1, 1460, 1424) # check the stats stats = self.statistics.get_counter("/err/tcp-mss-clamping-ip6-out/clamped") self.assertEqual(sum(stats), 65) # Send syn packets with small enough MSS values and verify they are # unchanged. self.send_and_verify_ip6(self.pg0, self.pg1, 1400, 1400) # enable the the feature only in TX direction # and change the max MSS value self.vapi.mss_clamp_enable_disable( self.pg1.sw_if_index, ipv4_mss=0, ipv6_mss=1420, ipv4_direction=0, ipv6_direction=2, ) # Send syn packets and verify that the MSS value is lowered. self.send_and_verify_ip6(self.pg0, self.pg1, 1460, 1420) # enable the the feature only in RX direction self.vapi.mss_clamp_enable_disable( self.pg1.sw_if_index, ipv4_mss=0, ipv6_mss=1424, ipv4_direction=0, ipv6_direction=1, ) # Send the packets again and ensure they are unchanged. self.send_and_verify_ip6(self.pg0, self.pg1, 1460, 1460) # disable the feature self.vapi.mss_clamp_enable_disable( self.pg1.sw_if_index, ipv4_mss=0, ipv6_mss=0, ipv4_direction=0, ipv6_direction=0, ) # Send the packets again and ensure they are unchanged. self.send_and_verify_ip6(self.pg0, self.pg1, 1460, 1460) def test_tcp_mss_clamping_ip6_rx(self): """IP6 TCP MSS Clamping RX""" # enable the TCP MSS clamping feature to lower the MSS to 1424. self.vapi.mss_clamp_enable_disable( self.pg1.sw_if_index, ipv4_mss=0, ipv6_mss=1424, ipv4_direction=0, ipv6_direction=3, ) # Verify that the feature is enabled. rv, reply = self.vapi.mss_clamp_get(sw_if_index=self.pg1.sw_if_index) self.assertEqual(reply[0].ipv6_mss, 1424) self.assertEqual(reply[0].ipv6_direction, 3) # Send syn packets and verify that the MSS value is lowered. self.send_and_verify_ip6(self.pg1, self.pg0, 1460, 1424) # check the stats stats = self.statistics.get_counter("/err/tcp-mss-clamping-ip6-in/clamped") self.assertEqual(sum(stats), 65) # Send syn packets with small enough MSS values and verify they are # unchanged. self.send_and_verify_ip6(self.pg1, self.pg0, 1400, 1400) # enable the the feature only in RX direction # and change the max MSS value self.vapi.mss_clamp_enable_disable( self.pg1.sw_if_index, ipv4_mss=0, ipv6_mss=1420, ipv4_direction=0, ipv6_direction=1, ) # Send syn packets and verify that the MSS value is lowered. self.send_and_verify_ip6(self.pg1, self.pg0, 1460, 1420) # enable the the feature only in TX direction self.vapi.mss_clamp_enable_disable( self.pg1.sw_if_index, ipv4_mss=0, ipv6_mss=1424, ipv4_direction=0, ipv6_direction=2, ) # Send the packets again and ensure they are unchanged. self.send_and_verify_ip6(self.pg1, self.pg0, 1460, 1460) # disable the feature self.vapi.mss_clamp_enable_disable( self.pg1.sw_if_index, ipv4_mss=0, ipv6_mss=0, ipv4_direction=0, ipv6_direction=0, ) # Send the packets again and ensure they are unchanged. self.send_and_verify_ip6(self.pg1, self.pg0, 1460, 1460) if __name__ == "__main__": unittest.main(testRunner=VppTestRunner)