summaryrefslogtreecommitdiffstats
path: root/test/vpp_ipsec.py
blob: f50d491c3964c283ed0b628a5f09f0a372c9f4b7 (plain)
1
2
[gerrit]
host=gerrit.fd.io
port=29418
project=vpp
' href='#n300'>300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472
from vpp_object import VppObject
from ipaddress import ip_address
from vpp_papi import VppEnum
from vpp_interface import VppInterface

try:
    text_type = unicode
except NameError:
    text_type = str


def mk_counter():
    return {"packets": 0, "bytes": 0}


class VppIpsecSpd(VppObject):
    """
    VPP SPD DB
    """

    def __init__(self, test, id):
        self.test = test
        self.id = id

    def add_vpp_config(self):
        self.test.vapi.ipsec_spd_add_del(self.id)
        self.test.registry.register(self, self.test.logger)

    def remove_vpp_config(self):
        self.test.vapi.ipsec_spd_add_del(self.id, is_add=0)

    def object_id(self):
        return "ipsec-spd-%d" % self.id

    def query_vpp_config(self):
        spds = self.test.vapi.ipsec_spds_dump()
        for spd in spds:
            if spd.spd_id == self.id:
                return True
        return False


class VppIpsecSpdItfBinding(VppObject):
    """
    VPP SPD DB to interface binding
    (i.e. this SPD is used on this interface)
    """

    def __init__(self, test, spd, itf):
        self.test = test
        self.spd = spd
        self.itf = itf

    def add_vpp_config(self):
        self.test.vapi.ipsec_interface_add_del_spd(self.spd.id, self.itf.sw_if_index)
        self.test.registry.register(self, self.test.logger)

    def remove_vpp_config(self):
        self.test.vapi.ipsec_interface_add_del_spd(
            self.spd.id, self.itf.sw_if_index, is_add=0
        )

    def object_id(self):
        return "bind-%s-to-%s" % (self.spd.id, self.itf)

    def query_vpp_config(self):
        bs = self.test.vapi.ipsec_spd_interface_dump()
        for b in bs:
            if b.sw_if_index == self.itf.sw_if_index:
                return True
        return False


class VppIpsecSpdEntry(VppObject):
    """
    VPP SPD DB Entry
    """

    def __init__(
        self,
        test,
        spd,
        sa_id,
        local_start,
        local_stop,
        remote_start,
        remote_stop,
        proto=socket.IPPROTO_RAW,
        priority=100,
        policy=None,
        is_outbound=1,
        remote_port_start=0,
        remote_port_stop=65535,
        local_port_start=0,
        local_port_stop=65535,
    ):
        self.test = test
        self.spd = spd
        self.sa_id = sa_id
        self.local_start = ip_address(text_type(local_start))
        self.local_stop = ip_address(text_type(local_stop))
        self.remote_start = ip_address(text_type(remote_start))
        self.remote_stop = ip_address(text_type(remote_stop))
        self.proto = proto
        self.is_outbound = is_outbound
        self.priority = priority
        if not policy:
            self.policy = VppEnum.vl_api_ipsec_spd_action_t.IPSEC_API_SPD_ACTION_BYPASS
        else:
            self.policy = policy
        self.is_ipv6 = 0 if self.local_start.version == 4 else 1
        self.local_port_start = local_port_start
        self.local_port_stop = local_port_stop
        self.remote_port_start = remote_port_start
        self.remote_port_stop = remote_port_stop

    def add_vpp_config(self):
        rv = self.test.vapi.ipsec_spd_entry_add_del(
            self.spd.id,
            self.sa_id,
            self.local_start,
            self.local_stop,
            self.remote_start,
            self.remote_stop,
            protocol=self.proto,
            is_ipv6=self.is_ipv6,
            is_outbound=self.is_outbound,
            priority=self.priority,
            policy=self.policy,
            local_port_start=self.local_port_start,
            local_port_stop=self.local_port_stop,
            remote_port_start=self.remote_port_start,
            remote_port_stop=self.remote_port_stop,
        )
        self.stat_index = rv.stat_index
        self.test.registry.register(self, self.test.logger)
        return self

    def remove_vpp_config(self):
        self.test.vapi.ipsec_spd_entry_add_del(
            self.spd.id,
            self.sa_id,
            self.local_start,
            self.local_stop,
            self.remote_start,
            self.remote_stop,
            protocol=self.proto,
            is_ipv6=self.is_ipv6,
            is_outbound=self.is_outbound,
            priority=self.priority,
            policy=self.policy,
            local_port_start=self.local_port_start,
            local_port_stop=self.local_port_stop,
            remote_port_start=self.remote_port_start,
            remote_port_stop=self.remote_port_stop,
            is_add=0,
        )

    def object_id(self):
        return "spd-entry-%d-%d-%d-%d-%d-%d" % (
            self.spd.id,
            self.priority,
            self.policy,
            self.is_outbound,
            self.is_ipv6,
            self.remote_port_start,
        )

    def query_vpp_config(self):
        ss = self.test.vapi.ipsec_spd_dump(self.spd.id)
        for s in ss:
            if (
                s.entry.sa_id == self.sa_id
                and s.entry.is_outbound == self.is_outbound
                and s.entry.priority == self.priority
                and s.entry.policy == self.policy
                and s.entry.remote_address_start == self.remote_start
                and s.entry.remote_port_start == self.remote_port_start
            ):
                return True
        return False

    def get_stats(self, worker=None):
        c = self.test.statistics.get_counter("/net/ipsec/policy")
        if worker is None:
            total = mk_counter()
            for t in c:
                total["packets"] += t[self.stat_index]["packets"]
            return total
        else:
            # +1 to skip main thread
            return c[worker + 1][self.stat_index]


