summaryrefslogtreecommitdiffstats
path: root/test/test_stats_client.py
blob: dcbf0fedbc0bc271427b165fb9af74abe095e285 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#!/usr/bin/env python2.7

import unittest
import time
import psutil
from vpp_papi.vpp_stats import VPPStats

from framework import VppTestCase, VppTestRunner


class StatsClientTestCase(VppTestCase):
    """Test Stats Client"""

    @classmethod
    def setUpClass(cls):
        super(StatsClientTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(StatsClientTestCase, cls).tearDownClass()

    def test_set_errors(self):
        """Test set errors"""
        self.assertEqual(self.statistics.set_errors(), {})
        self.assertEqual(self.statistics.get_counter('/err/ethernet-input/no'),
                         [0])

    def test_client_fd_leak(self):
        """Test file descriptor count - VPP-1486"""

        cls = self.__class__
        p = psutil.Process()
        initial_fds = p.num_fds()

        for _ in range(100):
            stats = VPPStats(socketname=cls.stats_sock)
            stats.disconnect()

        ending_fds = p.num_fds()
        self.assertEqual(initial_fds, ending_fds,
                         "initial client side file descriptor count: %s "
                         "is not equal to "
                         "ending client side file descriptor count: %s" % (
                             initial_fds, ending_fds))

    @unittest.skip("Manual only")
    def test_mem_leak(self):
        def loop():
            print('Running loop')
            for i in range(50):
                rv = self.vapi.papi.tap_create_v2(id=i, use_random_mac=1)
                self.assertEqual(rv.retval, 0)
                rv = self.vapi.papi.tap_delete_v2(sw_if_index=rv.sw_if_index)
                self.assertEqual(rv.retval, 0)

        before = self.statistics.get_counter('/mem/statseg/used')
        loop()
        self.vapi.cli("memory-trace on stats-segment")
        for j in range(100):
            loop()
        print(self.vapi.cli("show memory stats-segment verbose"))
        print('AFTER', before,
              self.statistics.get_counter('/mem/statseg/used'))


if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)
>.vapi.ipsec_select_backend(self.vpp_ah_protocol, 200) with self.vapi.assert_negative_api_retval(): self.vapi.ipsec_select_backend(self.vpp_esp_protocol, 200) def test_select_backend_in_use(self): """ attempt to change backend while sad configured """ params = self.ipv4_params addr_type = params.addr_type is_ipv6 = params.is_ipv6 scapy_tun_sa_id = params.scapy_tun_sa_id scapy_tun_spi = params.scapy_tun_spi auth_algo_vpp_id = params.auth_algo_vpp_id auth_key = params.auth_key crypt_algo_vpp_id = params.crypt_algo_vpp_id crypt_key = params.crypt_key self.vapi.ipsec_sad_entry_add_del(scapy_tun_sa_id, scapy_tun_spi, auth_algo_vpp_id, auth_key, crypt_algo_vpp_id, crypt_key, self.vpp_ah_protocol, self.pg0.local_addr[addr_type], self.pg0.remote_addr[addr_type]) with self.vapi.assert_negative_api_retval(): self.vapi.ipsec_select_backend( protocol=self.vpp_ah_protocol, index=0) self.vapi.ipsec_sad_entry_add_del(scapy_tun_sa_id, scapy_tun_spi, auth_algo_vpp_id, auth_key, crypt_algo_vpp_id, crypt_key, self.vpp_ah_protocol, self.pg0.local_addr[addr_type], self.pg0.remote_addr[addr_type], is_add=0) self.vapi.ipsec_select_backend( protocol=self.vpp_ah_protocol, index=0) if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)