summaryrefslogtreecommitdiffstats
path: root/test/test_lisp.py
blob: e0b82087f69495a83acf18e37cd1dce4f58961d6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123

@media only all and (prefers-color-scheme: dark) {
.highlight .hll { background-color: #49483e }
.highlight .c { color: #75715e } /* Comment */
.highlight .err { color: #960050; background-color: #1e0010 } /* Error */
.highlight .k { color: #66d9ef } /* Keyword */
.highlight .l { color: #ae81ff } /* Literal */
.highlight .n { color: #f8f8f2 } /* Name */
.highlight .o { color: #f92672 } /* Operator */
.highlight .p { color: #f8f8f2 } /* Punctuation */
.highlight .ch { color: #75715e } /* Comment.Hashbang */
.highlight .cm { color: #75715e } /* Comment.Multiline */
.highlight .cp { color: #75715e } /* Comment.Preproc */
.highlight .cpf { color: #75715e } /* Comment.PreprocFile */
.highlight .c1 { color: #75715e } /* Comment.Single */
.highlight .cs { color: #75715e } /* Comment.Special */
.highlight .gd { color: #f92672 } /* Generic.Deleted */
.highlight .ge { font-style: italic } /* Generic.Emph */
.highlight .gi { color: #a6e22e } /* Generic.Inserted */
.highlight .gs { font-weight: bold } /* Generic.Strong */
.highlight .gu { color: #75715e } /* Generic.Subheading */
.highlight .kc { color: #66d9ef } /* Keyword.Constant */
.highlight .kd { color: #66d9ef } /* Keyword.Declaration */
.highlight .kn { color: #f92672 } /* Keyword.Namespace */
.highlight .kp { color: #66d9ef } /* Keyword.Pseudo */
.highlight .kr { color: #66d9ef } /* Keyword.Reserved */
.highlight .kt { color: #66d9ef } /* Keyword.Type */
.highlight .ld { color: #e6db74 } /* Literal.Date */
.highlight .m { color: #ae81ff } /* Literal.Number */
.highlight .s { color: #e6db74 } /* Literal.String */
.highlight .na { color: #a6e22e } /* Name.Attribute */
.highlight .nb { color: #f8f8f2 } /* Name.Builtin */
.highlight .nc { color: #a6e22e } /* Name.Class */
.highlight .no { color: #66d9ef } /* Name.Constant */
.highlight .nd { color: #a6e22e } /* Name.Decorator */
.highlight .ni { color: #f8f8f2 } /* Name.Entity */
.highlight .ne { color: #a6e22e } /* Name.Exception */
.highlight .nf { color: #a6e22e } /* Name.Function */
.highlight .nl { color: #f8f8f2 } /* Name.Label */
.highlight .nn { color: #f8f8f2 } /* Name.Namespace */
.highlight .nx { color: #a6e22e } /* Name.Other */
.highlight .py { color: #f8f8f2 } /* Name.Property */
.highlight .nt { color: #f92672 } /* Name.Tag */
.highlight .nv { color: #f8f8f2 } /* Name.Variable */
.highlight .ow { color: #f92672 } /* Operator.Word */
.highlight .w { c
#!/usr/bin/env python3

import abc
import six
import unittest

from scapy.fields import BitField, ByteField, FlagsField, IntField
from scapy.packet import bind_layers, Packet, Raw
from scapy.layers.inet import IP, UDP, Ether
from scapy.layers.inet6 import IPv6

from framework import VppTestCase, VppTestRunner
from lisp import VppLocalMapping, VppLispAdjacency, VppLispLocator, \
    VppLispLocatorSet, VppRemoteMapping
from util import ppp, ForeignAddressFactory

# From py_lispnetworking.lisp.py:  # GNU General Public License v2.0


class LISP_GPE_Header(Packet):
    name = "LISP GPE Header"
    fields_desc = [
        FlagsField("gpe_flags", None, 6, ["N", "L", "E", "V", "I", "P"]),
        BitField("reserved", 0, 18),
        ByteField("next_proto", 0),
        IntField("iid", 0),
    ]
bind_layers(UDP, LISP_GPE_Header, dport=4341)
bind_layers(UDP, LISP_GPE_Header, sport=4341)
bind_layers(LISP_GPE_Header, IP, next_proto=1)
bind_layers(LISP_GPE_Header, IPv6, next_proto=2)
bind_layers(LISP_GPE_Header, Ether, next_proto=3)


@six.add_metaclass(abc.ABCMeta)
class Driver(object):

    config_order = ['locator-sets',
                    'locators',
                    'local-mappings',
                    'remote-mappings',
                    'adjacencies']

    """ Basic class for data driven testing """
    def __init__(self, test, test_cases):
        self._test_cases = test_cases
        self._test = test

    @property
    def test_cases(self):
        return self._test_cases

    @property
    def test(self):
        return self._test

    def create_packet(self, src_if, dst_if, deid, payload=''):
        """
        Create IPv4 packet

        param: src_if
        param: dst_if
        """
        packet = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
                  IP(src=src_if.remote_ip4, dst=deid) /
                  Raw(payload))
        return packet

    @abc.abstractmethod
    def run(self):
        """ testing procedure """
        pass


class SimpleDriver(Driver):
    """ Implements simple test procedure """
    def __init__(self, test, test_cases):
        super(SimpleDriver, self).__init__(test, test_cases)

    def verify_capture(self, src_loc, dst_loc, capture):
        """
        Verify captured packet

        :param src_loc: source locator address
        :param dst_loc: destination locator address
        :param capture: list of captured packets
        """
        self.test.assertEqual(len(capture), 1, "Unexpected number of "
                              "packets! Expected 1 but {} received"
                              .format(len(capture)))
        packet = capture[0]
        try:
            ip_hdr = packet[IP]
            # assert the values match
            self.test.assertEqual(ip_hdr.src, src_loc, "IP source address")
            self.test.assertEqual(ip_hdr.dst, dst_loc,
                                  "IP destination address")
            gpe_hdr = packet[LISP_GPE_Header]
            self.test.assertEqual(gpe_hdr.next_proto, 1,
                                  "next_proto is not ipv4!")
            ih = gpe_hdr[IP]
            self.test.assertEqual(ih.src, self.test.pg0.remote_ip4,
                                  "unexpected source EID!")
            self.test.assertEqual(ih.dst, self.test.deid_ip4,
                                  "unexpected dest EID!")
        except:
            self.test.logger.error(ppp("Unexpected or invalid packet:",
                                   packet))
            raise

    def configure_tc(self, tc):
        for config_item in self.config_order:
            for vpp_object in tc[config_item]:
                vpp_object.add_vpp_config()

    def run(self, dest):
        """ Send traffic for each test case and verify that it
            is encapsulated """
        for tc in enumerate(self.test_cases):
            self.test.logger.info('Running {}'.format(tc[1]['name']))
            self.configure_tc(tc[1])

            packet = self.create_packet(self.test.pg0, self.test.pg1, dest,
                                        'data')
            self.test.pg0.add_stream(packet)
            self.test.pg0.enable_capture()
            self.test.pg1.enable_capture()
            self.test.pg_start()
            capture = self.test.pg1.get_capture(1)
            self.verify_capture(self.test.pg1.local_ip4,
                                self.test.pg1.remote_ip4, capture)
            self.test.pg0.assert_nothing_captured()


class TestLisp(VppTestCase):
    """ Basic LISP test """

    @classmethod
    def setUpClass(cls):
        super(TestLisp, cls).setUpClass()
        cls.faf = ForeignAddressFactory()
        cls.create_pg_interfaces(range(2))  # create pg0 and pg1
        for i in cls.pg_interfaces:
            i.admin_up()  # put the interface upsrc_if
            i.config_ip4()  # configure IPv4 address on the interface
            i.resolve_arp()  # resolve ARP, so that we know VPP MAC

    @classmethod
    def tearDownClass(cls):
        super(TestLisp, cls).tearDownClass()

    def setUp(self):
        super(TestLisp, self).setUp()
        self.vapi.lisp_enable_disable(is_enabled=1)

    def test_lisp_basic_encap(self):
        """Test case for basic encapsulation"""

        self.deid_ip4_net = self.faf.net
        self.deid_ip4 = self.faf.get_ip4()
        self.seid_ip4 = '{!s}/{!s}'.format(self.pg0.local_ip4, 32)
        self.rloc_ip4 = self.pg1.remote_ip4n

        test_cases = [
            {
                'name': 'basic ip4 over ip4',
                'locator-sets': [VppLispLocatorSet(self, b'ls-4o4')],
                'locators': [
                    VppLispLocator(self, self.pg1.sw_if_index, b'ls-4o4')
                ],
                'local-mappings': [
                    VppLocalMapping(self, self.seid_ip4, b'ls-4o4')
                ],
                'remote-mappings': [
                    VppRemoteMapping(self, self.deid_ip4_net,
                                     [{
                                         "is_ip4": 1,
                                         "priority": 1,
                                         "weight": 1,
                                         "addr": self.rloc_ip4
                                     }])
                ],
                'adjacencies': [
                    VppLispAdjacency(self, self.seid_ip4, self.deid_ip4_net)
                ]
            }
        ]
        self.test_driver = SimpleDriver(self, test_cases)
        self.test_driver.run(self.deid_ip4)


if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)