class VppIpsecSA(VppObject):
    """
    VPP SAD Entry
    """

    DEFAULT_UDP_PORT = 4500

    def __init__(
        self,
        test,
        id,
        spi,
        integ_alg,
        integ_key,
        crypto_alg,
        crypto_key,
        proto,
        tun_src=None,
        tun_dst=None,
        flags=None,
        salt=0,
        tun_flags=None,
        dscp=None,
        udp_src=None,
        udp_dst=None,
        hop_limit=None,
    ):
        e = VppEnum.vl_api_ipsec_sad_flags_t
        self.test = test
        self.id = id
        self.spi = spi
        self.integ_alg = integ_alg
        self.integ_key = integ_key
        self.crypto_alg = crypto_alg
        self.crypto_key = crypto_key
        self.proto = proto
        self.salt = salt

        self.table_id = 0
        self.tun_src = tun_src
        self.tun_dst = tun_dst
        if not flags:
            self.flags = e.IPSEC_API_SAD_FLAG_NONE
        else:
            self.flags = flags
        if tun_src:
            self.tun_src = ip_address(text_type(tun_src))
            self.flags = self.flags | e.IPSEC_API_SAD_FLAG_IS_TUNNEL
        if tun_dst:
            self.tun_dst = ip_address(text_type(tun_dst))
        self.udp_src = udp_src
        self.udp_dst = udp_dst
        self.tun_flags = (
            VppEnum.vl_api_tunnel_encap_decap_flags_t.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
        )
        if tun_flags:
            self.tun_flags = tun_flags
        self.dscp = VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_CS0
        if dscp:
            self.dscp = dscp
        self.hop_limit = 255
        if hop_limit:
            self.hop_limit = hop_limit

    def tunnel_encode(self):
        return {
            "src": (self.tun_src if self.tun_src else []),
            "dst": (self.tun_dst if self.tun_dst else []),
            "encap_decap_flags": self.tun_flags,
            "dscp": self.dscp,
            "hop_limit": self.hop_limit,
            "table_id": self.table_id,
        }

    def add_vpp_config(self):
        entry = {
            "sad_id": self.id,
            "spi": self.spi,
            "integrity_algorithm": self.integ_alg,
            "integrity_key": {
                "length": len(self.integ_key),
                "data": self.integ_key,
            },
            "crypto_algorithm": self.crypto_alg,
            "crypto_key": {
                "data": self.crypto_key,
                "length": len(self.crypto_key),
            },
            "protocol": self.proto,
            "tunnel": self.tunnel_encode(),
            "flags": self.flags,
            "salt": self.salt,
        }
        # don't explicitly send the defaults, let papi fill them in
        if self.udp_src:
            entry["udp_src_port"] = self.udp_src
        if self.udp_dst:
            entry["udp_dst_port"] = self.udp_dst
        r = self.test.vapi.ipsec_sad_entry_add(entry=entry)
        self.stat_index = r.stat_index
        self.test.registry.register(self, self.test.logger)
        return self

    def update_vpp_config(
        self, udp_src=None, udp_dst=None, is_tun=False, tun_src=None, tun_dst=None
    ):
        if is_tun:
            if tun_src:
                self.tun_src = ip_address(text_type(tun_src))
            if tun_dst:
                self.tun_dst = ip_address(text_type(tun_dst))
        if udp_src:
            self.udp_src = udp_src
        if udp_dst:
            self.udp_dst = udp_dst
        self.test.vapi.ipsec_sad_entry_update(
            sad_id=self.id,
            is_tun=is_tun,
            tunnel=self.tunnel_encode(),
            udp_src_port=udp_src,
            udp_dst_port=udp_dst,
        )

    def remove_vpp_config(self):
        self.test.vapi.ipsec_sad_entry_del(id=self.id)

    def object_id(self):
        return "ipsec-sa-%d" % self.id

    def query_vpp_config(self):
        e = VppEnum.vl_api_ipsec_sad_flags_t

        bs = self.test.vapi.ipsec_sa_v3_dump()
        for b in bs:
            if b.entry.sad_id == self.id:
                # if udp encap is configured then the ports should match
                # those configured or the default
                if self.flags & e.IPSEC_API_SAD_FLAG_UDP_ENCAP:
                    if not b.entry.flags & e.IPSEC_API_SAD_FLAG_UDP_ENCAP:
                        return False
                    if self.udp_src:
                        if self.udp_src != b.entry.udp_src_port:
                            return False
                    else:
                        if self.DEFAULT_UDP_PORT != b.entry.udp_src_port:
                            return False
                    if self.udp_dst:
                        if self.udp_dst != b.entry.udp_dst_port:
                            return False
                    else:
                        if self.DEFAULT_UDP_PORT != b.entry.udp_dst_port:
                            return False
                return True
        return False

    def get_stats(self, worker=None):
        c = self.test.statistics.get_counter("/net/ipsec/sa")
        if worker is None:
            total = mk_counter()
            for t in c:
                total["packets"] += t[self.stat_index]["packets"]
            return total
        else:
            # +1 to skip main thread
            return c[worker + 1][self.stat_index]

    def get_lost(self, worker=None):
        c = self.test.statistics.get_counter("/net/ipsec/sa/lost")
        if worker is None:
            total = 0
            for t in c:
                total += t[self.stat_index]
            return total
        else:
            # +1 to skip main thread
            return c[worker + 1][self.stat_index]


