Container-based network simulation
==================================

The "make test" framework provides a good way to test individual
features. However, when testing several features at once - or
validating nontrivial configurations - it may prove difficult or
impossible to use the unit-test framework.

This note explains how to set up lxc/lxd, and a 5-container testbed to
test a split-tunnel nat + ikev2 + ipsec + ipv6 prefix-delegation
scenario.

OS / Distro test results
------------------------

This setup has been tested on an Ubuntu 18.04 LTS system. If you're
feeling adventurous, the same scenario also worked on a recent Ubuntu
20.04 "preview" daily build.

Other distros may work fine, or not at all.

Proxy Server
------------

If you need to use a proxy server e.g. from a lab system, you'll
probably need to set HTTP_PROXY, HTTPS_PROXY, http_proxy and
https_proxy in /etc/environment. Directly setting variables in the
environment doesn't work. The lxd snap _daemon_ needs the proxy settings,
not the user interface.

Something like so:

```
    HTTP_PROXY=http://my.proxy.server:8080
    HTTPS_PROXY=http://my.proxy.server:4333
    http_proxy=http://my.proxy.server:8080
    https_proxy=http://my.proxy.server:4333
```
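A quick sanity check can save a confusing "lxc launch" failure later. The sketch below assumes the four variables live in /etc/environment as described above; the function name `check_proxy_env` is made up for illustration and is not part of lxd.

```shell
#!/bin/bash
# Hypothetical helper: report which proxy variables are missing from a file.
# Defaults to /etc/environment; pass another path to check a different file.
check_proxy_env() {
    local file="${1:-/etc/environment}"
    local missing=0
    local var
    for var in HTTP_PROXY HTTPS_PROXY http_proxy https_proxy
    do
        # grep is case-sensitive, so upper- and lower-case names are distinct
        if ! grep -q "^${var}=" "$file" 2>/dev/null ; then
            echo "missing: $var"
            missing=1
        fi
    done
    return $missing
}
```

Run `check_proxy_env` with no arguments; it prints nothing and returns 0 when all four settings are present.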

Install and configure lxd
-------------------------

Install the lxd snap. The lxd snap is up to date, as opposed to the
results of "sudo apt-get install lxd".

```
    # snap install lxd
    # lxd init
```

"lxd init" asks several questions. With the exception of the storage
pool, take the defaults. To match the configs shown below, create a
storage pool named "vpp". Storage pools of type "zfs" and "files" have
been tested successfully.

zfs is more space-efficient, and "lxc copy" is much faster with
zfs. The path for the zfs storage pool is under /var. Do not replace
it with a symbolic link, unless you want to rebuild all of your
containers from scratch. Ask me how I know that.

Create three network segments
-----------------------------

Aka, linux bridges.

```
    # lxc network create dora
    # lxc network create internet
    # lxc network create swan
```

We'll explain the test topology in a bit. Stay tuned.

Set up the default container profile
------------------------------------

Execute "lxc profile edit default", and install the following
configuration. Note that the "shared" directory should mount your vpp
workspaces. With that trick, you can edit code from any of the
containers, run vpp without installing it, etc.

```
    config: {}
    description: Default LXD profile
    devices:
      eth0:
        name: eth0
        network: lxdbr0
        type: nic
      eth1:
        name: eth1
        nictype: bridged
        parent: internet
        type: nic
      eth2:
        name: eth2
        nictype: bridged
        parent: dora
        type: nic
      eth3:
        name: eth3
        nictype: bridged
        parent: swan
        type: nic
      root:
        path: /
        pool: vpp
        type: disk
      shared:
        path: /scratch
        source: /scratch
        type: disk
    name: default
```

Set up the network configurations
---------------------------------

Edit the fake "internet" backbone:

```
  # lxc network edit internet
```

Install the ip addresses shown below, to avoid having to rebuild the vpp
and host configuration:

```
    config:
      ipv4.address: 10.26.68.1/24
      ipv4.dhcp.ranges: 10.26.68.10-10.26.68.50
      ipv4.nat: "true"
      ipv6.address: none
      ipv6.nat: "false"
    description: ""
    name: internet
    type: bridge
    used_by:
    managed: true
    status: Created
    locations:
    - none
```

Repeat the process with the "dora" and "swan" networks, using these
configurations:

### dora network configuration

```
    config:
      ipv4.address: 10.166.14.1/24
      ipv4.dhcp.ranges: 10.166.14.10-10.166.14.50
      ipv4.nat: "true"
      ipv6.address: none
      ipv6.nat: "false"
    description: ""
    name: dora
    type: bridge
    used_by:
    managed: true
    status: Created
    locations:
    - none
```
### swan network configuration

```
    config:
      ipv4.address: 10.219.188.1/24
      ipv4.dhcp.ranges: 10.219.188.10-10.219.188.50
      ipv4.nat: "true"
      ipv6.address: none
      ipv6.nat: "false"
    description: ""
    name: swan
    type: bridge
    used_by:
    managed: true
    status: Created
    locations:
    - none
```
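If you'd rather not paste YAML into an editor three times, the same keys can be set one at a time with "lxc network set". This is a sketch, not the method used to build the original testbed; the function names and the `LXC` override variable are made up for illustration. Set `LXC=echo` to see what would be run without touching lxd.

```shell
#!/bin/bash
# Command used to talk to lxd. LXC=echo gives a dry run (illustration aid).
LXC="${LXC:-lxc}"

# configure_net NAME PREFIX: apply the settings shown in the YAML above.
# PREFIX is the first three octets of the bridge subnet, e.g. 10.26.68
configure_net() {
    local name="$1" prefix="$2"
    $LXC network set "$name" ipv4.address "${prefix}.1/24"
    $LXC network set "$name" ipv4.dhcp.ranges "${prefix}.10-${prefix}.50"
    $LXC network set "$name" ipv4.nat true
    $LXC network set "$name" ipv6.address none
    $LXC network set "$name" ipv6.nat false
}

# The three bridges, with the subnets used throughout this note
configure_all_nets() {
    configure_net internet 10.26.68
    configure_net dora 10.166.14
    configure_net swan 10.219.188
}
```

Call `configure_all_nets` after the three "lxc network create" commands, then compare the result against the YAML above with "lxc network show internet" and friends.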

Create a "master" container image
---------------------------------

The master container image should be set up so that you can
build vpp, ssh into the container, edit source code, run gdb, etc.

Make sure that ssh into the container works, e.g. with public key auth.

```
    # lxc launch ubuntu:18.04 dora
    <spew>
    # lxc exec dora bash
    dora# cd /scratch/my-vpp-workspace
    dora# apt-get install make ssh
    dora# make install-dep
    dora# exit
    # lxc stop dora
```

Mark the container image privileged. If you forget this step, you'll
trip over a netlink error (-11) aka EAGAIN when you try to roll in the
vpp configurations.

```
    # lxc config set dora security.privileged "true"
```

Duplicate the "master" container image
--------------------------------------

To avoid having to configure N containers, be sure that the master
container image is fully set up before you help it have children:

```
    # lxc copy dora dorahost
    # lxc copy dora swan
    # lxc copy dora swanhost
    # lxc copy dora dhcpserver    # optional, to test ipv6 prefix delegation
```

Install handy script
--------------------

See below for a handy script which executes lxc commands across the
current set of running containers. I call it "lxc-foreach"; feel free
to call the script Ishmael if you like.

Examples:

```
    $ lxc-foreach start
    <issues "lxc start" for each container in the list>
```

After a few seconds, use this one to open an ssh connection to each
container. The ssh command parses the output of "lxc info", which
displays container ip addresses.

```
    $ lxc-foreach ssh
```

Here's the script:

```
    #!/bin/bash

    set -u
    export containers="dora dorahost swan swanhost dhcpserver"

    # "${1:-}" keeps "set -u" from aborting when no argument is given
    if [ "x${1:-}" = "x" ] ; then
        echo missing command
        exit 1
    fi

    if [ "$1" = "ssh" ] ; then
        for c in $containers
        do
            # first non-inet6 eth0 line of "lxc info"; the address is field 3
            inet=`lxc info $c | grep eth0 | grep -v inet6 | head -1 | cut -f 3`
            if [ "x$inet" = "x" ] ; then
                echo $c not started
            else
                gnome-terminal --command "/usr/bin/ssh $inet"
            fi
        done
        exit 0
    fi

    # any other argument is passed straight to lxc, once per container
    for c in $containers
    do
        echo lxc $1 $c
        lxc $1 $c
    done

    exit 0
```
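The "lxc info" output layout has changed between lxd releases, so the grep / cut pipeline above may need adjusting. One alternative, sketched here under the assumption that "lxc list" prints each address as "ADDRESS (IFACE)" in its IPv4 column, is to filter the csv output instead. The helper name `eth0_addr` is made up for illustration.

```shell
#!/bin/bash
# eth0_addr: read "ADDRESS (IFACE)" entries on stdin, one per line,
# and print the address that belongs to eth0 (the management interface).
# csv output wraps multi-line fields in double quotes, so strip those first.
eth0_addr() {
    tr -d '"' | awk '$2 == "(eth0)" { print $1; exit }'
}

# Intended use (needs a running container):
#   lxc list '^dora$' -c 4 --format csv | eth0_addr
```

The anchored pattern matters: a bare "dora" would also match "dorahost".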

Test topology
-------------

Finally, we're ready to describe a test topology. First, a picture:

```
    ===+======== management lan/bridge lxdbr0 (dhcp) ===========+===
       |                             |                          |
       |                             |                          |
       |                             |                          |
       v                             |                          v
      eth0                           |                         eth0
    +------+ eth1                                       eth1 +------+
    | dora | 10.26.68.100 <= internet bridge => 10.26.68.101 | swan |
    +------+                                                 +------+
      eth2 / bvi0 10.166.14.2        |       10.219.188.2 eth3 / bvi0
       |                             |                          |
       | ("dora" bridge)             |          ("swan" bridge) |
       |                             |                          |
       v                             |                          v
      eth2 10.166.14.3               |           eth3 10.219.188.3
    +----------+                     |                   +----------+
    | dorahost |                     |                   | swanhost |
    +----------+                     |                   +----------+
      eth0 (management lan) <========+========> eth0 (management lan)
```

### Test topology discussion

This topology is suitable for testing almost any tunnel encap/decap
scenario.  The two containers "dorahost" and "swanhost" are end-stations
connected to two vpp instances running on "dora" and "swan".

We leverage the Linux end-station network stacks to generate traffic
of all sorts.

The so-called "internet" bridge models the public internet. The "dora" and
"swan" bridges connect vpp instances to local hosts.

End station configs
-------------------

The end-station Linux configurations set up the eth2 and eth3 ip
addresses shown above, and add tunnel routes to the opposite
end-station networks.

### dorahost configuration

```
    ifconfig eth2 10.166.14.3/24 up
    route add -net 10.219.188.0/24 gw 10.166.14.2
```

### swanhost configuration

```
    sudo ifconfig eth3 10.219.188.3/24 up
    sudo route add -net 10.166.14.0/24 gw 10.219.188.2
```
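Minimal images may not ship ifconfig and route (the net-tools package). The sketch below prints the equivalent iproute2 commands for one end-station; the function name `station_cmds` is made up for illustration, and nothing is executed until you pipe the output to a root shell.

```shell
#!/bin/bash
# station_cmds IFACE ADDR/LEN REMOTE_NET GATEWAY
# Print iproute2 commands equivalent to the ifconfig / route pair above.
station_cmds() {
    echo "ip addr add $2 dev $1"
    echo "ip link set $1 up"
    echo "ip route add $3 via $4"
}

# Intended use:
#   dorahost: station_cmds eth2 10.166.14.3/24 10.219.188.0/24 10.166.14.2 | sudo sh
#   swanhost: station_cmds eth3 10.219.188.3/24 10.166.14.0/24 10.219.188.2 | sudo sh
```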

VPP configs
-----------

Split nat44 / ikev2 + ipsec tunneling, with ipv6 prefix delegation in
the "dora" config.

### dora configuration

```
    set term pag off

    comment { "internet" }
    create host-interface name eth1
    set int ip address host-eth1 10.26.68.100/24
    set int ip6 table host-eth1 0
    set int state host-eth1 up

    comment { default route via swan }
    ip route add 0.0.0.0/0 via 10.26.68.101

    comment { "dora-private-net" }
    create host-interface name eth2
    bvi create instance 0
    set int l2 bridge bvi0 1 bvi
    set int ip address bvi0 10.166.14.2/24
    set int state bvi0 up
    set int l2 bridge host-eth2 1
    set int state host-eth2 up


    nat44 add interface address host-eth1
    set interface nat44 in host-eth2 out host-eth1
    nat44 add identity mapping external host-eth1 udp 500
    nat44 add identity mapping external host-eth1 udp 4500
    comment { nat44 untranslated subnet 10.219.188.0/24 }

    comment { responder profile }
    ikev2 profile add swan
    ikev2 profile set swan udp-encap
    ikev2 profile set swan auth rsa-sig cert-file /scratch/setups/doracert.pem
    set ikev2 local key /scratch/setups/swankey.pem
    ikev2 profile set swan id local fqdn swan.barachs.net
    ikev2 profile set swan id remote fqdn broiler2.barachs.net
    ikev2 profile set swan traffic-selector remote ip-range 10.219.188.0 - 10.219.188.255 port-range 0 - 65535 protocol 0
    ikev2 profile set swan traffic-selector local ip-range 10.166.14.0 - 10.166.14.255 port-range 0 - 65535 protocol 0
    create ipip tunnel src 10.26.68.100 dst 10.26.68.101
    ikev2 profile set swan tunnel ipip0

    comment { ipv6 prefix delegation }
    ip6 nd address autoconfig host-eth1 default-route
    dhcp6 client host-eth1
    dhcp6 pd client host-eth1 prefix group hgw
    set ip6 address bvi0 prefix group hgw ::2/56
    ip6 nd address autoconfig bvi0 default-route
    ip6 nd bvi0 ra-interval 5 3 ra-lifetime 180

    set int mtu packet 1390 ipip0
    set int unnum ipip0 use host-eth1
    ip route add 10.219.188.0/24 via ipip0
```

### swan configuration

```
    set term pag off

    comment { "internet" }
    create host-interface name eth1
    comment { set dhcp client intfc host-eth1 hostname swan }
    set int ip address host-eth1 10.26.68.101/24
    set int state host-eth1 up

    comment { default route via "internet gateway" }
    comment { ip route add 0.0.0.0/0 via 10.26.68.1 }

    comment { "swan-private-net" }
    create host-interface name eth3
    bvi create instance 0
    set int l2 bridge bvi0 1 bvi
    set int ip address bvi0 10.219.188.2/24
    set int state bvi0 up
    set int l2 bridge host-eth3 1
    set int state host-eth3 up

    nat44 add interface address host-eth1
    set interface nat44 in bvi0 out host-eth1
    nat44 add identity mapping external host-eth1 udp 500
    nat44 add identity mapping external host-eth1 udp 4500
    comment { nat44 untranslated subnet 10.166.14.0/24 }

    comment { initiator profile }
    ikev2 profile add dora
    ikev2 profile set dora udp-encap
    ikev2 profile set dora auth rsa-sig cert-file /scratch/setups/swancert.pem
    set ikev2 local key /scratch/setups/dorakey.pem
    ikev2 profile set dora id local fqdn broiler2.barachs.net
    ikev2 profile set dora id remote fqdn swan.barachs.net

    ikev2 profile set dora traffic-selector remote ip-range 10.166.14.0 - 10.166.14.255 port-range 0 - 65535 protocol 0
    ikev2 profile set dora traffic-selector local ip-range 10.219.188.0 - 10.219.188.255 port-range 0 - 65535 protocol 0

    ikev2 profile set dora responder host-eth1 10.26.68.100
    ikev2 profile set dora ike-crypto-alg aes-cbc 256  ike-integ-alg sha1-96  ike-dh modp-2048
    ikev2 profile set dora esp-crypto-alg aes-cbc 256  esp-integ-alg sha1-96  esp-dh ecp-256
    ikev2 profile set dora sa-lifetime 3600 10 5 0

    create ipip tunnel src 10.26.68.101 dst 10.26.68.100
    ikev2 profile set dora tunnel ipip0
    ikev2 initiate sa-init dora

    set int mtu packet 1390 ipip0
    set int unnum ipip0 use host-eth1
    ip route add 10.166.14.0/24 via ipip0
```

IKEv2 certificate setup
-----------------------

In both of the vpp configurations, you'll see "/scratch/setups/xxx.pem"
mentioned. These certificates are used in the ikev2 key exchange.

Here's how to generate the certificates:

```
    openssl req -x509 -nodes -newkey rsa:4096 -keyout dorakey.pem -out doracert.pem -days 3560
    openssl x509 -text -noout -in doracert.pem
    openssl req -x509 -nodes -newkey rsa:4096 -keyout swankey.pem -out swancert.pem -days 3560
    openssl x509 -text -noout -in swancert.pem
```
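With two key / certificate pairs lying around, it's easy to pair the wrong files. A rough check, assuming RSA keys as generated above, is to compare the modulus stored in the certificate with the one in the private key. The function name `cert_matches_key` is made up for illustration.

```shell
#!/bin/bash
# cert_matches_key CERT KEY
# Return 0 if the RSA certificate and private key share a modulus,
# 1 if they differ, 2 if either file can't be read by openssl.
cert_matches_key() {
    local cert_mod key_mod
    cert_mod=$(openssl x509 -noout -modulus -in "$1" 2>/dev/null) || return 2
    key_mod=$(openssl rsa -noout -modulus -in "$2" 2>/dev/null) || return 2
    [ "$cert_mod" = "$key_mod" ]
}

# Intended use, in /scratch/setups:
#   cert_matches_key doracert.pem dorakey.pem && echo dora pair OK
#   cert_matches_key swancert.pem swankey.pem && echo swan pair OK
```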

Make sure that the "dora" and "swan" configurations point to the certificates.

DHCPv6 server setup
-------------------

If you need an ipv6 dhcp server to test ipv6 prefix delegation,
create the "dhcpserver" container as shown above.

Install the "isc-dhcp-server" Debian package:

```
    sudo apt-get install isc-dhcp-server
```

### /etc/dhcp/dhcpd6.conf

Edit the dhcpv6 configuration and add an ipv6 subnet with prefix
delegation. For example:

```
    subnet6 2001:db01:0:1::/64 {
            range6 2001:db01:0:1::1 2001:db01:0:1::9;
            prefix6 2001:db01:0:100:: 2001:db01:0:200::/56;
    }
```

Add an ipv6 address on eth1, which is connected to the "internet"
bridge, and start the dhcp server. I use the following trivial bash
script, which runs the dhcp6 server in the foreground and produces
dhcp traffic spew:

```
    #!/bin/bash
    ifconfig eth1 inet6 add 2001:db01:0:1::10/64 || true
    dhcpd -6 -d -cf /etc/dhcp/dhcpd6.conf
```

The "|| true" bit keeps going if eth1 already has the indicated ipv6
address.

Container / Host Interoperation
-------------------------------

Host / container interoperation is highly desirable. If the host and a
set of containers don't run the same distro _and distro version_, it's
reasonably likely that the glibc versions won't match. That, in turn,
makes vpp binaries built in one environment fail in the other.

Trying to install multiple versions of glibc - especially at the host
level - often ends very badly and is _not recommended_. It's not just
glibc, either. The dynamic loader ld-linux-xxx.so.2 is glibc version
specific.
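To find out whether you have this problem, compare what "getconf GNU_LIBC_VERSION" reports on the host and in a container. The sketch below only compares two strings; the function name `glibc_matches` is made up for illustration, and "dora" stands in for whichever container you care about.

```shell
#!/bin/bash
# glibc_matches HOST_VERSION CONTAINER_VERSION
# Both arguments are strings as printed by "getconf GNU_LIBC_VERSION",
# for example "glibc 2.27". Returns 0 on a match, 1 otherwise.
glibc_matches() {
    if [ "$1" = "$2" ] ; then
        echo "glibc versions match: $1"
    else
        echo "glibc mismatch: host has '$1', container has '$2'"
        return 1
    fi
}

# Intended use (needs a running container):
#   host=$(getconf GNU_LIBC_VERSION)
#   cont=$(lxc exec dora -- getconf GNU_LIBC_VERSION)
#   glibc_matches "$host" "$cont"
```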

Fortunately, it's reasonably easy to build lxd container images based on
specific Ubuntu or Debian versions.

### Create a custom root filesystem image

First, install the "debootstrap" tool:

```
    sudo apt-get install debootstrap
```

Make a temp directory, and use debootstrap to populate it. In this
example, we create an Ubuntu 20.04 (focal fossa) base image:

```
    # mkdir /tmp/myroot
    # debootstrap focal /tmp/myroot http://archive.ubuntu.com/ubuntu
```

To tinker with the base image (if desired):

```
    # chroot /tmp/myroot
    <add packages, etc.>
    # exit
```

Make a compressed tarball of the base image:

```
    # tar zcf /tmp/rootfs.tar.gz -C /tmp/myroot .
```

Create a "metadata.yaml" file which describes the base image:

```
    architecture: "x86_64"
    # To get current date in Unix time, use `date +%s` command
    creation_date: 1458040200
    properties:
      architecture: "x86_64"
      description: "My custom Focal Fossa image"
      os: "Ubuntu"
      release: "focal"
```
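Rather than pasting a timestamp by hand, the file can be generated with the current time filled in. This is a sketch under the assumption that the property keys belong nested under "properties"; the function name `write_metadata` is made up for illustration.

```shell
#!/bin/bash
# write_metadata [FILE]
# Write a metadata.yaml for the focal base image, stamping creation_date
# with the current Unix time. FILE defaults to ./metadata.yaml
write_metadata() {
    local out="${1:-metadata.yaml}"
    cat > "$out" <<EOF
architecture: "x86_64"
creation_date: $(date +%s)
properties:
  architecture: "x86_64"
  description: "My custom Focal Fossa image"
  os: "Ubuntu"
  release: "focal"
EOF
}
```

Run `write_metadata` in the directory where you plan to build metadata.tar.gz.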

Make a compressed tarball of metadata.yaml:

```
    # tar zcf metadata.tar.gz metadata.yaml
```

Import the image into lxc / lxd:

```
    $ lxc image import metadata.tar.gz /tmp/rootfs.tar.gz --alias focal-base
```

### Create a container which uses the customized base image

```
    $ lxc launch focal-base focaltest
    $ lxc exec focaltest bash
```

The next several steps should be executed in the container, in the
bash shell spun up by "lxc exec..."

### Configure container networking

In the container, create /etc/netplan/50-cloud-init.yaml:

```
    network:
        version: 2
        ethernets:
            eth0:
                dhcp4: true
```

Use "cat > /etc/netplan/50-cloud-init.yaml", and cut-'n-paste if your
favorite text editor is AWOL.

Apply the configuration:

```
    # netplan apply
```

At this point, eth0 should have an ip address, and you should see
a default route with "route -n".

### Configure apt

Again, in the container, set up /etc/apt/sources.list via cut-'n-paste
from a recently updated "focal fossa" host. Something like so:

```
    deb http://us.archive.ubuntu.com/ubuntu/ focal main restricted
    deb http://us.archive.ubuntu.com/ubuntu/ focal-updates main restricted
    deb http://us.archive.ubuntu.com/ubuntu/ focal universe
    deb http://us.archive.ubuntu.com/ubuntu/ focal-updates universe
    deb http://us.archive.ubuntu.com/ubuntu/ focal multiverse
    deb http://us.archive.ubuntu.com/ubuntu/ focal-updates multiverse
    deb http://us.archive.ubuntu.com/ubuntu/ focal-backports main restricted universe multiverse
    deb http://security.ubuntu.com/ubuntu focal-security main restricted
    deb http://security.ubuntu.com/ubuntu focal-security universe
    deb http://security.ubuntu.com/ubuntu focal-security multiverse
```

"apt-get update" and "apt-get install" should produce reasonable results.
Suggest "apt-get install make git".

At this point, you can use the "/scratch" sharepoint (or similar) to
execute "make install-dep install-ext-deps" to set up the container
with the vpp toolchain; proceed as desired.
for i in cls.pg_interfaces: i.admin_up() # Mapping between packet-generator index and lists of test hosts cls.hosts_by_pg_idx = dict() for pg_if in cls.pg_interfaces: cls.hosts_by_pg_idx[pg_if.sw_if_index] = [] # Create list of deleted hosts cls.deleted_hosts_by_pg_idx = dict() for pg_if in cls.pg_interfaces: cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = [] # warm-up the mac address tables # self.warmup_test() except Exception: super(TestACLplugin, cls).tearDownClass() raise def setUp(self): super(TestACLplugin, self).setUp() self.reset_packet_infos() def tearDown(self): """ Show various debug prints after each test. """ super(TestACLplugin, self).tearDown() if not self.vpp_dead: self.logger.info(self.vapi.ppcli("show l2fib verbose")) self.logger.info(self.vapi.ppcli("show bridge-domain %s detail" % self.bd_id)) def create_hosts(self, count, start=0): """ Create required number of host MAC addresses and distribute them among interfaces. Create host IPv4 address for every host MAC address. :param int count: Number of hosts to create MAC/IPv4 addresses for. :param int start: Number to start numbering from. 
""" n_int = len(self.pg_interfaces) macs_per_if = count / n_int i = -1 for pg_if in self.pg_interfaces: i += 1 start_nr = macs_per_if * i + start end_nr = count + start if i == (n_int - 1) \ else macs_per_if * (i + 1) + start hosts = self.hosts_by_pg_idx[pg_if.sw_if_index] for j in range(start_nr, end_nr): host = Host( "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j), "172.17.1%02x.%u" % (pg_if.sw_if_index, j), "2017:dead:%02x::%u" % (pg_if.sw_if_index, j)) hosts.append(host) def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1, s_prefix=0, s_ip='\x00\x00\x00\x00', d_prefix=0, d_ip='\x00\x00\x00\x00'): if proto == -1: return if ports == self.PORTS_ALL: sport_from = 0 dport_from = 0 sport_to = 65535 if proto != 1 and proto != 58 else 255 dport_to = sport_to elif ports == self.PORTS_RANGE: if proto == 1: sport_from = self.icmp4_type sport_to = self.icmp4_type dport_from = self.icmp4_code dport_to = self.icmp4_code elif proto == 58: sport_from = self.icmp6_type sport_to = self.icmp6_type dport_from = self.icmp6_code dport_to = self.icmp6_code elif proto == self.proto[self.IP][self.TCP]: sport_from = self.tcp_sport_from sport_to = self.tcp_sport_to dport_from = self.tcp_dport_from dport_to = self.tcp_dport_to elif proto == self.proto[self.IP][self.UDP]: sport_from = self.udp_sport_from sport_to = self.udp_sport_to dport_from = self.udp_dport_from dport_to = self.udp_dport_to else: sport_from = ports sport_to = ports dport_from = ports dport_to = ports rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto, 'srcport_or_icmptype_first': sport_from, 'srcport_or_icmptype_last': sport_to, 'src_ip_prefix_len': s_prefix, 'src_ip_addr': s_ip, 'dstport_or_icmpcode_first': dport_from, 'dstport_or_icmpcode_last': dport_to, 'dst_ip_prefix_len': d_prefix, 'dst_ip_addr': d_ip}) return rule def apply_rules(self, rules, tag=''): reply = self.api_acl_add_replace(acl_index=4294967295, r=rules, count=len(rules), tag=tag) self.logger.info("Dumped ACL: " + str( 
self.api_acl_dump(reply.acl_index))) # Apply a ACL on the interface as inbound for i in self.pg_interfaces: self.api_acl_interface_set_acl_list(sw_if_index=i.sw_if_index, count=1, n_input=1, acls=[reply.acl_index]) return def create_upper_layer(self, packet_index, proto, ports=0): p = self.proto_map[proto] if p == 'UDP': if ports == 0: return UDP(sport=random.randint(self.udp_sport_from, self.udp_sport_to), dport=random.randint(self.udp_dport_from, self.udp_dport_to)) else: return UDP(sport=ports, dport=ports) elif p == 'TCP': if ports == 0: return TCP(sport=random.randint(self.tcp_sport_from, self.tcp_sport_to), dport=random.randint(self.tcp_dport_from, self.tcp_dport_to)) else: return TCP(sport=ports, dport=ports) return '' def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0, proto=-1, ports=0): """ Create input packet stream for defined interface using hosts or deleted_hosts list. :param object src_if: Interface to create packet stream for. :param list packet_sizes: List of required packet sizes. :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise. :return: Stream of packets. 
""" pkts = [] if self.flows.__contains__(src_if): src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index] for dst_if in self.flows[src_if]: dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index] n_int = len(dst_hosts) * len(src_hosts) for i in range(0, n_int): dst_host = dst_hosts[i / len(src_hosts)] src_host = src_hosts[i % len(src_hosts)] pkt_info = self.create_packet_info(src_if, dst_if) if ipv6 == 1: pkt_info.ip = 1 elif ipv6 == 0: pkt_info.ip = 0 else: pkt_info.ip = random.choice([0, 1]) if proto == -1: pkt_info.proto = random.choice(self.proto[self.IP]) else: pkt_info.proto = proto payload = self.info_to_payload(pkt_info) p = Ether(dst=dst_host.mac, src=src_host.mac) if pkt_info.ip: p /= IPv6(dst=dst_host.ip6, src=src_host.ip6) else: p /= IP(src=src_host.ip4, dst=dst_host.ip4) if traffic_type == self.ICMP: if pkt_info.ip: p /= ICMPv6EchoRequest(type=self.icmp6_type, code=self.icmp6_code) else: p /= ICMP(type=self.icmp4_type, code=self.icmp4_code) else: p /= self.create_upper_layer(i, pkt_info.proto, ports) p /= Raw(payload) pkt_info.data = p.copy() size = random.choice(packet_sizes) self.extend_packet(p, size) pkts.append(p) return pkts def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0): """ Verify captured input packet stream for defined interface. :param object pg_if: Interface to verify captured packet stream for. :param list capture: Captured packet stream. :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise. 
""" last_info = dict() for i in self.pg_interfaces: last_info[i.sw_if_index] = None dst_sw_if_index = pg_if.sw_if_index for packet in capture: try: # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data if traffic_type == self.ICMP and ip_type == self.IPV6: payload_info = self.payload_to_info( packet[ICMPv6EchoRequest].data) payload = packet[ICMPv6EchoRequest] else: payload_info = self.payload_to_info(str(packet[Raw])) payload = packet[self.proto_map[payload_info.proto]] except: self.logger.error(ppp("Unexpected or invalid packet " "(outside network):", packet)) raise if ip_type != 0: self.assertEqual(payload_info.ip, ip_type) if traffic_type == self.ICMP: try: if payload_info.ip == 0: self.assertEqual(payload.type, self.icmp4_type) self.assertEqual(payload.code, self.icmp4_code) else: self.assertEqual(payload.type, self.icmp6_type) self.assertEqual(payload.code, self.icmp6_code) except: self.logger.error(ppp("Unexpected or invalid packet " "(outside network):", packet)) raise else: try: ip_version = IPv6 if payload_info.ip == 1 else IP ip = packet[ip_version] packet_index = payload_info.index self.assertEqual(payload_info.dst, dst_sw_if_index) self.logger.debug("Got packet on port %s: src=%u (id=%u)" % (pg_if.name, payload_info.src, packet_index)) next_info = self.get_next_packet_info_for_interface2( payload_info.src, dst_sw_if_index, last_info[payload_info.src]) last_info[payload_info.src] = next_info self.assertTrue(next_info is not None) self.assertEqual(packet_index, next_info.index) saved_packet = next_info.data # Check standard fields self.assertEqual(ip.src, saved_packet[ip_version].src) self.assertEqual(ip.dst, saved_packet[ip_version].dst) p = self.proto_map[payload_info.proto] if p == 'TCP': tcp = packet[TCP] self.assertEqual(tcp.sport, saved_packet[ TCP].sport) self.assertEqual(tcp.dport, saved_packet[ TCP].dport) elif p == 'UDP': udp = packet[UDP] self.assertEqual(udp.sport, saved_packet[ UDP].sport) self.assertEqual(udp.dport, saved_packet[ 
                        UDP].dport)
            except:
                self.logger.error(ppp("Unexpected or invalid packet:",
                                      packet))
                raise
        for i in self.pg_interfaces:
            remaining_packet = self.get_next_packet_info_for_interface2(
                i, dst_sw_if_index, last_info[i.sw_if_index])
            self.assertTrue(
                remaining_packet is None,
                "Port %u: Packet expected from source %u didn't arrive" %
                (dst_sw_if_index, i.sw_if_index))

    def run_traffic_no_check(self):
        # Test
        # Create incoming packet streams for packet-generator interfaces
        for i in self.pg_interfaces:
            if i in self.flows:
                pkts = self.create_stream(i, self.pg_if_packet_sizes)
                if len(pkts) > 0:
                    i.add_stream(pkts)

        # Enable packet capture and start packet sending
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0):
        # Test
        # Create incoming packet streams for packet-generator interfaces
        pkts_cnt = 0
        for i in self.pg_interfaces:
            if i in self.flows:
                pkts = self.create_stream(i, self.pg_if_packet_sizes,
                                          traffic_type, ip_type, proto, ports)
                if len(pkts) > 0:
                    i.add_stream(pkts)
                    pkts_cnt += len(pkts)

        # Enable packet capture and start packet sending
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Verify
        # Verify outgoing packet streams per packet-generator interface
        for src_if in self.pg_interfaces:
            if src_if in self.flows:
                for dst_if in self.flows[src_if]:
                    capture = dst_if.get_capture(pkts_cnt)
                    self.logger.info("Verifying capture on interface %s" %
                                     dst_if.name)
                    self.verify_capture(dst_if, capture, traffic_type, ip_type)

    def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
                              ports=0):
        # Test
        self.reset_packet_infos()
        for i in self.pg_interfaces:
            if i in self.flows:
                pkts = self.create_stream(i, self.pg_if_packet_sizes,
                                          traffic_type, ip_type, proto, ports)
                if len(pkts) > 0:
                    i.add_stream(pkts)

        # Enable packet capture and start packet sending
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Verify
        # Verify outgoing packet streams per packet-generator interface
        for src_if in self.pg_interfaces:
            if src_if in self.flows:
                for dst_if in self.flows[src_if]:
                    self.logger.info("Verifying capture on interface %s" %
                                     dst_if.name)
                    capture = dst_if.get_capture(0)
                    self.assertEqual(len(capture), 0)

    def api_acl_add_replace(self, acl_index, r, count, tag='',
                            expected_retval=0):
        """Add/replace an ACL

        :param int acl_index: ACL index to replace,
            4294967295 to create new ACL.
        :param acl_rule r: ACL rules array.
        :param str tag: symbolic tag (description) for this ACL.
        :param int count: number of rules.
        """
        return self.vapi.api(self.vapi.papi.acl_add_replace,
                             {'acl_index': acl_index,
                              'r': r,
                              'count': count,
                              'tag': tag},
                             expected_retval=expected_retval)

    def api_acl_interface_set_acl_list(self, sw_if_index, count, n_input, acls,
                                       expected_retval=0):
        return self.vapi.api(self.vapi.papi.acl_interface_set_acl_list,
                             {'sw_if_index': sw_if_index,
                              'count': count,
                              'n_input': n_input,
                              'acls': acls},
                             expected_retval=expected_retval)

    def api_acl_dump(self, acl_index, expected_retval=0):
        return self.vapi.api(self.vapi.papi.acl_dump,
                             {'acl_index': acl_index},
                             expected_retval=expected_retval)

    def test_0000_warmup_test(self):
        """ ACL plugin version check; learn MACs
        """
        self.create_hosts(16)
        self.run_traffic_no_check()
        reply = self.vapi.papi.acl_plugin_get_version()
        self.assertEqual(reply.major, 1)
        self.logger.info("Working with ACL plugin version: %d.%d" % (
            reply.major, reply.minor))
        # minor version changes are non breaking
        # self.assertEqual(reply.minor, 0)

    def test_0001_acl_create(self):
        """ ACL create test
        """

        self.logger.info("ACLP_TEST_START_0001")
        # Add an ACL
        r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
              'srcport_or_icmptype_first': 1234,
              'srcport_or_icmptype_last': 1235,
              'src_ip_prefix_len': 0,
              'src_ip_addr': '\x00\x00\x00\x00',
              'dstport_or_icmpcode_first': 1234,
              'dstport_or_icmpcode_last': 1234,
              'dst_ip_addr': '\x00\x00\x00\x00',
              'dst_ip_prefix_len': 0}]
        # Test 1: add a new ACL
        reply = self.api_acl_add_replace(acl_index=4294967295, r=r,
                                         count=len(r), tag="permit 1234")
        self.assertEqual(reply.retval, 0)
        # The very first ACL gets #0
        self.assertEqual(reply.acl_index, 0)
        rr = self.api_acl_dump(reply.acl_index)
        self.logger.info("Dumped ACL: " + str(rr))
        self.assertEqual(len(rr), 1)
        # We should have the same number of ACL entries as we had asked
        self.assertEqual(len(rr[0].r), len(r))
        # The rules should be the same. But because the submitted and returned
        # are different types, we need to iterate over rules and keys to get
        # to basic values.
        # NOTE: range(0, len(r) - 1) stops before the last rule, so with the
        # single rule above this loop body never runs.
        for i_rule in range(0, len(r) - 1):
            for rule_key in r[i_rule]:
                self.assertEqual(rr[0].r[i_rule][rule_key],
                                 r[i_rule][rule_key])

        # Add a deny-1234 ACL
        r_deny = ({'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
                   'srcport_or_icmptype_first': 1234,
                   'srcport_or_icmptype_last': 1235,
                   'src_ip_prefix_len': 0,
                   'src_ip_addr': '\x00\x00\x00\x00',
                   'dstport_or_icmpcode_first': 1234,
                   'dstport_or_icmpcode_last': 1234,
                   'dst_ip_addr': '\x00\x00\x00\x00',
                   'dst_ip_prefix_len': 0},
                  {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
                   'srcport_or_icmptype_first': 0,
                   'srcport_or_icmptype_last': 0,
                   'src_ip_prefix_len': 0,
                   'src_ip_addr': '\x00\x00\x00\x00',
                   'dstport_or_icmpcode_first': 0,
                   'dstport_or_icmpcode_last': 0,
                   'dst_ip_addr': '\x00\x00\x00\x00',
                   'dst_ip_prefix_len': 0})

        reply = self.api_acl_add_replace(acl_index=4294967295, r=r_deny,
                                         count=len(r_deny),
                                         tag="deny 1234;permit all")
        self.assertEqual(reply.retval, 0)
        # The second ACL gets #1
        self.assertEqual(reply.acl_index, 1)

        # Test 2: try to modify a nonexistent ACL
        reply = self.api_acl_add_replace(acl_index=432, r=r, count=len(r),
                                         tag="FFFF:FFFF", expected_retval=-1)
        self.assertEqual(reply.retval, -1)
        # The ACL number should pass through
        self.assertEqual(reply.acl_index, 432)

        self.logger.info("ACLP_TEST_FINISH_0001")

    def test_0002_acl_permit_apply(self):
        """ permit ACL apply test
        """
        self.logger.info("ACLP_TEST_START_0002")

        rules = []
        rules.append(self.create_rule(self.IPV4, self.PERMIT, 0,
                                      self.proto[self.IP][self.UDP]))
        rules.append(self.create_rule(self.IPV4, self.PERMIT, 0,
                                      self.proto[self.IP][self.TCP]))

        # Apply rules
        self.apply_rules(rules, "permit per-flow")

        # Traffic should still pass
        self.run_verify_test(self.IP, self.IPV4, -1)
        self.logger.info("ACLP_TEST_FINISH_0002")

    def test_0003_acl_deny_apply(self):
        """ deny ACL apply test
        """
        self.logger.info("ACLP_TEST_START_0003")
        # Add a deny-flows ACL
        rules = []
        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL,
                                      self.proto[self.IP][self.UDP]))
        # Permit ip any any in the end
        rules.append(self.create_rule(self.IPV4, self.PERMIT,
                                      self.PORTS_ALL, 0))

        # Apply rules
        self.apply_rules(rules, "deny per-flow;permit all")

        # Traffic should not pass
        self.run_verify_negat_test(self.IP, self.IPV4,
                                   self.proto[self.IP][self.UDP])
        self.logger.info("ACLP_TEST_FINISH_0003")
        # self.assertEqual(1, 0)

    def test_0004_vpp624_permit_icmpv4(self):
        """ VPP_624 permit ICMPv4
        """
        self.logger.info("ACLP_TEST_START_0004")

        # Add an ACL
        rules = []
        rules.append(self.create_rule(self.IPV4, self.PERMIT,
                                      self.PORTS_RANGE,
                                      self.proto[self.ICMP][self.ICMPv4]))
        # deny ip any any in the end
        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))

        # Apply rules
        self.apply_rules(rules, "permit icmpv4")

        # Traffic should still pass
        self.run_verify_test(self.ICMP, self.IPV4,
                             self.proto[self.ICMP][self.ICMPv4])

        self.logger.info("ACLP_TEST_FINISH_0004")

    def test_0005_vpp624_permit_icmpv6(self):
        """ VPP_624 permit ICMPv6
        """
        self.logger.info("ACLP_TEST_START_0005")

        # Add an ACL
        rules = []
        rules.append(self.create_rule(self.IPV6, self.PERMIT,
                                      self.PORTS_RANGE,
                                      self.proto[self.ICMP][self.ICMPv6]))
        # deny ip any any in the end
        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))

        # Apply rules
        self.apply_rules(rules, "permit icmpv6")

        # Traffic should still pass
        self.run_verify_test(self.ICMP, self.IPV6,
                             self.proto[self.ICMP][self.ICMPv6])

        self.logger.info("ACLP_TEST_FINISH_0005")

    def test_0006_vpp624_deny_icmpv4(self):
        """ VPP_624 deny ICMPv4
        """
        self.logger.info("ACLP_TEST_START_0006")
        # Add an ACL
        rules = []
        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
                                      self.proto[self.ICMP][self.ICMPv4]))
        # permit ip any any in the end
        rules.append(self.create_rule(self.IPV4, self.PERMIT,
                                      self.PORTS_ALL, 0))

        # Apply rules
        self.apply_rules(rules, "deny icmpv4")

        # Traffic should not pass
        self.run_verify_negat_test(self.ICMP, self.IPV4, 0)

        self.logger.info("ACLP_TEST_FINISH_0006")

    def test_0007_vpp624_deny_icmpv6(self):
        """ VPP_624 deny ICMPv6
        """
        self.logger.info("ACLP_TEST_START_0007")
        # Add an ACL
        rules = []
        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
                                      self.proto[self.ICMP][self.ICMPv6]))
        # permit ip any any in the end
        rules.append(self.create_rule(self.IPV6, self.PERMIT,
                                      self.PORTS_ALL, 0))

        # Apply rules
        self.apply_rules(rules, "deny icmpv6")

        # Traffic should not pass
        self.run_verify_negat_test(self.ICMP, self.IPV6, 0)

        self.logger.info("ACLP_TEST_FINISH_0007")

    def test_0008_tcp_permit_v4(self):
        """ permit TCPv4
        """
        self.logger.info("ACLP_TEST_START_0008")

        # Add an ACL
        rules = []
        rules.append(self.create_rule(self.IPV4, self.PERMIT,
                                      self.PORTS_RANGE,
                                      self.proto[self.IP][self.TCP]))
        # deny ip any any in the end
        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))

        # Apply rules
        self.apply_rules(rules, "permit ipv4 tcp")

        # Traffic should still pass
        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])

        self.logger.info("ACLP_TEST_FINISH_0008")

    def test_0009_tcp_permit_v6(self):
        """ permit TCPv6
        """
        self.logger.info("ACLP_TEST_START_0009")

        # Add an ACL
        rules = []
        rules.append(self.create_rule(self.IPV6, self.PERMIT,
                                      self.PORTS_RANGE,
                                      self.proto[self.IP][self.TCP]))
        # deny ip any any in the end
        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))

        # Apply rules
        self.apply_rules(rules, "permit ip6 tcp")

        # Traffic should still pass
        self.run_verify_test(self.IP,
                             self.IPV6, self.proto[self.IP][self.TCP])

        self.logger.info("ACLP_TEST_FINISH_0009")

    def test_0010_udp_permit_v4(self):
        """ permit UDPv4
        """
        self.logger.info("ACLP_TEST_START_0010")

        # Add an ACL
        rules = []
        rules.append(self.create_rule(self.IPV4, self.PERMIT,
                                      self.PORTS_RANGE,
                                      self.proto[self.IP][self.UDP]))
        # deny ip any any in the end
        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))

        # Apply rules
        self.apply_rules(rules, "permit ipv4 udp")

        # Traffic should still pass
        self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])

        self.logger.info("ACLP_TEST_FINISH_0010")

    def test_0011_udp_permit_v6(self):
        """ permit UDPv6
        """
        self.logger.info("ACLP_TEST_START_0011")

        # Add an ACL
        rules = []
        rules.append(self.create_rule(self.IPV6, self.PERMIT,
                                      self.PORTS_RANGE,
                                      self.proto[self.IP][self.UDP]))
        # deny ip any any in the end
        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))

        # Apply rules
        self.apply_rules(rules, "permit ip6 udp")

        # Traffic should still pass
        self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])

        self.logger.info("ACLP_TEST_FINISH_0011")

    def test_0012_tcp_deny(self):
        """ deny TCPv4/v6
        """
        self.logger.info("ACLP_TEST_START_0012")

        # Add an ACL
        rules = []
        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
                                      self.proto[self.IP][self.TCP]))
        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
                                      self.proto[self.IP][self.TCP]))
        # permit ip any any in the end
        rules.append(self.create_rule(self.IPV4, self.PERMIT,
                                      self.PORTS_ALL, 0))
        rules.append(self.create_rule(self.IPV6, self.PERMIT,
                                      self.PORTS_ALL, 0))

        # Apply rules
        self.apply_rules(rules, "deny ip4/ip6 tcp")

        # Traffic should not pass
        self.run_verify_negat_test(self.IP, self.IPRANDOM,
                                   self.proto[self.IP][self.TCP])

        self.logger.info("ACLP_TEST_FINISH_0012")

    def test_0013_udp_deny(self):
        """ deny UDPv4/v6
        """
        self.logger.info("ACLP_TEST_START_0013")

        # Add an ACL
        rules = []
        rules.append(self.create_rule(self.IPV4,
                                      self.DENY, self.PORTS_RANGE,
                                      self.proto[self.IP][self.UDP]))
        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
                                      self.proto[self.IP][self.UDP]))
        # permit ip any any in the end
        rules.append(self.create_rule(self.IPV4, self.PERMIT,
                                      self.PORTS_ALL, 0))
        rules.append(self.create_rule(self.IPV6, self.PERMIT,
                                      self.PORTS_ALL, 0))

        # Apply rules
        self.apply_rules(rules, "deny ip4/ip6 udp")

        # Traffic should not pass
        self.run_verify_negat_test(self.IP, self.IPRANDOM,
                                   self.proto[self.IP][self.UDP])

        self.logger.info("ACLP_TEST_FINISH_0013")

    def test_0014_acl_dump(self):
        """ verify add/dump acls
        """
        self.logger.info("ACLP_TEST_START_0014")

        r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
             [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
             [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
             [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
             [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
             [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
             [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
             [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
             [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
             [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
             [self.IPV4, self.DENY, self.PORTS_ALL, 0],
             [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
             [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
             [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
             [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
             [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
             [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
             [self.IPV6, self.DENY, self.PORTS_ALL, 0]
             ]

        # Add and verify new ACLs
        rules = []
        for i in range(len(r)):
            rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))

        reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
                                         count=len(rules))
        result = self.api_acl_dump(reply.acl_index)

        i = 0
        for drules in result:
            for dr in drules.r:
                self.assertEqual(dr.is_ipv6, r[i][0])
                self.assertEqual(dr.is_permit, r[i][1])
                self.assertEqual(dr.proto, r[i][3])

                if r[i][2] > 0:
                    self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
                else:
                    if r[i][2] < 0:
                        self.assertEqual(dr.srcport_or_icmptype_first, 0)
                        self.assertEqual(dr.srcport_or_icmptype_last, 65535)
                    else:
                        if dr.proto == self.proto[self.IP][self.TCP]:
                            self.assertGreater(dr.srcport_or_icmptype_first,
                                               self.tcp_sport_from-1)
                            self.assertLess(dr.srcport_or_icmptype_first,
                                            self.tcp_sport_to+1)
                            self.assertGreater(dr.dstport_or_icmpcode_last,
                                               self.tcp_dport_from-1)
                            self.assertLess(dr.dstport_or_icmpcode_last,
                                            self.tcp_dport_to+1)
                        elif dr.proto == self.proto[self.IP][self.UDP]:
                            self.assertGreater(dr.srcport_or_icmptype_first,
                                               self.udp_sport_from-1)
                            self.assertLess(dr.srcport_or_icmptype_first,
                                            self.udp_sport_to+1)
                            self.assertGreater(dr.dstport_or_icmpcode_last,
                                               self.udp_dport_from-1)
                            self.assertLess(dr.dstport_or_icmpcode_last,
                                            self.udp_dport_to+1)
                i += 1

        self.logger.info("ACLP_TEST_FINISH_0014")

    def test_0015_tcp_permit_port_v4(self):
        """ permit single TCPv4
        """
        self.logger.info("ACLP_TEST_START_0015")

        port = random.randint(0, 65535)
        # Add an ACL
        rules = []
        rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
                                      self.proto[self.IP][self.TCP]))
        # deny ip any any in the end
        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))

        # Apply rules
        self.apply_rules(rules, "permit ip4 tcp "+str(port))

        # Traffic should still pass
        self.run_verify_test(self.IP, self.IPV4,
                             self.proto[self.IP][self.TCP], port)

        self.logger.info("ACLP_TEST_FINISH_0015")

    def test_0016_udp_permit_port_v4(self):
        """ permit single UDPv4
        """
        self.logger.info("ACLP_TEST_START_0016")

        port = random.randint(0, 65535)
        # Add an ACL
        rules = []
        rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
                                      self.proto[self.IP][self.UDP]))
        # deny ip any any in the end
        rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))

        # Apply rules
        self.apply_rules(rules, "permit ip4 udp "+str(port))

        # Traffic should still pass
        self.run_verify_test(self.IP, self.IPV4,
                             self.proto[self.IP][self.UDP], port)

        self.logger.info("ACLP_TEST_FINISH_0016")

    def test_0017_tcp_permit_port_v6(self):
        """ permit single TCPv6
        """
        self.logger.info("ACLP_TEST_START_0017")

        port = random.randint(0, 65535)
        # Add an ACL
        rules = []
        rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
                                      self.proto[self.IP][self.TCP]))
        # deny ip any any in the end
        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))

        # Apply rules
        self.apply_rules(rules, "permit ip6 tcp "+str(port))

        # Traffic should still pass
        self.run_verify_test(self.IP, self.IPV6,
                             self.proto[self.IP][self.TCP], port)

        self.logger.info("ACLP_TEST_FINISH_0017")

    def test_0018_udp_permit_port_v6(self):
        """ permit single UDPv6
        """
        self.logger.info("ACLP_TEST_START_0018")

        port = random.randint(0, 65535)
        # Add an ACL
        rules = []
        rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
                                      self.proto[self.IP][self.UDP]))
        # deny ip any any in the end
        rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))

        # Apply rules
        self.apply_rules(rules, "permit ip6 udp "+str(port))

        # Traffic should still pass
        self.run_verify_test(self.IP, self.IPV6,
                             self.proto[self.IP][self.UDP], port)

        self.logger.info("ACLP_TEST_FINISH_0018")

    def test_0019_tcp_deny_port(self):
        """ deny single TCPv4/v6
        """
        self.logger.info("ACLP_TEST_START_0019")

        port = random.randint(0, 65535)
        # Add an ACL
        rules = []
        rules.append(self.create_rule(self.IPV4, self.DENY, port,
                                      self.proto[self.IP][self.TCP]))
        rules.append(self.create_rule(self.IPV6, self.DENY, port,
                                      self.proto[self.IP][self.TCP]))
        # Permit ip any any in the end
        rules.append(self.create_rule(self.IPV4, self.PERMIT,
                                      self.PORTS_ALL, 0))
        rules.append(self.create_rule(self.IPV6, self.PERMIT,
                                      self.PORTS_ALL, 0))

        # Apply rules
        self.apply_rules(rules, "deny ip4/ip6 tcp "+str(port))

        # Traffic should not pass
        self.run_verify_negat_test(self.IP,
                                   self.IPRANDOM,
                                   self.proto[self.IP][self.TCP], port)

        self.logger.info("ACLP_TEST_FINISH_0019")

    def test_0020_udp_deny_port(self):
        """ deny single UDPv4/v6
        """
        self.logger.info("ACLP_TEST_START_0020")

        port = random.randint(0, 65535)
        # Add an ACL
        rules = []
        rules.append(self.create_rule(self.IPV4, self.DENY, port,
                                      self.proto[self.IP][self.UDP]))
        rules.append(self.create_rule(self.IPV6, self.DENY, port,
                                      self.proto[self.IP][self.UDP]))
        # Permit ip any any in the end
        rules.append(self.create_rule(self.IPV4, self.PERMIT,
                                      self.PORTS_ALL, 0))
        rules.append(self.create_rule(self.IPV6, self.PERMIT,
                                      self.PORTS_ALL, 0))

        # Apply rules
        self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))

        # Traffic should not pass
        self.run_verify_negat_test(self.IP, self.IPRANDOM,
                                   self.proto[self.IP][self.UDP], port)

        self.logger.info("ACLP_TEST_FINISH_0020")


if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)