summaryrefslogtreecommitdiffstats
path: root/test/test_dns.py
blob: acc9bfe889cd0bafb5c4966b50cd2fb7e7c13847 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#!/usr/bin/env python3

import unittest

from framework import VppTestCase, VppTestRunner
from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
from ipaddress import *

import scapy.compat
from scapy.contrib.mpls import MPLS
from scapy.layers.inet import IP, UDP, TCP, ICMP, icmptypes, icmpcodes
from scapy.layers.l2 import Ether
from scapy.packet import Raw
from scapy.layers.dns import DNSRR, DNS, DNSQR


class TestDns(VppTestCase):
    """Dns Test Cases"""

    @classmethod
    def setUpClass(cls):
        super(TestDns, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestDns, cls).tearDownClass()

    def setUp(self):
        super(TestDns, self).setUp()

        self.create_pg_interfaces(range(1))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()

    def tearDown(self):
        super(TestDns, self).tearDown()

    def create_stream(self, src_if):
        """Create input packet stream for defined interface.

        :param VppInterface src_if: Interface to create packet stream for.
        """
        good_request = (
            Ether(dst=src_if.local_mac, src=src_if.remote_mac)
            / IP(src=src_if.remote_ip4)
            / UDP(sport=1234, dport=53)
            / DNS(rd=1, qd=DNSQR(qname="bozo.clown.org"))
        )

        bad_request = (
            Ether(dst=src_if.local_mac, src=src_if.remote_mac)
            / IP(src=src_if.remote_ip4)
            / UDP(sport=1234, dport=53)
            / DNS(rd=1, qd=DNSQR(qname="no.clown.org"))
        )
        pkts = [good_request, bad_request]
        return pkts

    def verify_capture(self, dst_if, capture):
        """Verify captured input packet stream for defined interface.

        :param VppInterface dst_if: Interface to verify captured packet stream
            for.
        :param list capture: Captured packet stream.
        """
        self.logger.info("Verifying capture on interface %s" % dst_if.name)
        for packet in capture:
            dns = packet[DNS]
            self.assertEqual(dns.an[0].rdata, "1.2.3.4")

    def test_dns_unittest(self):
        """DNS Name Resolver Basic Functional Test"""

        # Set up an upstream name resolver. We won't actually go there
        self.vapi.dns_name_server_add_del(
            is_ip6=0, is_add=1, server_address=IPv4Address("8.8.8.8").packed
        )

        # Enable name resolution
        self.vapi.dns_enable_disable(enable=1)

        # Manually add a static dns cache entry
        self.logger.info(self.vapi.cli("dns cache add bozo.clown.org 1.2.3.4"))

        # Test the binary API
        rv = self.vapi.dns_resolve_name(name=b"bozo.clown.org")
        self.assertEqual(rv.ip4_address, IPv4Address("1.2.3.4").packed)

        # Configure 127.0.0.1/8 on the pg interface
        self.vapi.sw_interface_add_del_address(
            sw_if_index=self.pg0.sw_if_index, prefix="127.0.0.1/8"
        )

        # Send a couple of DNS request packets, one for bozo.clown.org
        # and one for no.clown.org which won't resolve

        pkts = self.create_stream(self.pg0)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)

        self.pg_start()
        pkts = self.pg0.get_capture(1)
        self.verify_capture(self.pg0, pkts)

        # Make sure that the cache contents are correct
        str = self.vapi.cli("show dns cache verbose")
        self.assertIn("1.2.3.4", str)
        self.assertIn("[P] no.clown.org:", str)


