summaryrefslogtreecommitdiffstats
path: root/src/vppinfra/tw_timer_4t_3w_256sl.c
AgeCommit message (Expand)AuthorFilesLines
2017-06-12three-level timer wheel implementation w/ overflow vectorDave Barach1-0/+26
='n48' href='#n48'>48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
#!/usr/bin/env python
""" CDP tests """

from scapy.packet import Packet
from scapy.all import ShortField, StrField
from scapy.layers.l2 import Dot3, LLC, SNAP
from scapy.contrib.cdp import CDPMsgDeviceID, CDPMsgSoftwareVersion, \
        CDPMsgPlatform, CDPMsgPortID, CDPv2_HDR

from framework import VppTestCase
from scapy.all import raw
from re import compile
from time import sleep
from util import ppp
import platform
import sys
import unittest


""" TestCDP is a subclass of the VppTestCase class.

CDP test.

"""


class CustomTLV(Packet):
    """ Hand-rolled type/length/value layer for scapy.

    The layer carries a 16-bit ``type``, a 16-bit ``length`` and a raw
    string ``value``.  ``length`` is an ordinary field (default 4) and
    is not derived from ``value``, so a caller can set it to a number
    that disagrees with the bytes actually present.
    """

    fields_desc = [ShortField("type", 0),
                   ShortField("length", 4),
                   StrField("value", "")]


