summaryrefslogtreecommitdiffstats
path: root/src/vcl/vppcom.h
AgeCommit message (Expand)AuthorFilesLines
2021-06-14vcl: improve shutdown()liuyacan1-3/+2
2021-06-05vcl: epoll error handling fixes and improvementsFlorin Coras1-1/+2
2021-05-25srtp: basic implementation based on libsrtp2Florin Coras1-0/+1
2021-05-12session: support half-close connectionliuyacan1-0/+1
2021-05-01vcl: attr api to set endpt ext configFlorin Coras1-0/+1
2021-04-14session tcp vcl: api to update connection attributesFlorin Coras1-1/+2
2021-02-22vcl: support sockopt of SO_REUSEPORT and SO_DOMAINwanghanlin1-0/+1
2021-02-09tls: dtls initial implementationFlorin Coras1-23/+24
2021-01-28vcl: support set/get vrf attributeFlorin Coras1-0/+2
2021-01-07vcl session: switch to generic cert key apisFlorin Coras1-4/+11
2020-11-09vcl: remove support for shm api connectionsFlorin Coras1-1/+1
2020-09-29vcl svm: provide apps access to fifo chunksFlorin Coras1-4/+4
2020-06-15vcl: move helper functions out of header fileFlorin Coras1-97/+5
2020-05-21vcl: support connected udp listensFlorin Coras1-0/+1
2020-05-15vcl: remove udpc transportFlorin Coras1-5/+1
2020-05-04vcl: allow vcl worker index to be set by applicationsPivo1-0/+5
2020-03-25session: api to add new transport typesFlorin Coras1-4/+0
2019-10-21vcl: add api to set lcl ipFlorin Coras1-0/+1
2019-08-30vcl: allow non-blocking connectsFlorin Coras1-0/+2
2019-07-19session: Use parent_handle instead of transport_optsNathan Skrzypczak1-1/+1
2019-07-18vcl: add QUIC supportNathan Skrzypczak1-1/+18
2019-07-08vcl: add worker unregister apiFlorin Coras1-0/+5
2019-03-15vcl: cleanup debug messagesFlorin Coras1-1/+1
2019-01-26vcl: add api to expose worker message queues epfdFlorin Coras1-0/+8
2019-01-18vcl: move forking logic to vlsFlorin Coras1-2/+0
2019-01-08vcl/ldp: select cleanup/improvementsFlorin Coras1-4/+5
2019-01-07vcl/ldp: add locked sessions shim layerFlorin Coras1-2/+4
2019-01-05vcl/session: add api for changing session app workerFlorin Coras1-1/+4
2019-01-04Add TLS support for VCLPing Yu1-2/+19
2019-01-01vcl/ldp: add write msg function and fine tuningFlorin Coras1-0/+2
2018-11-29vcl: basic support for apps that forkFlorin Coras1-0/+1
2018-11-29ldp: basic multiworker supportFlorin Coras1-0/+5
2018-11-27ldp: sid to fd mapper for apps with multiple workersFlorin Coras1-0/+1
2018-11-14vcl/ldp: fix pollFlorin Coras1-1/+1
2018-09-22vcl: remove vcl_eventFlorin Coras1-5/+0
2018-09-12vcl: add apis that expos fifo as bufferFlorin Coras1-0/+15
2018-08-30vcl: add support for multi-worker appsFlorin Coras1-14/+24
2018-08-10vcl: support for eventfd mq signalingFlorin Coras1-11/+13
2018-07-27vcl: use events for epoll/select/read/writeFlorin Coras1-0/+1
2018-06-19VCL: refactor async & spinlocksDave Wallace1-75/+5
2018-04-12VCL IOEvent external API callbackKeith Burns (alagalah)1-0/+35
2018-03-09VCL API for external callback for listener/connect eventKeith Burns (alagalah)1-0/+41
2018-02-19VCL: config api prefix using env var.Dave Wallace1-0/+1
2018-02-08VCL: remove vrf from vppcom api.Dave Wallace1-4/+1
2018-02-02Clean up for vcl.am, making vppcom.h C++ awareKeith Burns (alagalah)1-0/+13
2018-01-17LD_PRELOAD: stateless LDPDave Wallace1-4/+60
2017-11-25VCL: improve debug outputDave Wallace1-0/+5
2017-11-21VCL: close-on-empty state transition refactoring.Dave Wallace1-0/+5
2017-11-19VCL-LDPRELOAD: Fix CID179207Dave Wallace1-0/+4
2017-11-14VCL-LDPRELOAD: add sendfile/sendfile64 implementation.Dave Wallace1-2/+2
from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder from scapy.layers.inet import IP, TCP, UDP, ICMP from scapy.layers.inet import IPerror, UDPerror from scapy.layers.l2 import Ether from util import ppp class TestDET44(VppTestCase): """ Deterministic NAT Test Cases """ @classmethod def setUpClass(cls): super(TestDET44, cls).setUpClass() cls.vapi.cli("set log class det44 level debug") cls.tcp_port_in = 6303 cls.tcp_external_port = 6303 cls.udp_port_in = 6304 cls.udp_external_port = 6304 cls.icmp_id_in = 6305 cls.nat_addr = '10.0.0.3' cls.create_pg_interfaces(range(3)) cls.interfaces = list(cls.pg_interfaces) for i in cls.interfaces: i.admin_up() i.config_ip4() i.resolve_arp() cls.pg0.generate_remote_hosts(2) cls.pg0.configure_ipv4_neighbors() @classmethod def tearDownClass(cls): super(TestDET44, cls).tearDownClass() def setUp(self): super(TestDET44, self).setUp() self.vapi.det44_plugin_enable_disable(enable=1) def tearDown(self): super(TestDET44, self).tearDown() if not self.vpp_dead: self.vapi.det44_plugin_enable_disable(enable=0) def show_commands_at_teardown(self): self.logger.info(self.vapi.cli("show det44 interfaces")) self.logger.info(self.vapi.cli("show det44 timeouts")) self.logger.info(self.vapi.cli("show det44 mappings")) self.logger.info(self.vapi.cli("show det44 sessions")) def verify_capture_in(self, capture, in_if): """ Verify captured packets on inside network :param capture: Captured packets :param in_if: Inside interface """ fired = False for packet in capture: try: self.assert_packet_checksums_valid(packet) self.assertEqual(packet[IP].dst, in_if.remote_ip4) if packet.haslayer(TCP): self.assertEqual(packet[TCP].dport, self.tcp_port_in) elif packet.haslayer(UDP): self.assertEqual(packet[UDP].dport, self.udp_port_in) else: self.assertEqual(packet[ICMP].id, self.icmp_id_in) except: fired = True self.logger.error(ppp("Unexpected or invalid packet " "(inside network):", packet)) if fired: raise def verify_ipfix_max_entries_per_user(self, data, limit, src_addr): """ Verify IPFIX maximum entries per user exceeded event :param data: Decoded IPFIX data records :param limit: Number of maximum entries per user :param src_addr: IPv4 source address """ self.assertEqual(1, len(data)) record = data[0] # natEvent self.assertEqual(scapy.compat.orb(record[230]), 13) # natQuotaExceededEvent self.assertEqual(struct.pack("I", 3), record[466]) # maxEntriesPerUser self.assertEqual(struct.pack("I", limit), record[473]) # sourceIPv4Address self.assertEqual(socket.inet_pton(socket.AF_INET, src_addr), record[8]) def initiate_tcp_session(self, in_if, out_if): """ Initiates TCP session 3 WAY HAND SHAKE :param in_if: Inside interface :param out_if: Outside interface """ # SYN packet in->out p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="S")) in_if.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = out_if.get_capture(1) p = capture[0] self.tcp_port_out = p[TCP].sport # SYN + ACK packet out->in p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) / IP(src=out_if.remote_ip4, dst=self.nat_addr) / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="SA")) out_if.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() in_if.get_capture(1) # ACK packet in->out p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="A")) in_if.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() out_if.get_capture(1) def create_stream_in(self, in_if, out_if, ttl=64): """ Create packet stream for inside network :param in_if: Inside interface :param out_if: Outside interface :param ttl: TTL of generated packets """ pkts = [] # TCP p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)) pkts.append(p) # UDP p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / UDP(sport=self.udp_port_in, dport=self.udp_external_port)) pkts.append(p) # ICMP p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / ICMP(id=self.icmp_id_in, type='echo-request')) pkts.append(p) return pkts def create_stream_out(self, out_if, dst_ip=None, ttl=64): """ Create packet stream for outside network :param out_if: Outside interface :param dst_ip: Destination IP address (Default use global NAT address) :param ttl: TTL of generated packets """ if dst_ip is None: dst_ip = self.nat_addr pkts = [] # TCP p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / TCP(dport=self.tcp_port_out, sport=self.tcp_external_port)) pkts.append(p) # UDP p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / UDP(dport=self.udp_port_out, sport=self.udp_external_port)) pkts.append(p) # ICMP p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / ICMP(id=self.icmp_external_id, type='echo-reply')) pkts.append(p) return pkts def verify_capture_out(self, capture, nat_ip=None): """ Verify captured packets on outside network :param capture: Captured packets :param nat_ip: Translated IP address (Default use global NAT address) :param same_port: Source port number is not translated (Default False) """ if nat_ip is None: nat_ip = self.nat_addr for packet in capture: try: self.assertEqual(packet[IP].src, nat_ip) if packet.haslayer(TCP): self.tcp_port_out = packet[TCP].sport elif packet.haslayer(UDP): self.udp_port_out = packet[UDP].sport else: self.icmp_external_id = packet[ICMP].id except: self.logger.error(ppp("Unexpected or invalid packet " "(outside network):", packet)) raise def test_deterministic_mode(self): """ NAT plugin run deterministic mode """ in_addr = '172.16.255.0' out_addr = '172.17.255.50' in_addr_t = '172.16.255.20' in_plen = 24 out_plen = 32 self.vapi.det44_add_del_map(is_add=1, in_addr=in_addr, in_plen=in_plen, out_addr=out_addr, out_plen=out_plen) rep1 = self.vapi.det44_forward(in_addr_t) self.assertEqual(str(rep1.out_addr), out_addr) rep2 = self.vapi.det44_reverse(rep1.out_port_hi, out_addr) self.assertEqual(str(rep2.in_addr), in_addr_t) deterministic_mappings = self.vapi.det44_map_dump() self.assertEqual(len(deterministic_mappings), 1) dsm = deterministic_mappings[0] self.assertEqual(in_addr, str(dsm.in_addr)) self.assertEqual(in_plen, dsm.in_plen) self.assertEqual(out_addr, str(dsm.out_addr)) self.assertEqual(out_plen, dsm.out_plen) def test_set_timeouts(self): """ Set deterministic NAT timeouts """ timeouts_before = self.vapi.det44_get_timeouts() self.vapi.det44_set_timeouts( udp=timeouts_before.udp + 10, tcp_established=timeouts_before.tcp_established + 10, tcp_transitory=timeouts_before.tcp_transitory + 10, icmp=timeouts_before.icmp + 10) timeouts_after = self.vapi.det44_get_timeouts() self.assertNotEqual(timeouts_before.udp, timeouts_after.udp) self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp) self.assertNotEqual(timeouts_before.tcp_established, timeouts_after.tcp_established) self.assertNotEqual(timeouts_before.tcp_transitory, timeouts_after.tcp_transitory) def test_in(self): """ DET44 translation test (TCP, UDP, ICMP) """ nat_ip = "10.0.0.10" self.vapi.det44_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4, in_plen=32, out_addr=socket.inet_aton(nat_ip), out_plen=32) self.vapi.det44_interface_add_del_feature( sw_if_index=self.pg0.sw_if_index, is_add=1, is_inside=1) self.vapi.det44_interface_add_del_feature( sw_if_index=self.pg1.sw_if_index, is_add=1, is_inside=0) # in2out pkts = self.create_stream_in(self.pg0, self.pg1) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) self.verify_capture_out(capture, nat_ip) # out2in pkts = self.create_stream_out(self.pg1, nat_ip) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg0.get_capture(len(pkts)) self.verify_capture_in(capture, self.pg0) # session dump test sessions = self.vapi.det44_session_dump(self.pg0.remote_ip4) self.assertEqual(len(sessions), 3) # TCP session s = sessions[0] self.assertEqual(str(s.ext_addr), self.pg1.remote_ip4) self.assertEqual(s.in_port, self.tcp_port_in) self.assertEqual(s.out_port, self.tcp_port_out) self.assertEqual(s.ext_port, self.tcp_external_port) # UDP session s = sessions[1] self.assertEqual(str(s.ext_addr), self.pg1.remote_ip4) self.assertEqual(s.in_port, self.udp_port_in) self.assertEqual(s.out_port, self.udp_port_out) self.assertEqual(s.ext_port, self.udp_external_port) # ICMP session s = sessions[2] self.assertEqual(str(s.ext_addr), self.pg1.remote_ip4) self.assertEqual(s.in_port, self.icmp_id_in) self.assertEqual(s.out_port, self.icmp_external_id) def test_multiple_users(self): """ Deterministic NAT multiple users """ nat_ip = "10.0.0.10" port_in = 80 external_port = 6303 host0 = self.pg0.remote_hosts[0] host1 = self.pg0.remote_hosts[1] self.vapi.det44_add_del_map(is_add=1, in_addr=host0.ip4, in_plen=24, out_addr=socket.inet_aton(nat_ip), out_plen=32) self.vapi.det44_interface_add_del_feature( sw_if_index=self.pg0.sw_if_index, is_add=1, is_inside=1) self.vapi.det44_interface_add_del_feature( sw_if_index=self.pg1.sw_if_index, is_add=1, is_inside=0) # host0 to out p = (Ether(src=host0.mac, dst=self.pg0.local_mac) / IP(src=host0.ip4, dst=self.pg1.remote_ip4) / TCP(sport=port_in, dport=external_port)) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(1) p = capture[0] try: ip = p[IP] tcp = p[TCP] self.assertEqual(ip.src, nat_ip) self.assertEqual(ip.dst, self.pg1.remote_ip4) self.assertEqual(tcp.dport, external_port) port_out0 = tcp.sport except: self.logger.error(ppp("Unexpected or invalid packet:", p)) raise # host1 to out p = (Ether(src=host1.mac, dst=self.pg0.local_mac) / IP(src=host1.ip4, dst=self.pg1.remote_ip4) / TCP(sport=port_in, dport=external_port)) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(1) p = capture[0] try: ip = p[IP] tcp = p[TCP] self.assertEqual(ip.src, nat_ip) self.assertEqual(ip.dst, self.pg1.remote_ip4) self.assertEqual(tcp.dport, external_port) port_out1 = tcp.sport except: self.logger.error(ppp("Unexpected or invalid packet:", p)) raise dms = self.vapi.det44_map_dump() self.assertEqual(1, len(dms)) self.assertEqual(2, dms[0].ses_num) # out to host0 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / IP(src=self.pg1.remote_ip4, dst=nat_ip) / TCP(sport=external_port, dport=port_out0)) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg0.get_capture(1) p = capture[0] try: ip = p[IP] tcp = p[TCP] self.assertEqual(ip.src, self.pg1.remote_ip4) self.assertEqual(ip.dst, host0.ip4) self.assertEqual(tcp.dport, port_in) self.assertEqual(tcp.sport, external_port) except: self.logger.error(ppp("Unexpected or invalid packet:", p)) raise # out to host1 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / IP(src=self.pg1.remote_ip4, dst=nat_ip) / TCP(sport=external_port, dport=port_out1)) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg0.get_capture(1) p = capture[0] try: ip = p[IP] tcp = p[TCP] self.assertEqual(ip.src, self.pg1.remote_ip4) self.assertEqual(ip.dst, host1.ip4) self.assertEqual(tcp.dport, port_in) self.assertEqual(tcp.sport, external_port) except: self.logger.error(ppp("Unexpected or invalid packet", p)) raise # session close api test self.vapi.det44_close_session_out(socket.inet_aton(nat_ip), port_out1, self.pg1.remote_ip4, external_port) dms = self.vapi.det44_map_dump() self.assertEqual(dms[0].ses_num, 1) self.vapi.det44_close_session_in(host0.ip4, port_in, self.pg1.remote_ip4, external_port) dms = self.vapi.det44_map_dump() self.assertEqual(dms[0].ses_num, 0) def test_tcp_session_close_detection_in(self): """ DET44 TCP session close from inside network """ self.vapi.det44_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4, in_plen=32, out_addr=socket.inet_aton(self.nat_addr), out_plen=32) self.vapi.det44_interface_add_del_feature( sw_if_index=self.pg0.sw_if_index, is_add=1, is_inside=1) self.vapi.det44_interface_add_del_feature( sw_if_index=self.pg1.sw_if_index, is_add=1, is_inside=0) self.initiate_tcp_session(self.pg0, self.pg1) # close the session from inside try: # FIN packet in -> out p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="F")) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.pg1.get_capture(1) pkts = [] # ACK packet out -> in p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="A")) pkts.append(p) # FIN packet out -> in p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="F")) pkts.append(p) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.pg0.get_capture(2) # ACK packet in -> out p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="A")) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.pg1.get_capture(1) # Check if deterministic NAT44 closed the session dms = self.vapi.det44_map_dump() self.assertEqual(0, dms[0].ses_num) except: self.logger.error("TCP session termination failed") raise def test_tcp_session_close_detection_out(self): """ Deterministic NAT TCP session close from outside network """ self.vapi.det44_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4, in_plen=32, out_addr=socket.inet_aton(self.nat_addr), out_plen=32) self.vapi.det44_interface_add_del_feature( sw_if_index=self.pg0.sw_if_index, is_add=1, is_inside=1) self.vapi.det44_interface_add_del_feature( sw_if_index=self.pg1.sw_if_index, is_add=1, is_inside=0) self.initiate_tcp_session(self.pg0, self.pg1) # close the session from outside try: # FIN packet out -> in p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="F")) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.pg0.get_capture(1) pkts = [] # ACK packet in -> out p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="A")) pkts.append(p) # ACK packet in -> out p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="F")) pkts.append(p) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.pg1.get_capture(2) # ACK packet out -> in p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="A")) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.pg0.get_capture(1) # Check if deterministic NAT44 closed the session dms = self.vapi.det44_map_dump() self.assertEqual(0, dms[0].ses_num) except: self.logger.error("TCP session termination failed") raise def test_session_timeout(self): """ Deterministic NAT session timeouts """ self.vapi.det44_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4, in_plen=32, out_addr=socket.inet_aton(self.nat_addr), out_plen=32) self.vapi.det44_interface_add_del_feature( sw_if_index=self.pg0.sw_if_index, is_add=1, is_inside=1) self.vapi.det44_interface_add_del_feature( sw_if_index=self.pg1.sw_if_index, is_add=1, is_inside=0) self.initiate_tcp_session(self.pg0, self.pg1) self.vapi.det44_set_timeouts(udp=5, tcp_established=5, tcp_transitory=5, icmp=5) pkts = self.create_stream_in(self.pg0, self.pg1) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.pg1.get_capture(len(pkts)) self.virtual_sleep(15) dms = self.vapi.det44_map_dump() self.assertEqual(0, dms[0].ses_num) # TODO: ipfix needs to be separated from NAT base plugin @unittest.skipUnless(running_extended_tests, "part of extended tests") def test_session_limit_per_user(self): """ Deterministic NAT maximum sessions per user limit """ self.vapi.det44_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4, in_plen=32, out_addr=socket.inet_aton(self.nat_addr), out_plen=32) self.vapi.det44_interface_add_del_feature( sw_if_index=self.pg0.sw_if_index, is_add=1, is_inside=1) self.vapi.det44_interface_add_del_feature( sw_if_index=self.pg1.sw_if_index, is_add=1, is_inside=0) self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4, src_address=self.pg2.local_ip4, path_mtu=512, template_interval=10) self.vapi.nat_ipfix_enable_disable(domain_id=1, src_port=4739, enable=1) pkts = [] for port in range(1025, 2025): p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / UDP(sport=port, dport=port)) pkts.append(p) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.pg1.get_capture(len(pkts)) p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / UDP(sport=3001, dport=3002)) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.pg1.assert_nothing_captured() # verify ICMP error packet capture = self.pg0.get_capture(1) p = capture[0] self.assertTrue(p.haslayer(ICMP)) icmp = p[ICMP] self.assertEqual(icmp.type, 3) self.assertEqual(icmp.code, 1) self.assertTrue(icmp.haslayer(IPerror)) inner_ip = icmp[IPerror] self.assertEqual(inner_ip[UDPerror].sport, 3001) self.assertEqual(inner_ip[UDPerror].dport, 3002) dms = self.vapi.det44_map_dump() self.assertEqual(1000, dms[0].ses_num) # verify IPFIX logging self.vapi.ipfix_flush() capture = self.pg2.get_capture(2) ipfix = IPFIXDecoder() # first load template for p in capture: self.assertTrue(p.haslayer(IPFIX)) if p.haslayer(Template): ipfix.add_template(p.getlayer(Template)) # verify events in data set for p in capture: if p.haslayer(Data): data = ipfix.decode_data_set(p.getlayer(Set)) self.verify_ipfix_max_entries_per_user(data, 1000, self.pg0.remote_ip4) self.vapi.nat_ipfix_enable_disable(domain_id=1, src_port=4739, enable=0)