summaryrefslogtreecommitdiffstats
path: root/test/test_reassembly.py
AgeCommit message (Expand)AuthorFilesLines
2019-06-18fib: fib api updatesNeale Ranns1-5/+4
2019-05-22make test: fix bug due to concurrent commitsKlement Sekera1-4/+4
2019-05-20reassembly: prevent long chain attackKlement Sekera1-0/+72
2019-05-08VPP-1508: Fix zip() under python3.Paul Vinciguerra1-4/+4
2019-04-18GRE: API updateNeale Ranns1-2/+2
2019-04-11Tests: Refactor tearDown show command logging, add lifecycle markers.Paul Vinciguerra1-1/+9
2019-04-10Tests Cleanup: Fix missing calls to setUpClass/tearDownClass.Paul Vinciguerra1-0/+16
2019-03-11VPP-1508: Use scapy.compat to manage packet level library differences.Paul Vinciguerra1-4/+8
2019-03-07Tests: Refactor payload_to_info()Paul Vinciguerra1-3/+3
2019-03-06Tests: Example duplicate code refactoring.Paul Vinciguerra1-273/+243
2019-02-19reassembly: fix buffer usage counterKlement Sekera1-0/+5
2019-01-10Enable random reassembly test for ARMjuraj.linkes1-4/+1
2018-12-20Tests: Cleanup @skip decorator.Paul Vinciguerra1-1/+1
2018-12-20reassembly: replace asserts with error countersKlement Sekera1-2/+3
2018-12-16VPP-1523: harden reassemblyKlement Sekera1-2/+42
2018-12-13reassembly: fix internal buffer count accountingKlement Sekera1-0/+31
2018-12-13VPP-1522: harden reassembly codeKlement Sekera1-3/+28
2018-12-06Skip failing ARM testcases for CIjuraj.linkes1-1/+4
2018-11-27VPP-1508 python3 tests: use six.iteritemsPaul Vinciguerra1-5/+7
2018-10-04Support reassembly for fragments coming to ip4-local nodeJuraj Sloboda1-1/+118
2018-09-11GBP Endpoint UpdatesNeale Ranns1-1/+2
2018-04-03reassembly: bug fixesKlement Sekera1-2/+1
2018-03-21reassembly: feature/concurrencyKlement Sekera1-0/+972
2018-03-07test: disable reassembly tests and system modification scriptDamjan Marion1-966/+0
2018-02-01IPv4/6 reassemblyKlement Sekera1-0/+966
kground-color: #fff0f0 } /* Literal.String.Escape */ .highlight .sh { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Heredoc */ .highlight .si { color: #3333bb; background-color: #fff0f0 } /* Literal.String.Interpol */ .highlight .sx { color: #22bb22; background-color: #f0fff0 } /* Literal.String.Other */ .highlight .sr { color: #008800; background-color: #fff0ff } /* Literal.String.Regex */ .highlight .s1 { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Single */ .highlight .ss { color: #aa6600; background-color: #fff0f0 } /* Literal.String.Symbol */ .highlight .bp { color: #003388 } /* Name.Builtin.Pseudo */ .highlight .fm { color: #0066bb; font-weight: bold } /* Name.Function.Magic */ .highlight .vc { color: #336699 } /* Name.Variable.Class */ .highlight .vg { color: #dd7700 } /* Name.Variable.Global */ .highlight .vi { color: #3333bb } /* Name.Variable.Instance */ .highlight .vm { color: #336699 } /* Name.Variable.Magic */ .highlight .il { color: #0000DD; font-weight: bold } /* Literal.Number.Integer.Long */ }
"""
  IP Types

"""
import logging

from ipaddress import ip_address
from socket import AF_INET, AF_INET6
from vpp_papi import VppEnum
# Python 2/3 compatibility shim: use ``unicode`` as the text type where the
# name exists; where it does not (NameError), fall back to ``str``.
# The classes below pass addresses through text_type() before ip_address().
try:
    text_type = unicode
except NameError:
    text_type = str

# Module-level logger, named after this module.
_log = logging.getLogger(__name__)


class DpoProto:
    """DPO protocol constants.

    The values are consecutive integers starting at 0:
    IP4=0, IP6=1, MPLS=2, ETHERNET=3, BIER=4, NSH=5.
    """
    (DPO_PROTO_IP4,
     DPO_PROTO_IP6,
     DPO_PROTO_MPLS,
     DPO_PROTO_ETHERNET,
     DPO_PROTO_BIER,
     DPO_PROTO_NSH) = range(6)


# All-ones 32-bit value (2**32 - 1). Going by its name, presumably the
# sentinel for "no valid index" -- confirm against the callers that use it.
INVALID_INDEX = 0xffffffff


