aboutsummaryrefslogtreecommitdiffstats
path: root/NSH_SFC_VER
blob: 4a979a5789c9977f10b6c936cbc5885d83c52301 (plain)
1
2
VPP = 17.07-rc0
NSH_SFC = 17.07-rc0
000000; background-color: #ddffdd } /* Generic.Inserted */ .highlight .go { color: #888888 } /* Generic.Output */ .highlight .gp { color: #555555 } /* Generic.Prompt */ .highlight .gs { font-weight: bold } /* Generic.Strong */ .highlight .gu { color: #666666 } /* Generic.Subheading */ .highlight .gt { color: #aa0000 } /* Generic.Traceback */ .highlight .kc { color: #008800; font-weight: bold } /* Keyword.Constant */ .highlight .kd { color: #008800; font-weight: bold } /* Keyword.Declaration */ .highlight .kn { color: #008800; font-weight: bold } /* Keyword.Namespace */ .highlight .kp { color: #008800 } /* Keyword.Pseudo */ .highlight .kr { color: #008800; font-weight: bold } /* Keyword.Reserved */ .highlight .kt { color: #888888; font-weight: bold } /* Keyword.Type */ .highlight .m { color: #0000DD; font-weight: bold } /* Literal.Number */ .highlight .s { color: #dd2200; background-color: #fff0f0 } /* Literal.String */ .highlight .na { color: #336699 } /* Name.Attribute */ .highlight .nb { color: #003388 } /* Name.Builtin */ .highlight .nc { color: #bb0066; font-weight: bold } /* Name.Class */ .highlight .no { color: #003366; font-weight: bold } /* Name.Constant */ .highlight .nd { color: #555555 } /* Name.Decorator */ .highlight .ne { color: #bb0066; font-weight: bold } /* Name.Exception */ .highlight .nf { color: #0066bb; font-weight: bold } /* Name.Function */ .highlight .nl { color: #336699; font-style: italic } /* Name.Label */ .highlight .nn { color: #bb0066; font-weight: bold } /* Name.Namespace */ .highlight .py { color: #336699; font-weight: bold } /* Name.Property */ .highlight .nt { color: #bb0066; font-weight: bold } /* Name.Tag */ .highlight .nv { color: #336699 } /* Name.Variable */ .highlight .ow { color: #008800 } /* Operator.Word */ .highlight .w { color: #bbbbbb } /* Text.Whitespace */ .highlight .mb { color: #0000DD; font-weight: bold } /* Literal.Number.Bin */ .highlight .mf { color: #0000DD; font-weight: bold } /* Literal.Number.Float */ .highlight .mh { color: #0000DD; font-weight: bold } /* Literal.Number.Hex */ .highlight .mi { color: #0000DD; font-weight: bold } /* Literal.Number.Integer */ .highlight .mo { color: #0000DD; font-weight: bold } /* Literal.Number.Oct */ .highlight .sa { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Affix */ .highlight .sb { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Backtick */ .highlight .sc { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Char */ .highlight .dl { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Delimiter */ .highlight .sd { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Doc */ .highlight .s2 { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Double */ .highlight .se { color: #0044dd; background-color: #fff0f0 } /* Literal.String.Escape */ .highlight .sh { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Heredoc */ .highlight .si { color: #3333bb; background-color: #fff0f0 } /* Literal.String.Interpol */ .highlight .sx { color: #22bb22; background-color: #f0fff0 } /* Literal.String.Other */ .highlight .sr { color: #008800; background-color: #fff0ff } /* Literal.String.Regex */ .highlight .s1 { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Single */ .highlight .ss { color: #aa6600; background-color: #fff0f0 } /* Literal.String.Symbol */ .highlight .bp { color: #003388 } /* Name.Builtin.Pseudo */ .highlight .fm { color: #0066bb; font-weight: bold } /* Name.Function.Magic */ .highlight .vc { color: #336699 } /* Name.Variable.Class */ .highlight .vg { color: #dd7700 } /* Name.Variable.Global */ .highlight .vi { color: #3333bb } /* Name.Variable.Instance */ .highlight .vm { color: #336699 } /* Name.Variable.Magic */ .highlight .il { color: #0000DD; font-weight: bold } /* Literal.Number.Integer.Long */ }
import socket

from scapy.layers.inet import IP, ICMP
from scapy.layers.l2 import Ether
from scapy.layers.ipsec import *

from framework import VppTestCase
from vpp_ip_route import VppIpRoute

from util import ppp