class VppIpsecTunProtect(VppObject):
    """
    VPP IPSEC tunnel protection
    """

    def __init__(self, test, itf, sa_out, sas_in, nh=None):
        self.test = test
        self.itf = itf
        self.sas_in = []
        for sa in sas_in:
            self.sas_in.append(sa.id)
        self.sa_out = sa_out.id
        self.nh = nh
        if not self.nh:
            self.nh = "0.0.0.0"

    def update_vpp_config(self, sa_out, sas_in):
        self.sas_in = []
        for sa in sas_in:
            self.sas_in.append(sa.id)
        self.sa_out = sa_out.id
        self.test.vapi.ipsec_tunnel_protect_update(
            tunnel={
                "sw_if_index": self.itf._sw_if_index,
                "n_sa_in": len(self.sas_in),
                "sa_out": self.sa_out,
                "sa_in": self.sas_in,
                "nh": self.nh,
            }
        )

    def object_id(self):
        return "ipsec-tun-protect-%s-%s" % (self.itf, self.nh)

    def add_vpp_config(self):
        self.test.vapi.ipsec_tunnel_protect_update(
            tunnel={
                "sw_if_index": self.itf._sw_if_index,
                "n_sa_in": len(self.sas_in),
                "sa_out": self.sa_out,
                "sa_in": self.sas_in,
                "nh": self.nh,
            }
        )
        self.test.registry.register(self, self.test.logger)

    def remove_vpp_config(self):
        self.test.vapi.ipsec_tunnel_protect_del(
            sw_if_index=self.itf.sw_if_index, nh=self.nh
        )

    def query_vpp_config(self):
        bs = self.test.vapi.ipsec_tunnel_protect_dump(sw_if_index=self.itf.sw_if_index)
        for b in bs:
            if b.tun.sw_if_index == self.itf.sw_if_index and self.nh == str(b.tun.nh):
                return True
        return False


class VppIpsecInterface(VppInterface):
    """
    VPP IPSec interface
    """

    def __init__(self, test, mode=None, instance=0xFFFFFFFF):
        super(VppIpsecInterface, self).__init__(test)

        self.mode = mode
        if not self.mode:
            self.mode = VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_P2P
        self.instance = instance

    def add_vpp_config(self):
        r = self.test.vapi.ipsec_itf_create(
            itf={
                "user_instance": self.instance,
                "mode": self.mode,
            }
        )
        self.set_sw_if_index(r.sw_if_index)
        self.test.registry.register(self, self.test.logger)
        ts = self.test.vapi.ipsec_itf_dump(sw_if_index=self._sw_if_index)
        self.instance = ts[0].itf.user_instance
        return self

    def remove_vpp_config(self):
        self.test.vapi.ipsec_itf_delete(sw_if_index=self._sw_if_index)

    def query_vpp_config(self):
        ts = self.test.vapi.ipsec_itf_dump(sw_if_index=0xFFFFFFFF)
        for t in ts:
            if t.itf.sw_if_index == self._sw_if_index:
                return True
        return False

    def __str__(self):
        return self.object_id()

    def object_id(self):
        return "ipsec%d" % self.instance