class VppIpAddressUnion():
    """An IPv4 or IPv6 address in the shape of an API address union.

    Keeps both the caller-supplied value (``addr``) and the ``ipaddress``
    object parsed from it (``ip_addr``).
    """

    def __init__(self, addr):
        """Store ``addr`` and parse it with ``ip_address``.

        :param addr: the address; it is converted with ``text_type`` before
            being handed to ``ip_address``.
        :raises ValueError: if ``ip_address`` cannot parse the value.
        """
        self.addr = addr
        self.ip_addr = ip_address(text_type(self.addr))

    def encode(self):
        """Return ``{'ip6': ip_addr}`` for IPv6, else ``{'ip4': ip_addr}``."""
        key = 'ip6' if self.version == 6 else 'ip4'
        return {key: self.ip_addr}

    @property
    def version(self):
        """IP version of the parsed address (4 or 6)."""
        return self.ip_addr.version

    @property
    def address(self):
        """The address exactly as it was passed to the constructor."""
        return self.addr

    @property
    def length(self):
        """Maximum prefix length for this address family."""
        return self.ip_addr.max_prefixlen

    @property
    def bytes(self):
        """Packed binary form of the address."""
        return self.ip_addr.packed

    def __eq__(self, other):
        """Compare with another union or an object exposing ip4/ip6.

        Returns ``NotImplemented`` (after logging an error) for any other
        type, so Python can try the reflected comparison.
        """
        if isinstance(other, self.__class__):
            return self.ip_addr == other.ip_addr
        elif hasattr(other, "ip4") and hasattr(other, "ip6"):
            # vl_api_address_union_t: compare against the member that
            # matches our own address family.
            if 4 == self.version:
                return self.ip_addr.packed == other.ip4
            else:
                return self.ip_addr.packed == other.ip6
        else:
            _log.error("Comparing VppIpAddressUnions:%s"
                       " with incomparable type: %s",
                       self, other)
            return NotImplemented

    def __ne__(self, other):
        """Inverse of ``==``.

        Defined explicitly because Python 2 does not derive ``!=`` from
        ``__eq__``; mirrors VppIpAddress.__ne__.
        """
        return not (self == other)

    def __str__(self):
        return str(self.ip_addr)


class VppIpAddress():
    """An IP address: a VppIpAddressUnion plus its address family."""

    def __init__(self, addr):
        self.addr = VppIpAddressUnion(addr)

    def encode(self):
        """Return the ``{'af': ..., 'un': ...}`` dict for this address."""
        families = VppEnum.vl_api_address_family_t
        if self.addr.version == 6:
            family = families.ADDRESS_IP6
        else:
            family = families.ADDRESS_IP4
        return {'af': family, 'un': self.addr.encode()}

    def __eq__(self, other):
        """Compare with another VppIpAddress or an object exposing af/un.

        Any other type is logged as an error and yields NotImplemented.
        """
        if isinstance(other, self.__class__):
            return self.addr == other.addr

        if not (hasattr(other, "af") and hasattr(other, "un")):
            _log.error(
                "Comparing VppIpAddress:<%s> %s with incomparable "
                "type: <%s> %s",
                self.__class__.__name__, self,
                other.__class__.__name__, other)
            return NotImplemented

        # a vl_api_address_t: the family must match ours, then the union.
        families = VppEnum.vl_api_address_family_t
        if 4 == self.version:
            wanted = families.ADDRESS_IP4
        else:
            wanted = families.ADDRESS_IP6
        return other.af == wanted and other.un == self.addr

    def __ne__(self, other):
        return not (self == other)

    def __str__(self):
        return self.addr.address

    @property
    def bytes(self):
        """Packed bytes, as reported by the wrapped union."""
        return self.addr.bytes

    @property
    def address(self):
        """The address as reported by the wrapped union."""
        return self.addr.address

    @property
    def length(self):
        """Length as reported by the wrapped union."""
        return self.addr.length

    @property
    def version(self):
        """IP version as reported by the wrapped union."""
        return self.addr.version

    @property
    def is_ip6(self):
        """True when the address version is 6."""
        return 6 == self.version

    @property
    def af(self):
        """Socket address family: AF_INET6 for version 6, else AF_INET."""
        return AF_INET6 if self.version == 6 else AF_INET

    @property
    def dpo_proto(self):
        """DpoProto constant matching the address version."""
        if self.version == 6:
            return DpoProto.DPO_PROTO_IP6
        return DpoProto.DPO_PROTO_IP4