class TestIpsecAh(VppTestCase):
    """
    Basic test for IPSEC using AH transport and Tunnel mode

    Below 4 cases are covered as part of this test
    1) ipsec ah v4 transport basic test  - IPv4 Transport mode
     scenario using HMAC-SHA1-96 intergrity algo
    2) ipsec ah v4 transport burst test
     Above test for 257 pkts
    3) ipsec ah 4o4 tunnel basic test    - IPv4 Tunnel mode
     scenario using HMAC-SHA1-96 intergrity algo
    4) ipsec ah 4o4 tunnel burst test
     Above test for 257 pkts

    TRANSPORT MODE:

     ---   encrypt   ---
    |pg2| <-------> |VPP|
     ---   decrypt   ---

    TUNNEL MODE:

     ---   encrypt   ---   plain   ---
    |pg0| ------->  |VPP| ------> |pg1|
     ---             ---           ---

     ---   decrypt   ---   plain   ---
    |pg0| <-------  |VPP| <------ |pg1|
     ---             ---           ---

    Note : IPv6 is not covered
    """

    remote_pg0_lb_addr = '1.1.1.1'
    remote_pg1_lb_addr = '2.2.2.2'

    @classmethod
    def setUpClass(cls):
        super(TestIpsecAh, cls).setUpClass()
        try:
            cls.create_pg_interfaces(range(3))
            cls.interfaces = list(cls.pg_interfaces)
            for i in cls.interfaces:
                i.admin_up()
                i.config_ip4()
                i.resolve_arp()
            cls.logger.info(cls.vapi.ppcli("show int addr"))
            cls.configAhTra()
            cls.logger.info(cls.vapi.ppcli("show ipsec"))
            cls.configAhTun()
            cls.logger.info(cls.vapi.ppcli("show ipsec"))
        except Exception:
            super(TestIpsecAh, cls).tearDownClass()
            raise

    @classmethod
    def configAhTun(cls):
        try:
            spd_id = 1
            remote_sa_id = 10
            local_sa_id = 20
            remote_tun_spi = 1001
            local_tun_spi = 1000
            src4 = socket.inet_pton(socket.AF_INET, cls.remote_pg0_lb_addr)
            cls.vapi.ip_add_del_route(src4, 32, cls.pg0.remote_ip4n)
            dst4 = socket.inet_pton(socket.AF_INET, cls.remote_pg1_lb_addr)
            cls.vapi.ip_add_del_route(dst4, 32, cls.pg1.remote_ip4n)
            cls.vapi.ipsec_sad_add_del_entry(
                remote_sa_id,
                remote_tun_spi,
                cls.pg0.local_ip4n,
                cls.pg0.remote_ip4n,
                integrity_key_length=20)
            cls.vapi.ipsec_sad_add_del_entry(
                local_sa_id,
                local_tun_spi,
                cls.pg0.remote_ip4n,
                cls.pg0.local_ip4n,
                integrity_key_length=20)
            cls.vapi.ipsec_spd_add_del(spd_id)
            cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg0.sw_if_index)
            l_startaddr = r_startaddr = socket.inet_pton(
                socket.AF_INET, "0.0.0.0")
            l_stopaddr = r_stopaddr = socket.inet_pton(
                socket.AF_INET, "255.255.255.255")
            cls.vapi.ipsec_spd_add_del_entry(
                spd_id,
                l_startaddr,
                l_stopaddr,
                r_startaddr,
                r_stopaddr,
                protocol=51)
            cls.vapi.ipsec_spd_add_del_entry(
                spd_id,
                l_startaddr,
                l_stopaddr,
                r_startaddr,
                r_stopaddr,
                protocol=51,
                is_outbound=0)
            l_startaddr = l_stopaddr = socket.inet_pton(
                socket.AF_INET, cls.remote_pg0_lb_addr)
            r_startaddr = r_stopaddr = socket.inet_pton(
                socket.AF_INET, cls.remote_pg1_lb_addr)
            cls.vapi.ipsec_spd_add_del_entry(
                spd_id,
                l_startaddr,
                l_stopaddr,
                r_startaddr,
                r_stopaddr,
                priority=10,
                policy=3,
                is_outbound=0,
                sa_id=local_sa_id)
            cls.vapi.ipsec_spd_add_del_entry(
                spd_id,
                r_startaddr,
                r_stopaddr,
                l_startaddr,
                l_stopaddr,
                priority=10,
                policy=3,
                sa_id=remote_sa_id)
        except Exception:
            raise

    @classmethod
    def configAhTra(cls):
        try:
            spd_id = 2
            remote_sa_id = 30
            local_sa_id = 40
            remote_tra_spi = 2001
            local_tra_spi = 2000
            cls.vapi.ipsec_sad_add_del_entry(
                remote_sa_id,
                remote_tra_spi,
                integrity_key_length=20,
                is_tunnel=0)
            cls.vapi.ipsec_sad_add_del_entry(
                local_sa_id,
                local_tra_spi,
                integrity_key_length=20,
                is_tunnel=0)
            cls.vapi.ipsec_spd_add_del(spd_id)
            cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg2.sw_if_index)
            l_startaddr = r_startaddr = socket.inet_pton(
                socket.AF_INET, "0.0.0.0")
            l_stopaddr = r_stopaddr = socket.inet_pton(
                socket.AF_INET, "255.255.255.255")
            cls.vapi.ipsec_spd_add_del_entry(
                spd_id,
                l_startaddr,
                l_stopaddr,
                r_startaddr,
                r_stopaddr,
                protocol=51)
            cls.vapi.ipsec_spd_add_del_entry(
                spd_id,
                l_startaddr,
                l_stopaddr,
                r_startaddr,
                r_stopaddr,
                protocol=51,
                is_outbound=0)
            l_startaddr = l_stopaddr = cls.pg2.local_ip4n
            r_startaddr = r_stopaddr = cls.pg2.remote_ip4n
            cls.vapi.ipsec_spd_add_del_entry(
                spd_id,
                l_startaddr,
                l_stopaddr,
                r_startaddr,
                r_stopaddr,
                priority=10,
                policy=3,
                is_outbound=0,
                sa_id=local_sa_id)
            cls.vapi.ipsec_spd_add_del_entry(
                spd_id,
                l_startaddr,
                l_stopaddr,
                r_startaddr,
                r_stopaddr,
                priority=10,
                policy=3,
                sa_id=remote_sa_id)
        except Exception:
            raise

    def configScapySA(self, is_tun=False):
        if is_tun:
            self.remote_tun_sa = SecurityAssociation(
                AH,
                spi=0x000003e8,
                auth_algo='HMAC-SHA1-96',
                auth_key='C91KUR9GYMm5GfkEvNjX',
                tunnel_header=IP(
                    src=self.pg0.remote_ip4,
                    dst=self.pg0.local_ip4))
            self.local_tun_sa = SecurityAssociation(
                AH,
                spi=0x000003e9,
                auth_algo='HMAC-SHA1-96',
                auth_key='C91KUR9GYMm5GfkEvNjX',
                tunnel_header=IP(
                    dst=self.pg0.remote_ip4,
                    src=self.pg0.local_ip4))
        else:
            self.remote_tra_sa = SecurityAssociation(
                AH, spi=0x000007d0, auth_algo='HMAC-SHA1-96',
                auth_key='C91KUR9GYMm5GfkEvNjX')
            self.local_tra_sa = SecurityAssociation(
                AH, spi=0x000007d1, auth_algo='HMAC-SHA1-96',
                auth_key='C91KUR9GYMm5GfkEvNjX')

    def tearDown(self):
        super(TestIpsecAh, self).tearDown()
        if not self.vpp_dead:
            self.vapi.cli("show hardware")

    def send_and_expect(self, input, pkts, output, count=1):
        input.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = output.get_capture(count)
        return rx

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1):
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=src, dst=dst) / ICMP() /
                "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
                ] * count

    def gen_pkts(self, sw_intf, src, dst, count=1):
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src=src, dst=dst) / ICMP() /
                "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
                ] * count

    def test_ipsec_ah_tra_basic(self, count=1):
        """ ipsec ah v4 transport basic test """
        try:
            self.configScapySA()
            send_pkts = self.gen_encrypt_pkts(
                self.remote_tra_sa,
                self.pg2,
                src=self.pg2.remote_ip4,
                dst=self.pg2.local_ip4,
                count=count)
            recv_pkts = self.send_and_expect(
                self.pg2, send_pkts, self.pg2, count=count)
            # ESP TRA VPP encryption/decryption verification
            for Pkts in recv_pkts:
                Pkts[AH].padding = Pkts[AH].icv[12:]
                Pkts[AH].icv = Pkts[AH].icv[:12]
                decrypt_pkt = self.local_tra_sa.decrypt(Pkts[IP])
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

    def test_ipsec_ah_tra_burst(self):
        """ ipsec ah v4 transport burst test """
        try:
            self.test_ipsec_ah_tra_basic(count=257)
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

    def test_ipsec_ah_tun_basic(self, count=1):
        """ ipsec ah 4o4 tunnel basic test """
        try:
            self.configScapySA(is_tun=True)
            send_pkts = self.gen_encrypt_pkts(
                self.remote_tun_sa,
                self.pg0,
                src=self.remote_pg0_lb_addr,
                dst=self.remote_pg1_lb_addr,
                count=count)
            recv_pkts = self.send_and_expect(
                self.pg0, send_pkts, self.pg1, count=count)
            # ESP TUN VPP decryption verification
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IP].src, self.remote_pg0_lb_addr)
                self.assert_equal(recv_pkt[IP].dst, self.remote_pg1_lb_addr)
            send_pkts = self.gen_pkts(
                self.pg1,
                src=self.remote_pg1_lb_addr,
                dst=self.remote_pg0_lb_addr,
                count=count)
            recv_pkts = self.send_and_expect(
                self.pg1, send_pkts, self.pg0, count=count)
            # ESP TUN VPP encryption verification
            for recv_pkt in recv_pkts:
                recv_pkt[IP] = recv_pkt[IP] / IP(recv_pkt[AH].icv[12:])
                recv_pkt[AH].icv = recv_pkt[AH].icv[:12]
                decrypt_pkt = self.local_tun_sa.decrypt(recv_pkt[IP])
                self.assert_equal(decrypt_pkt.src, self.remote_pg1_lb_addr)
                self.assert_equal(decrypt_pkt.dst, self.remote_pg0_lb_addr)
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

    def test_ipsec_ah_tun_burst(self):
        """ ipsec ah 4o4 tunnel burst test """
        try:
            self.test_ipsec_ah_tun_basic(count=257)
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))


if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)