summaryrefslogtreecommitdiffstats
path: root/test/asf/test_prom.py
blob: f536fd19d34835f4fd9686c32d74d35a57f066c4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
from config import config
from asfframework import VppAsfTestCase, VppTestRunner
import unittest
import subprocess
from vpp_qemu_utils import (
    create_host_interface,
    delete_host_interfaces,
    create_namespace,
    delete_namespace,
)


@unittest.skipIf(
    "http_static" in config.excluded_plugins, "Exclude HTTP Static Server plugin tests"
)
@unittest.skipIf("prom" in config.excluded_plugins, "Exclude Prometheus plugin tests")
@unittest.skipIf(config.skip_netns_tests, "netns not available or disabled from cli")
class TestProm(VppAsfTestCase):
    """Prometheus plugin test"""

    # The class is skipped when either the http_static or the prom plugin is
    # excluded, or when network-namespace tests are disabled, because the
    # test fetches the exporter output with curl from inside a namespace.

    @classmethod
    def setUpClass(cls):
        """Create the host namespace/interface and attach VPP to it.

        Any failure after the base-class setup triggers a full teardown
        before the exception is re-raised.
        """
        super(TestProm, cls).setUpClass()

        try:
            # Host side: namespace "HttpStaticProm" with 10.10.1.1/24
            # (presumably vppHost/vppOut are the two ends of a veth pair --
            # confirm against vpp_qemu_utils.create_host_interface).
            create_namespace("HttpStaticProm")
            create_host_interface(
                "vppHost", "vppOut", "HttpStaticProm", "10.10.1.1/24"
            )

            # VPP side: attach to vppOut and address it as 10.10.1.2/24.
            cls.vapi.cli("create host-interface name vppOut")
            cls.vapi.cli("set int state host-vppOut up")
            cls.vapi.cli("set int ip address host-vppOut 10.10.1.2/24")
        except Exception:
            # unittest does not invoke tearDownClass() when setUpClass()
            # raises, so clean up here to avoid leaking the namespace, the
            # host interface and the VPP instance into later test runs.
            cls.tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Remove the host namespace/interface, then run the base teardown."""
        try:
            delete_namespace(["HttpStaticProm"])
            delete_host_interfaces("vppHost")
        finally:
            # Always run the framework teardown, even if host cleanup failed.
            super(TestProm, cls).tearDownClass()

    def test_prom(self):
        """Enable HTTP Static server and prometheus exporter, get stats"""
        self.vapi.cli("http static server uri tcp://0.0.0.0/80 url-handlers")
        self.vapi.cli("prom enable")
        self.sleep(1, "wait for min-scrape-interval to expire")

        # Fetch the exporter output from inside the host namespace.
        process = subprocess.run(
            [
                "ip",
                "netns",
                "exec",
                "HttpStaticProm",
                "curl",
                "10.10.1.2/stats.prom",
            ],
            capture_output=True,
        )
        # The check only looks for the "TYPE" marker in the response body;
        # on failure, report curl's exit status and stderr so that
        # connectivity problems are distinguishable from a bad response.
        self.assertIn(
            b"TYPE",
            process.stdout,
            "no TYPE marker in stats.prom response (curl rc=%d, stderr=%s)"
            % (process.returncode, process.stderr.decode(errors="replace")),
        )


# Allow running this file directly as a script; tests are executed with the
# VppTestRunner class imported from asfframework.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)