class VppIpPrefix():
    """An IP prefix: a VppIpAddress plus a prefix length."""

    def __init__(self, addr, len):
        """Wrap ``addr`` in a VppIpAddress and store the prefix length.

        :param addr: the address, passed on to VppIpAddress.
        :param len: the prefix length. The name shadows the builtin but is
            kept because it is part of the public signature.
        """
        self.addr = VppIpAddress(addr)
        self.len = len

    def encode(self):
        """Return the ``{'address': ..., 'len': ...}`` dict for the prefix."""
        return {'address': self.addr.encode(),
                'len': self.len}

    @property
    def version(self):
        """IP version of the wrapped address."""
        return self.addr.version

    @property
    def address(self):
        """The address as reported by the wrapped VppIpAddress."""
        return self.addr.address

    @property
    def bytes(self):
        """Packed bytes of the wrapped address."""
        return self.addr.bytes

    @property
    def length(self):
        """The prefix length given to the constructor."""
        return self.len

    @property
    def is_ip6(self):
        """Whether the wrapped address is IPv6."""
        return self.addr.is_ip6

    def __str__(self):
        return "%s/%d" % (self.address, self.length)

    # A single __eq__: an earlier duplicate definition in this class was
    # dead code, because the later definition replaced it.
    def __eq__(self, other):
        """Compare with another VppIpPrefix or an object with address/len.

        Any other type is logged as an error and yields NotImplemented.
        """
        if isinstance(other, self.__class__):
            return (self.len == other.len and self.addr == other.addr)
        elif hasattr(other, "address") and hasattr(other, "len"):
            # vl_api_prefix_t
            return self.len == other.len and \
                   self.addr == other.address
        else:
            # Lazy %-args: the message is only built if it is emitted.
            _log.error(
                "Comparing VppIpPrefix:%s with incomparable type: %s",
                self, other)
            return NotImplemented

    def __ne__(self, other):
        """Inverse of ``==``.

        Defined explicitly because Python 2 does not derive ``!=`` from
        ``__eq__``; mirrors VppIpAddress.__ne__.
        """
        return not (self == other)


class VppIpMPrefix():
    """A multicast prefix: source address, group address, group length."""

    def __init__(self, saddr, gaddr, glen):
        """Store the raw values and parse both addresses.

        :param saddr: source address.
        :param gaddr: group address.
        :param glen: group address length.
        :raises ValueError: if the two addresses are of different IP
            versions (or if either cannot be parsed).
        """
        self.saddr = saddr
        self.gaddr = gaddr
        self.glen = glen
        self.ip_saddr = VppIpAddressUnion(text_type(self.saddr))
        self.ip_gaddr = VppIpAddressUnion(text_type(self.gaddr))
        if self.ip_saddr.version != self.ip_gaddr.version:
            raise ValueError('Source and group addresses must be of the '
                             'same address family.')

    def encode(self):
        """Return the mprefix dict; addresses are the raw constructor values.

        The address family and the union key ('ip4'/'ip6') both follow the
        source address version.
        """
        families = VppEnum.vl_api_address_family_t
        if 6 == self.ip_saddr.version:
            family, key = families.ADDRESS_IP6, 'ip6'
        else:
            family, key = families.ADDRESS_IP4, 'ip4'
        return {
            'af': family,
            'grp_address': {key: self.gaddr},
            'src_address': {key: self.saddr},
            'grp_address_length': self.glen,
        }

    @property
    def length(self):
        """The group address length given to the constructor."""
        return self.glen

    @property
    def version(self):
        """IP version of the group address."""
        return self.ip_gaddr.version

    def __str__(self):
        return "(%s,%s)/%d" % (self.saddr, self.gaddr, self.glen)

    def __eq__(self, other):
        """Compare with another VppIpMPrefix or an mprefix-shaped object.

        :raises Exception: if ``other`` is neither a VppIpMPrefix nor an
            object exposing grp_address_length/grp_address/src_address.
        """
        if isinstance(other, self.__class__):
            # Group is compared with group and source with source.
            return (self.glen == other.glen and
                    self.ip_gaddr == other.ip_gaddr and
                    self.ip_saddr == other.ip_saddr)
        elif (hasattr(other, "grp_address_length") and
              hasattr(other, "grp_address") and
              hasattr(other, "src_address")):
            # vl_api_mprefix_t
            if 4 == self.ip_saddr.version:
                if self.glen == other.grp_address_length and \
                   self.gaddr == str(other.grp_address.ip4) and \
                   self.saddr == str(other.src_address.ip4):
                    return True
                return False
            else:
                # NOTE(review): unlike the IPv4 branch, the ip6 members are
                # compared without str(); confirm which form the API
                # delivers before unifying the two branches.
                return (self.glen == other.grp_address_length and
                        self.gaddr == other.grp_address.ip6 and
                        self.saddr == other.src_address.ip6)
        else:
            # Kept as a raise (rather than NotImplemented) so callers see
            # the same exception type as before.
            raise Exception("Comparing VppIpMPrefix:%s with unknown type: %s" %
                            (self, other))

    def __ne__(self, other):
        """Inverse of ``==``.

        Defined explicitly because Python 2 does not derive ``!=`` from
        ``__eq__``; mirrors VppIpAddress.__ne__.
        """
        return not (self == other)