aboutsummaryrefslogtreecommitdiffstats
path: root/test/vpp_udp_decap.py
blob: 2bd03ce75c0c03e8d8e80af35305742fa3e36666 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#!/usr/bin/env python3
"""
  UDP decap objects
"""

from vpp_object import VppObject
from socket import inet_pton, inet_ntop, AF_INET, AF_INET6


class VppUdpDecap(VppObject):
    """Test-side handle for one UDP decap entry programmed into VPP.

    The entry is described by a dict with the keys ``is_ip4``, ``port`` and
    ``next_proto`` which is handed to the ``udp_decap_add_del`` API call.
    ``active`` records whether this object last added (True) or removed
    (False) the entry; VPP itself is never queried.
    """

    def __init__(self,
                 test,
                 is_ip4,
                 dst_port,
                 next_proto):
        """Store the owning test case and build the API argument dict.

        :param test: test case supplying ``vapi``, ``registry`` and ``logger``
        :param is_ip4: value stored under the ``is_ip4`` key
        :param dst_port: value stored under the ``port`` key
        :param next_proto: value stored under the ``next_proto`` key
        """
        self._test = test
        self.active = False
        self.udp_decap = dict(is_ip4=is_ip4,
                              port=dst_port,
                              next_proto=next_proto)

    def add_vpp_config(self):
        """Add the entry, register this object with the test, mark active."""
        owner = self._test
        owner.vapi.udp_decap_add_del(True, self.udp_decap)
        owner.registry.register(self, owner.logger)
        self.active = True

    def query_vpp_config(self):
        """Return the locally tracked ``active`` flag."""
        return self.active

    def remove_vpp_config(self):
        """Delete the entry through the API and mark this object inactive."""
        api = self._test.vapi
        api.udp_decap_add_del(False, self.udp_decap)
        self.active = False
} /* Name.Function.Magic */ .highlight .vc { color: #f8f8f2 } /* Name.Variable.Class */ .highlight .vg { color: #f8f8f2 } /* Name.Variable.Global */ .highlight .vi { color: #f8f8f2 } /* Name.Variable.Instance */ .highlight .vm { color: #f8f8f2 } /* Name.Variable.Magic */ .highlight .il { color: #ae81ff } /* Literal.Number.Integer.Long */ } @media (prefers-color-scheme: light) { .highlight .hll { background-color: #ffffcc } .highlight .c { color: #888888 } /* Comment */ .highlight .err { color: #a61717; background-color: #e3d2d2 } /* Error */ .highlight .k { color: #008800; font-weight: bold } /* Keyword */ .highlight .ch { color: #888888 } /* Comment.Hashbang */ .highlight .cm { color: #888888 } /* Comment.Multiline */ .highlight .cp { color: #cc0000; font-weight: bold } /* Comment.Preproc */ .highlight .cpf { color: #888888 } /* Comment.PreprocFile */ .highlight .c1 { color: #888888 } /* Comment.Single */ .highlight .cs { color: #cc0000; font-weight: bold; background-color: #fff0f0 } /* Comment.Special */ .highlight .gd { color: #000000; background-color: #ffdddd } /* Generic.Deleted */ .highlight .ge { font-style: italic } /* Generic.Emph */ .highlight .gr { color: #aa0000 } /* Generic.Error */ .highlight .gh { color: #333333 } /* Generic.Heading */ .highlight .gi { color: #000000; background-color: #ddffdd } /* Generic.Inserted */ .highlight .go { color: #888888 } /* Generic.Output */ .highlight .gp { color: #555555 } /* Generic.Prompt */ .highlight .gs { font-weight: bold } /* Generic.Strong */ .highlight .gu { color: #666666 } /* Generic.Subheading */ .highlight .gt { color: #aa0000 } /* Generic.Traceback */ .highlight .kc { color: #008800; font-weight: bold } /* Keyword.Constant */ .highlight .kd { color: #008800; font-weight: bold } /* Keyword.Declaration */ .highlight .kn { color: #008800; font-weight: bold } /* Keyword.Namespace */ .highlight .kp { color: #008800 } /* Keyword.Pseudo */ .highlight .kr { color: #008800; font-weight: bold } /* 
Keyword.Reserved */ .highlight .kt { color: #888888; font-weight: bold } /* Keyword.Type */ .highlight .m { color: #0000DD; font-weight: bold } /* Literal.Number */ .highlight .s { color: #dd2200; background-color: #fff0f0 } /* Literal.String */ .highlight .na { color: #336699 } /* Name.Attribute */ .highlight .nb { color: #003388 } /* Name.Builtin */ .highlight .nc { color: #bb0066; font-weight: bold } /* Name.Class */ .highlight .no { color: #003366; font-weight: bold } /* Name.Constant */ .highlight .nd { color: #555555 } /* Name.Decorator */ .highlight .ne { color: #bb0066; font-weight: bold } /* Name.Exception */ .highlight .nf { color: #0066bb; font-weight: bold } /* Name.Function */ .highlight .nl { color: #336699; font-style: italic } /* Name.Label */ .highlight .nn { color: #bb0066; font-weight: bold } /* Name.Namespace */ .highlight .py { color: #336699; font-weight: bold } /* Name.Property */ .highlight .nt { color: #bb0066; font-weight: bold } /* Name.Tag */ .highlight .nv { color: #336699 } /* Name.Variable */ .highlight .ow { color: #008800 } /* Operator.Word */ .highlight .w { color: #bbbbbb } /* Text.Whitespace */ .highlight .mb { color: #0000DD; font-weight: bold } /* Literal.Number.Bin */ .highlight .mf { color: #0000DD; font-weight: bold } /* Literal.Number.Float */ .highlight .mh { color: #0000DD; font-weight: bold } /* Literal.Number.Hex */ .highlight .mi { color: #0000DD; font-weight: bold } /* Literal.Number.Integer */ .highlight .mo { color: #0000DD; font-weight: bold } /* Literal.Number.Oct */ .highlight .sa { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Affix */ .highlight .sb { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Backtick */ .highlight .sc { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Char */ .highlight .dl { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Delimiter */ .highlight .sd { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Doc */ .highlight 
.s2 { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Double */ .highlight .se { color: #0044dd; background-color: #fff0f0 } /* Literal.String.Escape */ .highlight .sh { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Heredoc */ .highlight .si { color: #3333bb; background-color: #fff0f0 } /* Literal.String.Interpol */ .highlight .sx { color: #22bb22; background-color: #f0fff0 } /* Literal.String.Other */ .highlight .sr { color: #008800; background-color: #fff0ff } /* Literal.String.Regex */ .highlight .s1 { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Single */ .highlight .ss { color: #aa6600; background-color: #fff0f0 } /* Literal.String.Symbol */ .highlight .bp { color: #003388 } /* Name.Builtin.Pseudo */ .highlight .fm { color: #0066bb; font-weight: bold } /* Name.Function.Magic */ .highlight .vc { color: #336699 } /* Name.Variable.Class */ .highlight .vg { color: #dd7700 } /* Name.Variable.Global */ .highlight .vi { color: #3333bb } /* Name.Variable.Instance */ .highlight .vm { color: #336699 } /* Name.Variable.Magic */ .highlight .il { color: #0000DD; font-weight: bold } /* Literal.Number.Integer.Long */ }
#!/usr/bin/env python3

import socket
import unittest
import struct
import random

from framework import tag_fixme_vpp_workers
from framework import VppTestCase, VppTestRunner, running_extended_tests

import scapy.compat
from scapy.layers.inet import IP, TCP, UDP, ICMP
from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
    ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
from scapy.layers.l2 import Ether, ARP, GRE
from scapy.data import IP_PROTOS
from scapy.packet import bind_layers, Raw
from util import ppp
from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
from time import sleep
from util import ip4_range
from vpp_papi import mac_pton
from syslog_rfc5424_parser import SyslogMessage, ParseError
from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
from io import BytesIO
from vpp_papi import VppEnum
from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathType
from vpp_neighbor import VppNeighbor
from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
    IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
    PacketListField
from ipaddress import IPv6Network


@tag_fixme_vpp_workers
class TestDSlite(VppTestCase):
    """ DS-Lite Test Cases """

    @classmethod
    def setUpClass(cls):
        """Bring up pg0/pg2 with IPv4 and pg1 with IPv6 and two remote hosts.

        If any setup step raises, the class-level teardown is run and the
        exception is re-raised.
        """
        super(TestDSlite, cls).setUpClass()

        try:
            # address placed into the DS-Lite pool by test_dslite()
            cls.nat_addr = '10.0.0.3'

            cls.create_pg_interfaces(range(3))
            ip4_side, ip6_side, syslog_side = cls.pg0, cls.pg1, cls.pg2

            ip4_side.admin_up()
            ip4_side.config_ip4()
            ip4_side.resolve_arp()

            ip6_side.admin_up()
            ip6_side.config_ip6()
            ip6_side.generate_remote_hosts(2)
            ip6_side.configure_ipv6_neighbors()

            syslog_side.admin_up()
            syslog_side.config_ip4()
            syslog_side.resolve_arp()

        except Exception:
            super(TestDSlite, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class teardown to the base test case."""
        super(TestDSlite, cls).tearDownClass()

    def verify_syslog_apmadd(self, data, isaddr, isport, xsaddr, xsport,
                             sv6enc, proto):
        """Check an APMADD syslog message against the expected mapping.

        :param data: raw syslog payload (bytes, decoded as UTF-8)
        :param isaddr: expected ``ISADDR`` value
        :param isport: expected ``ISPORT`` value (compared as ``"%d"``)
        :param xsaddr: expected ``XSADDR`` value
        :param xsport: expected ``XSPORT`` value (compared as ``"%d"``)
        :param sv6enc: expected ``SV6ENC`` value
        :param proto: expected ``PROTO`` value (compared as ``"%d"``)

        A payload that cannot be parsed is only logged as an error; no
        assertion is made in that case.
        """
        text = data.decode('utf-8')
        try:
            parsed = SyslogMessage.parse(text)
        except ParseError as err:
            self.logger.error(err)
            return

        self.assertEqual(parsed.severity, SyslogSeverity.info)
        self.assertEqual(parsed.appname, 'NAT')
        self.assertEqual(parsed.msgid, 'APMADD')

        napmap = parsed.sd.get('napmap')
        self.assertTrue(napmap is not None)
        field = napmap.get
        self.assertEqual(field('IATYP'), 'IPv4')
        self.assertEqual(field('ISADDR'), isaddr)
        self.assertEqual(field('ISPORT'), "%d" % isport)
        self.assertEqual(field('XATYP'), 'IPv4')
        self.assertEqual(field('XSADDR'), xsaddr)
        self.assertEqual(field('XSPORT'), "%d" % xsport)
        self.assertEqual(field('PROTO'), "%d" % proto)
        self.assertTrue(field('SSUBIX') is not None)
        self.assertEqual(field('SV6ENC'), sv6enc)

    def _inject_and_capture(self, tx_if, pkt, rx_if):
        """Send *pkt* on *tx_if* and return the one packet seen on *rx_if*.

        Capture is enabled on all pg interfaces before the packet generator
        is started, so the caller may still read other interfaces afterwards.
        """
        tx_if.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)[0]

    def test_dslite(self):
        """ Test DS-Lite """
        self.assertEqual(0, self.vapi.nat_show_config().dslite_ce)

        self.vapi.dslite_add_del_pool_addr_range(start_addr=self.nat_addr,
                                                 end_addr=self.nat_addr,
                                                 is_add=1)
        aftr_ip4 = '192.0.0.1'
        aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
        self.vapi.dslite_set_aftr_addr(ip4_addr=aftr_ip4, ip6_addr=aftr_ip6)
        self.vapi.syslog_set_sender(self.pg2.local_ip4, self.pg2.remote_ip4)

        tunnel_if = self.pg1
        plain_if = self.pg0
        host_a = tunnel_if.remote_hosts[0]
        host_b = tunnel_if.remote_hosts[1]
        inner_src = '192.168.1.1'
        peer_ip4 = plain_if.remote_ip4

        # UDP: IPv4-in-IPv6 from pg1, expect translated IPv4 on pg0
        rx = self._inject_and_capture(
            tunnel_if,
            Ether(dst=tunnel_if.local_mac, src=tunnel_if.remote_mac) /
            IPv6(dst=aftr_ip6, src=host_a.ip6) /
            IP(dst=peer_ip4, src=inner_src) /
            UDP(sport=20000, dport=10000),
            plain_if)
        self.assertFalse(rx.haslayer(IPv6))
        self.assertEqual(rx[IP].src, self.nat_addr)
        self.assertEqual(rx[IP].dst, peer_ip4)
        self.assertNotEqual(rx[UDP].sport, 20000)
        self.assertEqual(rx[UDP].dport, 10000)
        self.assert_packet_checksums_valid(rx)
        out_port = rx[UDP].sport
        # the same run must also have produced one syslog packet on pg2
        syslog_rx = self.pg2.get_capture(1)
        self.verify_syslog_apmadd(syslog_rx[0][Raw].load, inner_src,
                                  20000, self.nat_addr, out_port,
                                  host_a.ip6, IP_PROTOS.udp)

        # UDP: reply from pg0, expect it tunnelled back out of pg1
        rx = self._inject_and_capture(
            plain_if,
            Ether(dst=plain_if.local_mac, src=plain_if.remote_mac) /
            IP(dst=self.nat_addr, src=peer_ip4) /
            UDP(sport=10000, dport=out_port),
            tunnel_if)
        self.assertEqual(rx[IPv6].src, aftr_ip6)
        self.assertEqual(rx[IPv6].dst, host_a.ip6)
        self.assertEqual(rx[IP].src, peer_ip4)
        self.assertEqual(rx[IP].dst, inner_src)
        self.assertEqual(rx[UDP].sport, 10000)
        self.assertEqual(rx[UDP].dport, 20000)
        self.assert_packet_checksums_valid(rx)

        # TCP: pg1 -> pg0, from the second remote host
        rx = self._inject_and_capture(
            tunnel_if,
            Ether(dst=tunnel_if.local_mac, src=tunnel_if.remote_mac) /
            IPv6(dst=aftr_ip6, src=host_b.ip6) /
            IP(dst=peer_ip4, src=inner_src) /
            TCP(sport=20001, dport=10001),
            plain_if)
        self.assertFalse(rx.haslayer(IPv6))
        self.assertEqual(rx[IP].src, self.nat_addr)
        self.assertEqual(rx[IP].dst, peer_ip4)
        self.assertNotEqual(rx[TCP].sport, 20001)
        self.assertEqual(rx[TCP].dport, 10001)
        self.assert_packet_checksums_valid(rx)
        out_port = rx[TCP].sport

        # TCP: pg0 -> pg1
        rx = self._inject_and_capture(
            plain_if,
            Ether(dst=plain_if.local_mac, src=plain_if.remote_mac) /
            IP(dst=self.nat_addr, src=peer_ip4) /
            TCP(sport=10001, dport=out_port),
            tunnel_if)
        self.assertEqual(rx[IPv6].src, aftr_ip6)
        self.assertEqual(rx[IPv6].dst, host_b.ip6)
        self.assertEqual(rx[IP].src, peer_ip4)
        self.assertEqual(rx[IP].dst, inner_src)
        self.assertEqual(rx[TCP].sport, 10001)
        self.assertEqual(rx[TCP].dport, 20001)
        self.assert_packet_checksums_valid(rx)

        # ICMP echo request: pg1 -> pg0
        rx = self._inject_and_capture(
            tunnel_if,
            Ether(dst=tunnel_if.local_mac, src=tunnel_if.remote_mac) /
            IPv6(dst=aftr_ip6, src=host_b.ip6) /
            IP(dst=peer_ip4, src=inner_src) /
            ICMP(id=4000, type='echo-request'),
            plain_if)
        self.assertFalse(rx.haslayer(IPv6))
        self.assertEqual(rx[IP].src, self.nat_addr)
        self.assertEqual(rx[IP].dst, peer_ip4)
        self.assertNotEqual(rx[ICMP].id, 4000)
        self.assert_packet_checksums_valid(rx)
        out_id = rx[ICMP].id

        # ICMP echo reply: pg0 -> pg1
        rx = self._inject_and_capture(
            plain_if,
            Ether(dst=plain_if.local_mac, src=plain_if.remote_mac) /
            IP(dst=self.nat_addr, src=peer_ip4) /
            ICMP(id=out_id, type='echo-reply'),
            tunnel_if)
        self.assertEqual(rx[IPv6].src, aftr_ip6)
        self.assertEqual(rx[IPv6].dst, host_b.ip6)
        self.assertEqual(rx[IP].src, peer_ip4)
        self.assertEqual(rx[IP].dst, inner_src)
        self.assertEqual(rx[ICMP].id, 4000)
        self.assert_packet_checksums_valid(rx)

        # ping DS-Lite AFTR tunnel endpoint address
        rx = self._inject_and_capture(
            tunnel_if,
            Ether(dst=tunnel_if.local_mac, src=tunnel_if.remote_mac) /
            IPv6(src=host_b.ip6, dst=aftr_ip6) /
            ICMPv6EchoRequest(),
            tunnel_if)
        self.assertEqual(rx[IPv6].src, aftr_ip6)
        self.assertEqual(rx[IPv6].dst, host_b.ip6)
        self.assertTrue(rx.haslayer(ICMPv6EchoReply))

        # two distinct IPv6 sources were used and three flows were opened
        b4_count = self.statistics.get_counter('/dslite/total-b4s')
        self.assertEqual(b4_count[0][0], 2)
        session_count = self.statistics.get_counter('/dslite/total-sessions')
        self.assertEqual(session_count[0][0], 3)

    def tearDown(self):
        """Delegate per-test teardown to the base test case."""
        super(TestDSlite, self).tearDown()

    def show_commands_at_teardown(self):
        """Log the output of the DS-Lite show commands."""
        for command in ("show dslite pool",
                        "show dslite aftr-tunnel-endpoint-address",
                        "show dslite sessions"):
            self.logger.info(self.vapi.cli(command))


class TestDSliteCE(VppTestCase):
    """ DS-Lite CE Test Cases """

    @classmethod
    def setUpConstants(cls):
        """Extend the VPP command line with ``dslite { ce }``."""
        super(TestDSliteCE, cls).setUpConstants()
        cls.vpp_cmdline.extend(["dslite", "{", "ce", "}"])

    @classmethod
    def setUpClass(cls):
        """Bring up pg0 with IPv4 and pg1 with IPv6 and one remote host.

        If any setup step raises, the class-level teardown is run and the
        exception is re-raised.
        """
        super(TestDSliteCE, cls).setUpClass()

        try:
            cls.create_pg_interfaces(range(2))
            cls.pg0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.resolve_arp()
            cls.pg1.admin_up()
            cls.pg1.config_ip6()
            cls.pg1.generate_remote_hosts(1)
            cls.pg1.configure_ipv6_neighbors()

        except Exception:
            super(TestDSliteCE, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class teardown to the base test case."""
        super(TestDSliteCE, cls).tearDownClass()

    def test_dslite_ce(self):
        """ Test DS-Lite CE """

        # TODO: add message to retrieve dslite config
        # nat_config = self.vapi.nat_show_config()
        # self.assertEqual(1, nat_config.dslite_ce)

        b4_ip4 = '192.0.0.2'
        b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
        self.vapi.dslite_set_b4_addr(ip4_addr=b4_ip4, ip6_addr=b4_ip6)

        aftr_ip4 = '192.0.0.1'
        aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
        self.vapi.dslite_set_aftr_addr(ip4_addr=aftr_ip4, ip6_addr=aftr_ip6)

        # host route so traffic for the AFTR address leaves through pg1
        r1 = VppIpRoute(self, aftr_ip6, 128,
                        [VppRoutePath(self.pg1.remote_ip6,
                                      self.pg1.sw_if_index)])
        r1.add_vpp_config()

        # UDP encapsulation: plain IPv4 in on pg0, IPv4-in-IPv6 out on pg1
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
             UDP(sport=10000, dport=20000))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(1)
        capture = capture[0]
        self.assertEqual(capture[IPv6].src, b4_ip6)
        self.assertEqual(capture[IPv6].dst, aftr_ip6)
        self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
        self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
        self.assertEqual(capture[UDP].sport, 10000)
        self.assertEqual(capture[UDP].dport, 20000)
        self.assert_packet_checksums_valid(capture)

        # UDP decapsulation: IPv4-in-IPv6 in on pg1, plain IPv4 out on pg0
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(dst=b4_ip6, src=aftr_ip6) /
             IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
             UDP(sport=20000, dport=10000))
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(1)
        capture = capture[0]
        self.assertFalse(capture.haslayer(IPv6))
        self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
        self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
        self.assertEqual(capture[UDP].sport, 20000)
        self.assertEqual(capture[UDP].dport, 10000)
        self.assert_packet_checksums_valid(capture)

        # ping DS-Lite B4 tunnel endpoint address
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
             ICMPv6EchoRequest())
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(1)
        capture = capture[0]
        self.assertEqual(capture[IPv6].src, b4_ip6)
        self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
        self.assertTrue(capture.haslayer(ICMPv6EchoReply))

    def tearDown(self):
        """Delegate per-test teardown to the base test case."""
        super(TestDSliteCE, self).tearDown()

    def show_commands_at_teardown(self):
        """Log the configured AFTR and B4 tunnel endpoint addresses."""
        self.logger.info(
            self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
        self.logger.info(
            self.vapi.cli("show dslite b4-tunnel-endpoint-address"))