summaryrefslogtreecommitdiffstats
path: root/test/asf/test_vapi.py
blob: a5635ff356788ee62eb9505071eb537dd9ad5ae4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#!/usr/bin/env python3
""" VAPI test """

import unittest
import os
import signal
from config import config
from asfframework import VppAsfTestCase, VppTestRunner, Worker


class VAPITestCase(VppAsfTestCase):
    """VAPI test"""

    @classmethod
    def setUpClass(cls):
        super(VAPITestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(VAPITestCase, cls).tearDownClass()

    def run_vapi_c(self, path, transport):
        executable = f"{config.vpp_build_dir}/vpp/bin/vapi_c_test"
        worker = Worker([executable, "vapi client", path, transport], self.logger)
        worker.start()
        timeout = 60
        worker.join(timeout)
        self.logger.info("Worker result is `%s'" % worker.result)
        error = False
        if worker.result is None:
            try:
                error = True
                self.logger.error("Timeout! Worker did not finish in %ss" % timeout)
                os.killpg(os.getpgid(worker.process.pid), signal.SIGTERM)
                worker.join()
            except:
                self.logger.debug("Couldn't kill worker-spawned process")
                raise
        if error:
            raise Exception("Timeout! Worker did not finish in %ss" % timeout)
        self.assert_equal(worker.result, 0, "Binary test return code")

    def test_vapi_c_shm(self):
        """run C VAPI tests (over shared memory)"""
        self.run_vapi_c(self.get_api_segment_prefix(), "shm")

    def test_vapi_c_uds(self):
        """run C VAPI tests (over unix domain socket)"""
        self.run_vapi_c(self.get_api_sock_path(), "uds")

    def run_vapi_cpp(self, path, transport):
        """run C++ VAPI tests"""
        executable = f"{config.vpp_build_dir}/vpp/bin/vapi_cpp_test"
        worker = Worker([executable, "vapi client", path, transport], self.logger)
        worker.start()
        timeout = 120
        worker.join(timeout)
        self.logger.info("Worker result is `%s'" % worker.result)
        error = False
        if worker.result is None:
            try:
                error = True
                self.logger.error("Timeout! Worker did not finish in %ss" % timeout)
                os.killpg(os.getpgid(worker.process.pid), signal.SIGTERM)
                worker.join()
            except:
                raise Exception("Couldn't kill worker-spawned process")
        if error:
            raise Exception("Timeout! Worker did not finish in %ss" % timeout)
        self.assert_equal(worker.result, 0, "Binary test return code")

    def test_vapi_cpp_shm(self):
        """run C++ VAPI tests (over shared memory)"""
        self.run_vapi_cpp(self.get_api_segment_prefix(), "shm")

    def test_vapi_cpp_uds(self):
        """run C++ VAPI tests (over unix domain socket)"""
        self.run_vapi_cpp(self.get_api_sock_path(), "uds")


if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)