if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)
): return self._sw_if_index @property def priority(self): return self._priority @property def weight(self): return self._weight def add_vpp_config(self): self.test.vapi.lisp_add_del_locator( locator_set_name=self._ls_name, sw_if_index=self._sw_if_index, priority=self._priority, weight=self._weight, ) self._test.registry.register(self, self.test.logger) def get_lisp_locator_dump_entry(self): locators = self.test.vapi.lisp_locator_dump( is_index_set=0, ls_name=self._ls_name ) for locator in locators: if locator.sw_if_index == self._sw_if_index: return locator return None def query_vpp_config(self): locator = self.get_lisp_locator_dump_entry() return locator is not None def remove_vpp_config(self): self.test.vapi.lisp_add_del_locator( locator_set_name=self._ls_name, sw_if_index=self._sw_if_index, priority=self._priority, weight=self._weight, is_add=0, ) self._test.registry.register(self, self.test.logger) def object_id(self): return "lisp-locator-%s-%d" % (self._ls_name, self._sw_if_index) class LispEIDType: PREFIX = 0 MAC = 1 NSH = 2 class LispKeyIdType: NONE = 0 SHA1 = 1 SHA256 = 2 class LispEID: """Lisp endpoint identifier""" def __init__(self, eid): self.eid = eid self._type = -1 # find out whether EID is ip prefix, or MAC try: self.prefix = ip_network(self.eid) self._type = LispEIDType.PREFIX return except ValueError: if self.eid.count(":") == 5: # MAC address self.mac = self.eid self._type = LispEIDType.MAC return raise Exception("Unsupported EID format {!s}!".format(eid)) @property def eid_type(self): return self._type @property def address(self): if self.eid_type == LispEIDType.PREFIX: return self.prefix elif self.eid_type == LispEIDType.MAC: return self.mac elif self.eid_type == LispEIDType.NSH: return Exception("Unimplemented") @property def packed(self): if self.eid_type == LispEIDType.PREFIX: return {"type": self._type, "address": {"prefix": self.prefix}} elif self.eid_type == LispEIDType.MAC: return {"type": self._type, "address": {"mac": self.mac}} elif self.eid_type == LispEIDType.NSH: return Exception("Unimplemented") class LispKey: """Lisp Key""" def __init__(self, key_type, key): self._key_type = key_type self._key = key @property def packed(self): return {"id": self._key_type, "key": self._key} class VppLispMapping(VppObject): """Represents common features for remote and local LISP mapping in VPP""" def __init__(self, test, eid, vni=0, priority=1, weight=1): self._eid = LispEID(eid) self._test = test self._priority = priority self._weight = weight self._vni = vni @property def test(self): return self._test @property def vni(self): return self._vni @property def eid(self): return self._eid @property def priority(self): return self._priority @property def weight(self): return self._weight def get_lisp_mapping_dump_entry(self): return self.test.vapi.lisp_eid_table_dump( eid_set=1, vni=self._vni, eid=self._eid.packed ) def query_vpp_config(self): mapping = self.get_lisp_mapping_dump_entry() return mapping def object_id(self): return "lisp-mapping-[%s]-%s-%s-%s" % ( self.vni, self.eid.address, self.priority, self.weight, ) class VppLocalMapping(VppLispMapping): """LISP Local mapping""" def __init__( self, test, eid, ls_name, vni=0, priority=1, weight=1, key_id=LispKeyIdType.NONE, key="", ): super(VppLocalMapping, self).__init__(test, eid, vni, priority, weight) self._ls_name = ls_name self._key = LispKey(key_id, key) @property def ls_name(self): return self._ls_name @property def key_id(self): return self._key_id @property def key(self): return self._key def add_vpp_config(self): self.test.vapi.lisp_add_del_local_eid( locator_set_name=self._ls_name, eid=self._eid.packed, vni=self._vni, key=self._key.packed, ) self._test.registry.register(self, self.test.logger) def remove_vpp_config(self): self.test.vapi.lisp_add_del_local_eid( locator_set_name=self._ls_name, eid=self._eid.packed, vni=self._vni, is_add=0, ) def object_id(self): return "lisp-eid-local-mapping-%s[%d]" % (self._eid.address, self._vni) class LispRemoteLocator: def __init__(self, addr, priority=1, weight=1): self.addr = addr self.priority = priority self.weight = weight @property def packed(self): return { "priority": self.priority, "weight": self.weight, "ip_address": self.addr, } class VppRemoteMapping(VppLispMapping): def __init__(self, test, eid, rlocs=None, vni=0, priority=1, weight=1): super(VppRemoteMapping, self).__init__(test, eid, vni, priority, weight) self._rlocs = rlocs @property def rlocs(self): rlocs = [] for rloc in self._rlocs: rlocs.append(rloc.packed) return rlocs def add_vpp_config(self): self.test.vapi.lisp_add_del_remote_mapping( rlocs=self.rlocs, deid=self._eid.packed, vni=self._vni, rloc_num=len(self._rlocs), ) self._test.registry.register(self, self.test.logger) def remove_vpp_config(self): self.test.vapi.lisp_add_del_remote_mapping( deid=self._eid.packed, vni=self._vni, is_add=0, rloc_num=0 ) def object_id(self): return "lisp-eid-remote-mapping-%s[%d]" % (self._eid.address, self._vni) class VppLispAdjacency(VppObject): """Represents LISP adjacency in VPP""" def __init__(self, test, leid, reid, vni=0): self._leid = LispEID(leid) self._reid = LispEID(reid) if self._leid.eid_type != self._reid.eid_type: raise Exception("remote and local EID are different types!") self._vni = vni self._test = test @property def test(self): return self._test @property def leid(self): return self._leid @property def reid(self): return self._reid @property def vni(self): return self._vni def add_vpp_config(self): self.test.vapi.lisp_add_del_adjacency( leid=self._leid.packed, reid=self._reid.packed, vni=self._vni ) self._test.registry.register(self, self.test.logger) @staticmethod def eid_equal(eid, eid_api): if eid.eid_type != eid_api.type: return False if eid_api.type == LispEIDType.PREFIX: if eid.address.prefixlen != eid_api.address.prefix.prefixlen: return False if eid.address != eid_api.address: return False return True def query_vpp_config(self): res = self.test.vapi.lisp_adjacencies_get(vni=self._vni) for adj in res.adjacencies: if self.eid_equal(self._leid, adj.leid) and self.eid_equal( self._reid, adj.reid ): return True return False def remove_vpp_config(self): self.test.vapi.lisp_add_del_adjacency( leid=self._leid.packed, reid=self._reid.packed, vni=self._vni, is_add=0 ) def object_id(self): return "lisp-adjacency-%s-%s[%d]" % (self._leid, self._reid, self._vni)