class TestCDP(VppTestCase):
    """ CDP Test Case """

    # Pre-compiled patterns applied to CLI output.
    # nen_ptr: searched for in the "show cdp" output by test_enable_cdp.
    nen_ptr = compile(r"not enabled")
    # cdp_ptr: a "show cdp" row made of four whitespace-separated columns.
    cdp_ptr = compile(r"^([-\.\w]+)\s+([-\.\w]+)\s+([-\.\w]+)\s+([-\.\w]+)$")
    # err_ptr: a "show errors" row - numeric count, node name, reason text.
    err_ptr = compile(r"^([\d]+)\s+([-\w]+)\s+([ -\.\w)(]+)$")

    @property
    def device_id(self):
        """ Device id advertised in the CDP packet: the local host name. """
        return platform.node()

    @property
    def version(self):
        """ Software version advertised: the local OS release string. """
        return platform.release()

    @property
    def port_id(self):
        """ Port id advertised: the name of the test interface. """
        return self.interface.name

    @property
    def platform(self):
        """ Platform advertised: the local OS name. """
        # Inside a method body ``platform`` resolves to the imported
        # module, not to this property.
        return platform.system()

    @classmethod
    def setUpClass(cls):
        """ Create one pg interface, bring it up and configure IPv4.

        If any step fails the parent class tear-down is run before the
        exception is re-raised.
        """
        super(TestCDP, cls).setUpClass()
        try:
            cls.create_pg_interfaces(range(1))
            pg_if = cls.pg_interfaces[0]
            cls.interface = pg_if

            pg_if.admin_up()
            pg_if.config_ip4()
            pg_if.resolve_arp()
        except Exception:
            super(TestCDP, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """ Delegate class-level clean-up to the parent class. """
        super(TestCDP, cls).tearDownClass()

    def test_enable_cdp(self):
        # Enable CDP through the API, then make sure "show cdp" no
        # longer contains the "not enabled" marker.
        reply = self.vapi.cdp_enable_disable(enable_disable=1)
        self.logger.info(reply)
        output = self.vapi.cli("show cdp")
        self.logger.info(output)
        self.assertFalse(self.nen_ptr.search(output), "CDP isn't enabled")

    def test_send_cdp_packet(self):
        # Inject a well-formed CDP packet and check the first neighbor
        # row reported by "show cdp".
        reply = self.vapi.cdp_enable_disable(enable_disable=1)
        self.logger.info(reply)
        self.send_packet(self.create_packet())

        neighbors = list(self.show_cdp())
        self.assertTrue(neighbors, "CDP didn't register neighbor")

        port, system = neighbors[0]
        expected_id = self.device_id
        # Compare only the common prefix of the reported and the sent
        # device id, so a shortened id on either side still matches.
        length = min(len(system), len(expected_id))

        self.assert_equal(port, self.port_id, "CDP received invalid port id")
        self.assert_equal(system[:length], expected_id[:length],
                          "CDP received invalid device id")

    @unittest.skipIf(sys.version_info[0] > 2,
                     "not supported in python3/scapy")
    def test_cdp_underflow_tlv(self):
        # Declared TLV length (3) is below the 4 bytes taken by the
        # type and length fields themselves.
        self.send_bad_packet(3, ".")

    @unittest.skipIf(sys.version_info[0] > 2,
                     "not supported in python3/scapy")
    def test_cdp_overflow_tlv(self):
        # Declared TLV length (8) exceeds the bytes actually carried
        # (4-byte header plus a 1-byte value).
        self.send_bad_packet(8, ".")

    def send_bad_packet(self, l, v):
        """ Send a CDP packet with a malformed TLV and expect a drop.

        :param l: value written into the TLV ``length`` field.
        :param v: payload written into the TLV ``value`` field.

        Passes when "show errors" lists at least one
        'cdp packets with bad TLVs' error for node 'cdp-input'.
        """
        reply = self.vapi.cdp_enable_disable(enable_disable=1)
        self.logger.info(reply)
        self.send_packet(self.create_bad_packet(l, v))

        errors = list(self.show_errors())
        self.assertTrue(errors)

        # The count is converted only for rows whose node and reason
        # already match; scanning stops at the first hit.
        dropped = any(
            node == u'cdp-input' and
            reason == u'cdp packets with bad TLVs' and
            int(count) >= 1
            for count, node, reason in errors)
        self.assertTrue(dropped, "CDP didn't drop bad packet")

    def send_packet(self, packet):
        """ Log *packet*, queue it on the test interface and start pg. """
        self.logger.debug(ppp("Sending packet:", packet))
        self.interface.add_stream(packet)
        self.pg_start()

    def create_base_packet(self):
        """ Return the 802.3 / LLC / SNAP / CDPv2 header stack.

        The frame is sourced from the interface's remote MAC and sent to
        the destination MAC 01:00:0c:cc:cc:cc.
        """
        dot3 = Dot3(src=self.interface.remote_mac,
                    dst="01:00:0c:cc:cc:cc")
        llc = LLC(dsap=0xaa, ssap=0xaa, ctrl=0x03)
        return dot3 / llc / SNAP() / CDPv2_HDR()

    def create_packet(self):
        """ Return a CDP packet advertising this host.

        TLVs appended, in order: device id, software version, port id,
        platform.
        """
        tlvs = (
            CDPMsgDeviceID(val=self.device_id),
            CDPMsgSoftwareVersion(val=self.version),
            CDPMsgPortID(iface=self.port_id),
            CDPMsgPlatform(val=self.platform),
        )
        packet = self.create_base_packet()
        for tlv in tlvs:
            packet = packet / tlv
        return packet

    def create_bad_packet(self, tl=4, tv=""):
        """ Return a CDP packet carrying one CustomTLV of type 1.

        :param tl: value for the TLV ``length`` field.
        :param tv: value for the TLV ``value`` field.
        """
        bad_tlv = CustomTLV(type=1, length=tl, value=tv)
        return self.create_base_packet() / bad_tlv

    def process_cli(self, exp, ptr):
        """ Run CLI command *exp* and yield regex groups per matching line.

        :param exp: CLI command string.
        :param ptr: compiled pattern matched against each stripped line.

        The first output line is skipped (presumably a header row);
        lines that do not match *ptr* are ignored.
        """
        output_lines = self.vapi.cli(exp).split('\n')
        for raw_line in output_lines[1:]:
            found = ptr.match(raw_line.strip())
            if not found:
                continue
            yield found.groups()

    def show_cdp(self):
        """ Yield (port, system) for every "show cdp" row.

        Only rows that produced exactly four groups are reported; the
        last two columns are discarded.
        """
        for groups in self.process_cli("show cdp", self.cdp_ptr):
            if len(groups) != 4:
                continue
            yield groups[0], groups[1]

    def show_errors(self):
        """ Yield (count, node, reason) for every "show errors" row.

        Only rows that produced exactly three groups are reported.
        """
        for groups in self.process_cli("show errors", self.err_ptr):
            if len(groups) != 3:
                continue
            yield groups[0], groups[1], groups[2]