aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_snat.py
blob: df81fb56ab58922ddc4d613c0d7c7acda7c83a5a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995

@media only all and (prefers-color-scheme: dark) {
.highlight .hll { background-color: #49483e }
.highlight .c { color: #75715e } /* Comment */
.highlight .err { color: #960050; background-color: #1e0010 } /* Error */
.highlight .k { color: #66d9ef } /* Keyword */
.highlight .l { color: #ae81ff } /* Literal */
.highlight .n { color: #f8f8f2 } /* Name */
.highlight .o { color: #f92672 } /* Operator */
.highlight .p { color: #f8f8f2 } /* Punctuation */
.highlight .ch { color: #75715e } /* Comment.Hashbang */
.highlight .cm { color: #75715e } /* Comment.Multiline */
.highlight .cp { color: #75715e } /* Comment.Preproc */
.highlight .cpf { color: #75715e } /* Comment.PreprocFile */
.highlight .c1 { color: #75715e } /* Comment.Single */
.highlight .cs { color: #75715e } /* Comment.Special */
.highlight .gd { color: #f92672 } /* Generic.Deleted */
.highlight .ge { font-style: italic } /* Generic.Emph */
.highlight .gi { color: #a6e22e } /* Generic.Inserted */
.highlight .gs { font-weight: bold } /* Generic.Strong */
.highlight .gu { color: #75715e } /* Generic.Subheading */
.highlight .kc { color: #66d9ef } /* Keyword.Constant */
.highlight .kd { color: #66d9ef } /* Keyword.Declaration */
.highlight .kn { color: #f92672 } /* Keyword.Namespace */
.highlight .kp { color: #66d9ef } /* Keyword.Pseudo */
.highlight .kr { color: #66d9ef } /* Keyword.Reserved */
.highlight .kt { color: #66d9ef } /* Keyword.Type */
.highlight .ld { color: #e6db74 } /* Literal.Date */
.highlight .m { color: #ae81ff } /* Literal.Number */
.highlight .s { color: #e6db74 } /* Literal.String */
.highlight .na { color: #a6e22e } /* Name.Attribute */
.highlight .nb { color: #f8f8f2 } /* Name.Builtin */
.highlight .nc { color: #a6e22e } /* Name.Class */
.highlight .no { color: #66d9ef } /* Name.Constant */
.highlight .nd { color: #a6e22e } /* Name.Decorator */
.highlight .ni { color: #f8f8f2 } /* Name.Entity */
.highlight .ne { color: #a6e22e } /* Name.Exception */
.highlight .nf { color: #a6e22e } /* Name.Function */
.highlight .nl { color: #f8f8f2 } /* Name.Label */
.highlight .nn { color: #f8f8f2 } /* Name.Namespace */
.highlight .nx { color: #a6e22e } /* Name.Other */
.highlight .py { color: #f8f8f2 } /* Name.Property */
.highlight .nt { color: #f92672 } /* Name.Tag */
.highlight .nv { color: #f8f8f2 } /* Name.Variable */
.highlight .ow { color: #f92672 } /* Operator.Word */
.highlight .w { color: #f8f8f2 } /* Text.Whitespace */
.highlight .mb { color: #ae81ff } /* Literal.Number.Bin */
.highlight .mf { color: #ae81ff } /* Literal.Number.Float */
.highlight .mh { color: #ae81ff } /* Literal.Number.Hex */
.highlight .mi { color: #ae81ff } /* Literal.Number.Integer */
.highlight .mo { color: #ae81ff } /* Literal.Number.Oct */
.highlight .sa { color: #e6db74 } /* Literal.String.Affix */
.highlight .sb { color: #e6db74 } /* Literal.String.Backtick */
.highlight .sc { color: #e6db74 } /* Literal.String.Char */
.highlight .dl { color: #e6db74 } /* Literal.String.Delimiter */
.highlight .sd { color: #e6db74 } /* Literal.String.Doc */
.highlight .s2 { color: #e6db74 } /* Literal.String.Double */
.highlight .se { color: #ae81ff } /* Literal.String.Escape */
.highlight .sh { color: #e6db74 } /* Literal.String.Heredoc */
.highlight .si { color: #e6db74 } /* Literal.String.Interpol */
.highlight .sx { color: #e6db74 } /* Literal.String.Other */
.highlight .sr { color: #e6db74 } /* Literal.String.Regex */
.highlight .s1 { color: #e6db74 } /* Literal.String.Single */
.highlight .ss { color: #e6db74 } /* Literal.String.Symbol */
.highlight .bp { color: #f8f8f2 } /* Name.Builtin.Pseudo */
.highlight .fm { color: #a6e22e } /* Name.Function.Magic */
.highlight .vc { color: #f8f8f2 } /* Name.Variable.Class */
.highlight .vg { color: #f8f8f2 } /* Name.Variable.Global */
.highlight .vi { color: #f8f8f2 } /* Name.Variable.Instance */
.highlight .vm { color: #f8f8f2 } /* Name.Variable.Magic */
.highlight .il { color: #ae81ff } /* Literal.Number.Integer.Long */
}
@media (prefers-color-scheme: light) {
.highlight .hll { background-color: #ffffcc }
.highlight .c { color: #888888 } /* Comment */
.highlight .err { color: #a61717; background-color: #e3d2d2 } /* Error */
.highlight .k { color: #008800; font-weight: bold } /* Keyword */
.highlight .ch { color: #888888 } /* Comment.Hashbang */
.highlight .cm { color: #888888 } /* Comment.Multiline */
.highlight .cp { color: #cc0000; font-weight: bold } /* Comment.Preproc */
.highlight .cpf { color: #888888 } /* Comment.PreprocFile */
.highlight .c1 { color: #888888 } /* Comment.Single */
.highlight .cs { color: #cc0000; font-weight: bold; background-color: #fff0f0 } /* Comment.Special */
.highlight .gd { color: #000000; background-color: #ffdddd } /* Generic.Deleted */
.highlight .ge { font-style: italic } /* Generic.Emph */
.highlight .gr { color: #aa0000 } /* Generic.Error */
.highlight .gh { color: #333333 } /* Generic.Heading */
.highlight .gi { color: #000000; background-color: #ddffdd } /* Generic.Inserted */
.highlight .go { color: #888888 } /* Generic.Output */
.highlight .gp { color: #555555 } /* Generic.Prompt */
.highlight .gs { font-weight: bold } /* Generic.Strong */
.highlight .gu { color: #666666 } /* Generic.Subheading */
.highlight .gt { color: #aa0000 } /* Generic.Traceback */
.highlight .kc { color: #008800; font-weight: bold } /* Keyword.Constant */
.highlight .kd { color: #008800; font-weight: bold } /* Keyword.Declaration */
.highlight .kn { color: #008800; font-weight: bold } /* Keyword.Namespace */
.highlight .kp { color: #008800 } /* Keyword.Pseudo */
.highlight .kr { color: #008800; font-weight: bold } /* Keyword.Reserved */
.highlight .kt { color: #888888; font-weight: bold } /* Keyword.Type */
.highlight .m { color: #0000DD; font-weight: bold } /* Literal.Number */
.highlight .s { color: #dd2200; background-color: #fff0f0 } /* Literal.String */
.highlight .na { color: #336699 } /* Name.Attribute */
.highlight .nb { color: #003388 } /* Name.Builtin */
.highlight .nc { color: #bb0066; font-weight: bold } /* Name.Class */
.highlight .no { color: #003366; font-weight: bold } /* Name.Constant */
.highlight .nd { color: #555555 } /* Name.Decorator */
.highlight .ne { color: #bb0066; font-weight: bold } /* Name.Exception */
.highlight .nf { color: #0066bb; font-weight: bold } /* Name.Function */
.highlight .nl { color: #336699; font-style: italic } /* Name.Label */
.highlight .nn { color: #bb0066; font-weight: bold } /* Name.Namespace */
.highlight .py { color: #336699; font-weight: bold } /* Name.Property */
.highlight .nt { color: #bb0066; font-weight: bold } /* Name.Tag */
.highlight .nv { color: #336699 } /* Name.Variable */
.highlight .ow { color: #008800 } /* Operator.Word */
.highlight .w { color: #bbbbbb } /* Text.Whitespace */
.highlight .mb { color: #0000DD; font-weight: bold } /* Literal.Number.Bin */
.highlight .mf { color: #0000DD; font-weight: bold } /* Literal.Number.Float */
.highlight .mh { color: #0000DD; font-weight: bold } /* Literal.Number.Hex */
.highlight .mi { color: #0000DD; font-weight: bold } /* Literal.Number.Integer */
.highlight .mo { color: #0000DD; font-weight: bold } /* Literal.Number.Oct */
.highlight .sa { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Affix */
.highlight .sb { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Backtick */
.highlight .sc { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Char */
.highlight .dl { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Delimiter */
.highlight .sd { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Doc */
.highlight .s2 { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Double */
.highlight .se { color: #0044dd; background-color: #fff0f0 } /* Literal.String.Escape */
.highlight .sh { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Heredoc */
.highlight .si { color: #3333bb; background-color: #fff0f0 } /* Literal.String.Interpol */
.highlight .sx { color: #22bb22; background-color: #f0fff0 } /* Literal.String.Other */
.highlight .sr { color: #008800; background-color: #fff0ff } /* Literal.String.Regex */
.highlight .s1 { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Single */
.highlight .ss { color: #aa6600; background-color: #fff0f0 } /* Literal.String.Symbol */
.highlight .bp { color: #003388 } /* Name.Builtin.Pseudo */
.highlight .fm { color: #0066bb; font-weight: bold } /* Name.Function.Magic */
.highlight .vc { color: #336699 } /* Name.Variable.Class */
.highlight .vg { color: #dd7700 } /* Name.Variable.Global */
.highlight .vi { color: #3333bb } /* Name.Variable.Instance */
.highlight .vm { color: #336699 } /* Name.Variable.Magic */
.highlight .il { color: #0000DD; font-weight: bold } /* Literal.Number.Integer.Long */
}
/*
 * Copyright (c) 2016 Cisco and/or its affiliates.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef __FIB_TABLE_H__
#define __FIB_TABLE_H__

#include <vnet/ip/ip.h>
#include <vnet/adj/adj.h>
#include <vnet/fib/fib_entry.h>
#include <vnet/mpls/mpls.h>
#include <vnet/mpls/packet.h>

/**
 * Flags for the source data
 */
typedef enum fib_table_attribute_t_ {
    /**
     * Marker. Add new values after this one.
     */
    FIB_TABLE_ATTRIBUTE_FIRST,
    /**
     * the table is for IP6 link local addresses
     */
    FIB_TABLE_ATTRIBUTE_IP6_LL = FIB_TABLE_ATTRIBUTE_FIRST,
    /**
     * the table is currently resync-ing
     */
    FIB_TABLE_ATTRIBUTE_RESYNC,
    /**
     * Marker. add new entries before this one.
     */
    FIB_TABLE_ATTRIBUTE_LAST = FIB_TABLE_ATTRIBUTE_RESYNC,
} fib_table_attribute_t;

#define FIB_TABLE_ATTRIBUTE_MAX (FIB_TABLE_ATTRIBUTE_LAST+1)

#define FIB_TABLE_ATTRIBUTES {		         \
    [FIB_TABLE_ATTRIBUTE_IP6_LL]  = "ip6-ll",	 \
    [FIB_TABLE_ATTRIBUTE_RESYNC]  = "resync",    \
}

#define FOR_EACH_FIB_TABLE_ATTRIBUTE(_item)      	\
    for (_item = FIB_TABLE_ATTRIBUTE_FIRST;		\
	 _item < FIB_TABLE_ATTRIBUTE_MAX;		\
	 _item++)

typedef enum fib_table_flags_t_ {
    FIB_TABLE_FLAG_NONE   = 0,
    FIB_TABLE_FLAG_IP6_LL  = (1 << FIB_TABLE_ATTRIBUTE_IP6_LL),
    FIB_TABLE_FLAG_RESYNC  = (1 << FIB_TABLE_ATTRIBUTE_RESYNC),
} __attribute__ ((packed)) fib_table_flags_t;

extern u8* format_fib_table_flags(u8 *s, va_list *args);

/**
 * @brief 
 *   A protocol Independent FIB table
 */
typedef struct fib_table_t_
{
    /**
     * Which protocol this table serves. Used to switch on the union above.
     */
    fib_protocol_t ft_proto;

    /**
     * Table flags
     */
    fib_table_flags_t ft_flags;

    /**
     * per-source number of locks on the table
     */
    u32 *ft_locks;
    u32 ft_total_locks;

    /**
     * Table ID (hash key) for this FIB.
     */
    u32 ft_table_id;

    /**
     * Index into FIB vector.
     */
    fib_node_index_t ft_index;

    /**
     * flow hash configuration
     */
    u32 ft_flow_hash_config;

    /**
     * Per-source route counters
     */
    u32 *ft_src_route_counts;

    /**
     * Total route counters
     */
    u32 ft_total_route_counts;

    /**
     * Epoch - number of resyncs performed
     */
    u32 ft_epoch;

    /**
     * Table description
     */
    u8* ft_desc;
} fib_table_t;


/**
 * @brief
 *  Default names for IP4, IP6, and MPLS FIB table index 0.
 *  Nominally like "ipv4-VRF:0", but this will override that name if set
 *  in a config section of the startup.conf file.
 */
extern char *fib_table_default_names[FIB_PROTOCOL_MAX];

/**
 * @brief
 *  Format the description/name of the table
 */
extern u8* format_fib_table_name(u8* s, va_list *ap);

/**
 * @brief
 *  Perfom a longest prefix match in the non-forwarding table
 *
 * @param fib_index
 *  The index of the FIB
 *
 * @param prefix
 *  The prefix to lookup
 *
 * @return
 *  The index of the fib_entry_t for the best match, which may be the default route
 */
extern fib_node_index_t fib_table_lookup(u32 fib_index,
					 const fib_prefix_t *prefix);

/**
 * @brief
 *  Perfom an exact match in the non-forwarding table
 *
 * @param fib_index
 *  The index of the FIB
 *
 * @param prefix
 *  The prefix to lookup
 *
 * @return
 *  The index of the fib_entry_t for the exact match, or INVALID
 *  is there is no match.
 */
extern fib_node_index_t fib_table_lookup_exact_match(u32 fib_index,
						     const fib_prefix_t *prefix);

/**
 * @brief
 *  Get the less specific (covering) prefix
 *
 * @param fib_index
 *  The index of the FIB
 *
 * @param prefix
 *  The prefix to lookup
 *
 * @return
 *  The index of the less specific fib_entry_t.
 */
extern fib_node_index_t fib_table_get_less_specific(u32 fib_index,
						    const fib_prefix_t *prefix);

/**
 * @brief
 *  Add a 'special' entry to the FIB.
 *  A special entry is an entry that the FIB is not expect to resolve
 *  via the usual mechanisms (i.e. recurisve or neighbour adj DB lookup).
 *  Instead the will link to a DPO valid for the source and/or the flags.
 *  This add is reference counting per-source. So n 'removes' are required
 *  for n 'adds', if the entry is no longer required.
 *  If the source needs to provide non-default forwarding use:
 *  fib_table_entry_special_dpo_add()
 *
 * @param fib_index
 *  The index of the FIB
 *
 * @param prefix
 *  The prefix to add
 *
 * @param source
 *  The ID of the client/source adding the entry.
 *
 * @param flags
 *  Flags for the entry.
 *
 * @return
 *  the index of the fib_entry_t that is created (or exists already).
 */
extern fib_node_index_t fib_table_entry_special_add(u32 fib_index,
						    const fib_prefix_t *prefix,
						    fib_source_t source,
						    fib_entry_flag_t flags);

/**
 * @brief
 *  Add a 'special' entry to the FIB that links to the DPO passed
 *  A special entry is an entry that the FIB is not expect to resolve
 *  via the usual mechanisms (i.e. recurisve or neighbour adj DB lookup).
 *  Instead the client/source provides the DPO to link to.
 *  This add is reference counting per-source. So n 'removes' are required
 *  for n 'adds', if the entry is no longer required.
 *
  * @param fib_index
 *  The index of the FIB
 *
 * @param prefix
 *  The prefix to add
 *
 * @param source
 *  The ID of the client/source adding the entry.
 *
 * @param flags
 *  Flags for the entry.
 *
 * @param dpo
 *  The DPO to link to.
 *
 * @return
 *  the index of the fib_entry_t that is created (or existed already).
 */
extern fib_node_index_t fib_table_entry_special_dpo_add(u32 fib_index,
                                                        const fib_prefix_t *prefix,
                                                        fib_source_t source,
                                                        fib_entry_flag_t stype,
                                                        const dpo_id_t *dpo);

/**
 * @brief
 *  Update a 'special' entry to the FIB that links to the DPO passed
 *  A special entry is an entry that the FIB is not expect to resolve
 *  via the usual mechanisms (i.e. recurisve or neighbour adj DB lookup).
 *  Instead the client/source provides the DPO to link to.
 *  Special entries are add/remove reference counted per-source. So n
 * 'removes' are required for n 'adds', if the entry is no longer required.
 *  An 'update' is an 'add' if no 'add' has already been called, otherwise an 'add'
 * is therefore assumed to act on the reference instance of that add.
 *
 * @param fib_entry_index
 *  The index of the FIB entry to update
 *
 * @param source
 *  The ID of the client/source adding the entry.
 *
 * @param flags
 *  Flags for the entry.
 *
 * @param dpo
 *  The DPO to link to.
 *
 * @return
 *  the index of the fib_entry_t that is created (or existed already).
 */
extern fib_node_index_t fib_table_entry_special_dpo_update (u32 fib_index,
							    const fib_prefix_t *prefix,
							    fib_source_t source,
							    fib_entry_flag_t stype,
							    const dpo_id_t *dpo);

/**
 * @brief
 *  Remove a 'special' entry from the FIB.
 *  This add is reference counting per-source. So n 'removes' are required
 *  for n 'adds', if the entry is no longer required.
 *
 * @param fib_index
 *  The index of the FIB
 *
 * @param prefix
 *  The prefix to remove
 *
 * @param source
 *  The ID of the client/source adding the entry.
 *
 */
extern void fib_table_entry_special_remove(u32 fib_index,
					   const fib_prefix_t *prefix,
					   fib_source_t source);

/**
 * @brief
 *  Add one path to an entry (aka route) in the FIB. If the entry does not
 *  exist, it will be created.
 * See the documentation for fib_route_path_t for more descirptions of
 * the path parameters.
 *
 * @param fib_index
 *  The index of the FIB
 *
 * @param prefix
 *  The prefix for the entry to add
 *
 * @param source
 *  The ID of the client/source adding the entry.
 *
 * @param flags
 *  Flags for the entry.
 *
 * @paran next_hop_proto
 *  The protocol of the next hop. This cannot be derived in the event that
 * the next hop is all zeros.
 *
 * @param next_hop
 *  The address of the next-hop.
 *
 * @param sw_if_index
 *  The index of the interface.
 *
 * @param next_hop_fib_index,
 *  The fib index of the next-hop for recursive resolution
 *
 * @param next_hop_weight
 *  [un]equal cost path weight
 *
 * @param  next_hop_label_stack
 *  The path's out-going label stack. NULL is there is none.
 *
 * @param  pf
 *  Flags for the path
 *
 * @return
 *  the index of the fib_entry_t that is created (or existed already).
 */
extern fib_node_index_t fib_table_entry_path_add(u32 fib_index,
						 const fib_prefix_t *prefix,
						 fib_source_t source,
						 fib_entry_flag_t flags,
						 dpo_proto_t next_hop_proto,
						 const ip46_address_t *next_hop,
						 u32 next_hop_sw_if_index,
						 u32 next_hop_fib_index,
						 u32 next_hop_weight,
						 fib_mpls_label_t *next_hop_label_stack,
						 fib_route_path_flags_t pf);
/**
 * @brief
 *  Add n paths to an entry (aka route) in the FIB. If the entry does not
 *  exist, it will be created.
 * See the documentation for fib_route_path_t for more descirptions of
 * the path parameters.
 *
 * @param fib_index
 *  The index of the FIB
 *
 * @param prefix
 *  The prefix for the entry to add
 *
 * @param source
 *  The ID of the client/source adding the entry.
 *
 * @param flags
 *  Flags for the entry.
 *
 * @param rpaths
 *  A vector of paths. Not const since they may be modified.
 *
 * @return
 *  the index of the fib_entry_t that is created (or existed already).
 */
extern fib_node_index_t fib_table_entry_path_add2(u32 fib_index,
						  const fib_prefix_t *prefix,
						  fib_source_t source,
						  fib_entry_flag_t flags,
						  fib_route_path_t *rpath);

/**
 * @brief
 * remove one path to an entry (aka route) in the FIB. If this is the entry's
 * last path, then the entry will be removed, unless it has other sources.
 * See the documentation for fib_route_path_t for more descirptions of
 * the path parameters.
 *
 * @param fib_index
 *  The index of the FIB
 *
 * @param prefix
 *  The prefix for the entry to add
 *
 * @param source
 *  The ID of the client/source adding the entry.
 *
 * @paran next_hop_proto
 *  The protocol of the next hop. This cannot be derived in the event that
 * the next hop is all zeros.
 *
 * @param next_hop
 *  The address of the next-hop.
 *
 * @param sw_if_index
 *  The index of the interface.
 *
 * @param next_hop_fib_index,
 *  The fib index of the next-hop for recursive resolution
 *
 * @param next_hop_weight
 *  [un]equal cost path weight
 *
 * @param  pf
 *  Flags for the path
 */
extern void fib_table_entry_path_remove(u32 fib_index,
					const fib_prefix_t *prefix,
					fib_source_t source,
					dpo_proto_t next_hop_proto,
					const ip46_address_t *next_hop,
					u32 next_hop_sw_if_index,
					u32 next_hop_fib_index,
					u32 next_hop_weight,
					fib_route_path_flags_t pf);

/**
 * @brief
 * Remove n paths to an entry (aka route) in the FIB. If this is the entry's
 * last path, then the entry will be removed, unless it has other sources.
 * See the documentation for fib_route_path_t for more descirptions of
 * the path parameters.
 *
 * @param fib_index
 *  The index of the FIB
 *
 * @param prefix
 *  The prefix for the entry to add
 *
 * @param source
 *  The ID of the client/source adding the entry.
 *
 * @param rpaths
 *  A vector of paths.
 */
extern void fib_table_entry_path_remove2(u32 fib_index,
					 const fib_prefix_t *prefix,
					 fib_source_t source,
					 fib_route_path_t *paths);

/**
 * @brief
 *  Update an entry to have a new set of paths. If the entry does not
 *  exist, it will be created.
 * The difference between an 'path-add' and an update, is that path-add is
 * an incremental addition of paths, whereas an update is a wholesale swap.
 *
 * @param fib_index
 *  The index of the FIB
 *
 * @param prefix
 *  The prefix for the entry to add
 *
 * @param source
 *  The ID of the client/source adding the entry.
 *
 * @param rpaths
 *  A vector of paths. Not const since they may be modified.
 *
 * @return
 *  the index of the fib_entry_t that is created (or existed already).
 */
extern fib_node_index_t fib_table_entry_update(u32 fib_index,
					       const fib_prefix_t *prefix,
					       fib_source_t source,
					       fib_entry_flag_t flags,
					       fib_route_path_t *paths);

/**
 * @brief
 *  Update the entry to have just one path. If the entry does not
 *  exist, it will be created.
 * See the documentation for fib_route_path_t for more descirptions of
 * the path parameters.
 *
 * @param fib_index
 *  The index of the FIB
 *
 * @param prefix
 *  The prefix for the entry to add
 *
 * @param source
 *  The ID of the client/source adding the entry.
 *
 * @param flags
 *  Flags for the entry.
 *
 * @paran next_hop_proto
 *  The protocol of the next hop. This cannot be derived in the event that
 * the next hop is all zeros.
 *
 * @param next_hop
 *  The address of the next-hop.
 *
 * @param sw_if_index
 *  The index of the interface.
 *
 * @param next_hop_fib_index,
 *  The fib index of the next-hop for recursive resolution
 *
 * @param next_hop_weight
 *  [un]equal cost path weight
 *
 * @param  next_hop_label_stack
 *  The path's out-going label stack. NULL is there is none.
 *
 * @param  pf
 *  Flags for the path
 *
 * @return
 *  the index of the fib_entry_t that is created (or existed already).
 */
extern fib_node_index_t fib_table_entry_update_one_path(u32 fib_index,
							const fib_prefix_t *prefix,
							fib_source_t source,
							fib_entry_flag_t flags,
							dpo_proto_t next_hop_proto,
							const ip46_address_t *next_hop,
							u32 next_hop_sw_if_index,
							u32 next_hop_fib_index,
							u32 next_hop_weight,
							fib_mpls_label_t *next_hop_label_stack,
							fib_route_path_flags_t pf);

/**
 * @brief
 *  Add a MPLS local label for the prefix/route. If the entry does not
 *  exist, it will be created. In theory more than one local label can be
 *  added, but this is not yet supported.
 *
 * @param fib_index
 *  The index of the FIB
 *
 * @param prefix
 *  The prefix for the entry to which to add the label
 *
 * @param label
 *  The MPLS label to add
 *
 * @return
 *  the index of the fib_entry_t that is created (or existed already).
 */
extern fib_node_index_t fib_table_entry_local_label_add(u32 fib_index,
							const fib_prefix_t *prefix,
							mpls_label_t label);
/**
 * @brief
 *  remove a MPLS local label for the prefix/route.
 *
 * @param fib_index
 *  The index of the FIB
 *
 * @param prefix
 *  The prefix for the entry to which to add the label
 *
 * @param label
 *  The MPLS label to add
 */
extern void fib_table_entry_local_label_remove(u32 fib_index,
					       const fib_prefix_t *prefix,
					       mpls_label_t label);

/**
 * @brief
 *  Delete a FIB entry. If the entry has no more sources, then it is
 * removed from the table.
 *
 * @param fib_index
 *  The index of the FIB
 *
 * @param prefix
 *  The prefix for the entry to remove
 *
 * @param source
 *  The ID of the client/source adding the entry.
 */
extern void fib_table_entry_delete(u32 fib_index,
				   const fib_prefix_t *prefix,
				   fib_source_t source);

/**
 * @brief
 *  Delete a FIB entry. If the entry has no more sources, then it is
 * removed from the table.
 *
 * @param entry_index
 *  The index of the FIB entry
 *
 * @param source
 *  The ID of the client/source adding the entry.
 */
extern void fib_table_entry_delete_index(fib_node_index_t entry_index,
					 fib_source_t source);

/**
 * @brief
 *  Return the stats index for a FIB entry
 * @param fib_index
 *  The table's FIB index
 * @param prefix
 *  The entry's prefix's
 */
extern u32 fib_table_entry_get_stats_index(u32 fib_index,
                                           const fib_prefix_t *prefix);

/**
 * @brief
 *  Flush all entries from a table for the source
 *
 * @param fib_index
 *  The index of the FIB
 *
 * @paran proto
 *  The protocol of the entries in the table
 *
 * @param source
 *  the source to flush
 */
extern void fib_table_flush(u32 fib_index,
			    fib_protocol_t proto,
			    fib_source_t source);

/**
 * @brief
 *  Resync all entries from a table for the source
 *  this is the mark part of the mark and sweep algorithm.
 *  All entries in this FIB that are sourced by 'source' are marked
 *  as stale.
 *
 * @param fib_index
 *  The index of the FIB
 *
 * @paran proto
 *  The protocol of the entries in the table
 *
 * @param source
 *  the source to flush
 */
extern void fib_table_mark(u32 fib_index,
                           fib_protocol_t proto,
                           fib_source_t source);

/**
 * @brief
 *  Signal that the table has converged, i.e. all updates are complete.
 *  this is the sweep part of the mark and sweep algorithm.
 *  All entries in this FIB that are sourced by 'source' and marked
 *  as stale are flushed.
 *
 * @param fib_index
 *  The index of the FIB
 *
 * @paran proto
 *  The protocol of the entries in the table
 *
 * @param source
 *  the source to flush
 */
extern void fib_table_sweep(u32 fib_index,
                            fib_protocol_t proto,
                            fib_source_t source);

/**
 * @brief
 *  Get the index of the FIB bound to the interface
 *
 * @paran proto
 *  The protocol of the FIB (and thus the entries therein)
 *
 * @param sw_if_index
 *  The interface index
 *
 * @return fib_index
 *  The index of the FIB
 */
extern u32 fib_table_get_index_for_sw_if_index(fib_protocol_t proto,
					       u32 sw_if_index);

/**
 * @brief
 *  Get the Table-ID of the FIB bound to the interface
 *
 * @paran proto
 *  The protocol of the FIB (and thus the entries therein)
 *
 * @param sw_if_index
 *  The interface index
 *
 * @return fib_index
 *  The tableID of the FIB
 */
extern u32 fib_table_get_table_id_for_sw_if_index(fib_protocol_t proto,
						  u32 sw_if_index);

/**
 * @brief
 *  Get the Table-ID of the FIB from protocol and index
 *
 * @param fib_index
 *  The FIB index
 *
 * @paran proto
 *  The protocol of the FIB (and thus the entries therein)
 *
 * @return fib_index
 *  The tableID of the FIB
 */
extern u32 fib_table_get_table_id(u32 fib_index, fib_protocol_t proto);

/**
 * @brief
 *  Get the index of the FIB for a Table-ID. This DOES NOT create the
 * FIB if it does not exist.
 *
 * @paran proto
 *  The protocol of the FIB (and thus the entries therein)
 *
 * @param table-id
 *  The Table-ID
 *
 * @return fib_index
 *  The index of the FIB, which may be INVALID.
 */
extern u32 fib_table_find(fib_protocol_t proto, u32 table_id);


/**
 * @brief
 *  Get the index of the FIB for a Table-ID. This DOES create the
 * FIB if it does not exist.
 *
 * @paran proto
 *  The protocol of the FIB (and thus the entries therein)
 *
 * @param table-id
 *  The Table-ID
 *
 * @return fib_index
 *  The index of the FIB
 *
 * @param source
 *  The ID of the client/source.
 */
extern u32 fib_table_find_or_create_and_lock(fib_protocol_t proto,
					     u32 table_id,
                                             fib_source_t source);

/**
 * @brief
 *  Get the index of the FIB for a Table-ID. This DOES create the
 * FIB if it does not exist.
 *
 * @paran proto
 *  The protocol of the FIB (and thus the entries therein)
 *
 * @param table-id
 *  The Table-ID
 *
 * @return fib_index
 *  The index of the FIB
 *
 * @param source
 *  The ID of the client/source.
 *
 * @param name
 *  The client is choosing the name they want the table to have
 */
extern u32 fib_table_find_or_create_and_lock_w_name(fib_protocol_t proto,
                                                    u32 table_id,
                                                    fib_source_t source,
                                                    const u8 *name);

/**
 * @brief
 *  Create a new table with no table ID. This means it does not get
 * added to the hash-table and so can only be found by using the index returned.
 *
 * @paran proto
 *  The protocol of the FIB (and thus the entries therein)
 *
 * @param fmt
 *  A string to describe the table
 *
 * @param source
 *  The ID of the client/source.
 *
 * @return fib_index
 *  The index of the FIB
 */
extern u32 fib_table_create_and_lock(fib_protocol_t proto,
                                     fib_source_t source,
                                     const char *const fmt,
                                     ...);

/**
 * @brief
 *  Get the flow hash configured used by the table
 *
 * @param fib_index
 *  The index of the FIB
 *
 * @paran proto
 *  The protocol the packets the flow hash will be calculated for.
 *
 * @return The flow hash config
 */
extern flow_hash_config_t fib_table_get_flow_hash_config(u32 fib_index,
							 fib_protocol_t proto);

/**
 * @brief
 *  Get the flow hash configured used by the protocol
 *
 * @paran proto
 *  The protocol of the FIB (and thus the entries therein)
 *
 * @return The flow hash config
 */
extern flow_hash_config_t fib_table_get_default_flow_hash_config(fib_protocol_t proto);

/**
 * @brief
 *  Set the flow hash configured used by the table
 *
 * @param fib_index
 *  The index of the FIB
 *
 * @paran proto
 *  The protocol of the FIB (and thus the entries therein)
 *
 * @param hash_config
 *  The flow-hash config to set
 *
 * @return none
 */
extern void fib_table_set_flow_hash_config(u32 fib_index,
                                           fib_protocol_t proto,
                                           flow_hash_config_t hash_config);

/**
 * @brief
 * Take a reference counting lock on the table
 *
 * @param fib_index
 *  The index of the FIB
 *
 * @paran proto
 *  The protocol of the FIB (and thus the entries therein)
 *
 * @param source
 *  The ID of the client/source.
 */ 
extern void fib_table_unlock(u32 fib_index,
			     fib_protocol_t proto,
                             fib_source_t source);

/**
 * @brief
 * Release a reference counting lock on the table. When the last lock
 * has gone. the FIB is deleted.
 *
 * @param fib_index
 *  The index of the FIB
 *
 * @paran proto
 *  The protocol of the FIB (and thus the entries therein)
 *
 * @param source
 *  The ID of the client/source.
 */ 
extern void fib_table_lock(u32 fib_index,
			   fib_protocol_t proto,
                           fib_source_t source);

/**
 * @brief
 * Return the number of entries in the FIB added by a given source.
 *
 * @param fib_index
 *  The index of the FIB
 *
 * @paran proto
 *  The protocol of the FIB (and thus the entries therein)
 *
 * @return number of sourced entries.
 */ 
extern u32 fib_table_get_num_entries(u32 fib_index,
				     fib_protocol_t proto,
				     fib_source_t source);

/**
 * @brief
 * Get a pointer to a FIB table
 */
extern fib_table_t *fib_table_get(fib_node_index_t index,
				  fib_protocol_t proto);

/**
 * @brief return code controlling how a table walk proceeds
 */
typedef enum fib_table_walk_rc_t_
{
    /**
     * Continue on to the next entry
     */
    FIB_TABLE_WALK_CONTINUE,
    /**
     * Do no traverse down this sub-tree
     */
    FIB_TABLE_WALK_SUB_TREE_STOP,
    /**
     * Stop the walk completely
     */
    FIB_TABLE_WALK_STOP,
} fib_table_walk_rc_t;

/**
 * @brief Call back function when walking entries in a FIB table
 */
typedef fib_table_walk_rc_t (*fib_table_walk_fn_t)(fib_node_index_t fei,
                                                   void *ctx);

/**
 * @brief Walk all entries in a FIB table
 * N.B: This is NOT safe to deletes. If you need to delete walk the whole
 * table and store elements in a vector, then delete the elements
 */
extern void fib_table_walk(u32 fib_index,
                           fib_protocol_t proto,
                           fib_table_walk_fn_t fn,
                           void *ctx);

/**
 * @brief Walk all entries in a FIB table
 * N.B: This is NOT safe to deletes. If you need to delete walk the whole
 * table and store elements in a vector, then delete the elements
 */
extern void fib_table_walk_w_src(u32 fib_index,
                                 fib_protocol_t proto,
                                 fib_source_t src,
                                 fib_table_walk_fn_t fn,
                                 void *ctx);

/**
 * @brief Walk all entries in a sub-tree FIB table. The 'root' paraneter
 * is the prefix at the root of the sub-tree.
 * N.B: This is NOT safe to deletes. If you need to delete walk the whole
 * table and store elements in a vector, then delete the elements
 */
extern void fib_table_sub_tree_walk(u32 fib_index,
                                    fib_protocol_t proto,
                                    const fib_prefix_t *root,
                                    fib_table_walk_fn_t fn,
                                    void *ctx);

/**
 * @brief format (display) the memory used by the FIB tables
 */
extern u8 *format_fib_table_memory(u8 *s, va_list *args);

/**
 * Debug function
 */
#if CLIB_DEBUG > 0
extern void fib_table_assert_empty(const fib_table_t *fib_table);
#endif


#endif
>2950 2951 2952 2953 2954 2955 2956 2957 2958 2959 2960 2961 2962 2963 2964 2965 2966 2967 2968 2969 2970 2971 2972 2973 2974 2975 2976 2977 2978 2979 2980 2981 2982 2983 2984 2985 2986 2987 2988 2989 2990 2991 2992 2993 2994 2995 2996 2997 2998 2999 3000 3001 3002 3003 3004 3005 3006 3007 3008 3009 3010 3011 3012 3013 3014 3015 3016 3017 3018 3019 3020 3021 3022 3023 3024 3025 3026 3027 3028 3029 3030 3031 3032 3033 3034 3035 3036 3037 3038 3039 3040 3041 3042 3043 3044 3045 3046 3047 3048 3049 3050 3051 3052 3053 3054 3055 3056 3057 3058 3059 3060 3061 3062 3063 3064 3065 3066 3067 3068 3069 3070 3071 3072 3073 3074 3075 3076 3077 3078 3079 3080 3081 3082 3083 3084 3085 3086 3087 3088 3089 3090 3091 3092 3093 3094 3095 3096 3097 3098 3099 3100 3101 3102 3103 3104 3105 3106 3107 3108 3109 3110 3111 3112 3113 3114 3115 3116 3117 3118 3119 3120 3121 3122 3123 3124 3125 3126 3127 3128 3129 3130 3131 3132 3133 3134 3135 3136 3137 3138 3139 3140 3141 3142 3143 3144 3145 3146 3147 3148 3149 3150 3151 3152 3153 3154 3155 3156 3157 3158 3159 3160 3161 3162 3163 3164 3165 3166 3167 3168 3169 3170 3171 3172 3173 3174 3175 3176 3177 3178 3179 3180 3181 3182 3183 3184 3185 3186 3187 3188 3189 3190 3191 3192 3193 3194 3195 3196 3197 3198 3199 3200 3201 3202 3203 3204 3205 3206 3207 3208 3209 3210 3211 3212 3213 3214 3215 3216 3217 3218 3219 3220 3221 3222 3223 3224 3225 3226 3227 3228 3229 3230 3231 3232 3233 3234 3235 3236 3237 3238 3239 3240 3241 3242 3243 3244 3245 3246 3247 3248 3249 3250 3251 3252 3253 3254 3255 3256 3257 3258 3259 3260 3261 3262 3263 3264 3265 3266 3267 3268 3269 3270 3271 3272 3273 3274 3275 3276 3277 3278 3279 3280 3281 3282 3283 3284 3285 3286 3287 3288 3289 3290 3291 3292 3293 3294 3295 3296 3297 3298 3299 3300 3301 3302 3303 3304 3305 3306 3307 3308 3309 3310 3311 3312 3313 3314 3315 3316 3317 3318 3319 3320 3321 3322 3323 3324 3325 3326 3327 3328 3329 3330 3331 3332 3333 3334 3335 3336 3337 3338 3339 3340 3341 3342 3343 3344 3345 3346 3347 3348 3349 3350 3351 3352 3353 3354 3355 3356 3357 3358 3359 3360 3361 3362 3363 3364 3365 3366 3367 3368 3369 3370 3371 3372 3373 3374 3375 3376 3377 3378 3379 3380 3381 3382 3383 3384 3385 3386 3387 3388 3389 3390 3391 3392 3393 3394 3395 3396 3397 3398 3399 3400 3401 3402 3403 3404 3405 3406 3407 3408 3409 3410 3411 3412 3413 3414 3415 3416 3417 3418 3419 3420 3421 3422 3423 3424 3425 3426 3427 3428 3429 3430 3431 3432 3433 3434 3435 3436 3437 3438 3439 3440 3441 3442 3443 3444 3445 3446 3447 3448 3449 3450 3451 3452 3453 3454 3455 3456 3457 3458 3459 3460 3461 3462 3463 3464 3465 3466 3467 3468 3469 3470 3471 3472 3473 3474 3475 3476 3477 3478 3479 3480 3481 3482 3483 3484 3485 3486 3487 3488 3489 3490 3491 3492 3493 3494 3495 3496 3497 3498 3499 3500 3501 3502 3503 3504 3505 3506 3507 3508 3509 3510 3511 3512 3513 3514 3515 3516 3517 3518 3519 3520 3521 3522 3523 3524 3525 3526 3527 3528 3529 3530 3531 3532 3533 3534 3535 3536 3537 3538 3539 3540 3541 3542 3543 3544 3545 3546 3547 3548 3549 3550 3551 3552 3553 3554 3555 3556 3557 3558 3559 3560 3561 3562 3563 3564 3565
#!/usr/bin/env python

import socket
import unittest
import struct

from framework import VppTestCase, VppTestRunner, running_extended_tests
from scapy.layers.inet import IP, TCP, UDP, ICMP
from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
from scapy.layers.l2 import Ether, ARP, GRE
from scapy.data import IP_PROTOS
from scapy.packet import bind_layers
from util import ppp
from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
from time import sleep


class MethodHolder(VppTestCase):
    """ SNAT create capture and verify method holder """

    @classmethod
    def setUpClass(cls):
        super(MethodHolder, cls).setUpClass()

    def tearDown(self):
        super(MethodHolder, self).tearDown()

    def check_ip_checksum(self, pkt):
        """
        Check IP checksum of the packet

        :param pkt: Packet to check IP checksum
        """
        new = pkt.__class__(str(pkt))
        del new['IP'].chksum
        new = new.__class__(str(new))
        self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)

    def check_tcp_checksum(self, pkt):
        """
        Check TCP checksum in IP packet

        :param pkt: Packet to check TCP checksum
        """
        new = pkt.__class__(str(pkt))
        del new['TCP'].chksum
        new = new.__class__(str(new))
        self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)

    def check_udp_checksum(self, pkt):
        """
        Check UDP checksum in IP packet

        :param pkt: Packet to check UDP checksum
        """
        new = pkt.__class__(str(pkt))
        del new['UDP'].chksum
        new = new.__class__(str(new))
        self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)

    def check_icmp_errror_embedded(self, pkt):
        """
        Check ICMP error embeded packet checksum

        :param pkt: Packet to check ICMP error embeded packet checksum
        """
        if pkt.haslayer(IPerror):
            new = pkt.__class__(str(pkt))
            del new['IPerror'].chksum
            new = new.__class__(str(new))
            self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)

        if pkt.haslayer(TCPerror):
            new = pkt.__class__(str(pkt))
            del new['TCPerror'].chksum
            new = new.__class__(str(new))
            self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)

        if pkt.haslayer(UDPerror):
            if pkt['UDPerror'].chksum != 0:
                new = pkt.__class__(str(pkt))
                del new['UDPerror'].chksum
                new = new.__class__(str(new))
                self.assertEqual(new['UDPerror'].chksum,
                                 pkt['UDPerror'].chksum)

        if pkt.haslayer(ICMPerror):
            del new['ICMPerror'].chksum
            new = new.__class__(str(new))
            self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)

    def check_icmp_checksum(self, pkt):
        """
        Check ICMP checksum in IPv4 packet

        :param pkt: Packet to check ICMP checksum
        """
        new = pkt.__class__(str(pkt))
        del new['ICMP'].chksum
        new = new.__class__(str(new))
        self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
        if pkt.haslayer(IPerror):
            self.check_icmp_errror_embedded(pkt)

    def check_icmpv6_checksum(self, pkt):
        """
        Check ICMPv6 checksum in IPv4 packet

        :param pkt: Packet to check ICMPv6 checksum
        """
        new = pkt.__class__(str(pkt))
        if pkt.haslayer(ICMPv6DestUnreach):
            del new['ICMPv6DestUnreach'].cksum
            new = new.__class__(str(new))
            self.assertEqual(new['ICMPv6DestUnreach'].cksum,
                             pkt['ICMPv6DestUnreach'].cksum)
            self.check_icmp_errror_embedded(pkt)
        if pkt.haslayer(ICMPv6EchoRequest):
            del new['ICMPv6EchoRequest'].cksum
            new = new.__class__(str(new))
            self.assertEqual(new['ICMPv6EchoRequest'].cksum,
                             pkt['ICMPv6EchoRequest'].cksum)
        if pkt.haslayer(ICMPv6EchoReply):
            del new['ICMPv6EchoReply'].cksum
            new = new.__class__(str(new))
            self.assertEqual(new['ICMPv6EchoReply'].cksum,
                             pkt['ICMPv6EchoReply'].cksum)

    def create_stream_in(self, in_if, out_if, ttl=64):
        """
        Create packet stream for inside network

        :param in_if: Inside interface
        :param out_if: Outside interface
        :param ttl: TTL of generated packets
        """
        pkts = []
        # TCP
        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
             TCP(sport=self.tcp_port_in, dport=20))
        pkts.append(p)

        # UDP
        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
             UDP(sport=self.udp_port_in, dport=20))
        pkts.append(p)

        # ICMP
        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
             ICMP(id=self.icmp_id_in, type='echo-request'))
        pkts.append(p)

        return pkts

    def compose_ip6(self, ip4, pref, plen):
        """
        Compose IPv4-embedded IPv6 addresses

        :param ip4: IPv4 address
        :param pref: IPv6 prefix
        :param plen: IPv6 prefix length
        :returns: IPv4-embedded IPv6 addresses
        """
        pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
        ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
        if plen == 32:
            pref_n[4] = ip4_n[0]
            pref_n[5] = ip4_n[1]
            pref_n[6] = ip4_n[2]
            pref_n[7] = ip4_n[3]
        elif plen == 40:
            pref_n[5] = ip4_n[0]
            pref_n[6] = ip4_n[1]
            pref_n[7] = ip4_n[2]
            pref_n[9] = ip4_n[3]
        elif plen == 48:
            pref_n[6] = ip4_n[0]
            pref_n[7] = ip4_n[1]
            pref_n[9] = ip4_n[2]
            pref_n[10] = ip4_n[3]
        elif plen == 56:
            pref_n[7] = ip4_n[0]
            pref_n[9] = ip4_n[1]
            pref_n[10] = ip4_n[2]
            pref_n[11] = ip4_n[3]
        elif plen == 64:
            pref_n[9] = ip4_n[0]
            pref_n[10] = ip4_n[1]
            pref_n[11] = ip4_n[2]
            pref_n[12] = ip4_n[3]
        elif plen == 96:
            pref_n[12] = ip4_n[0]
            pref_n[13] = ip4_n[1]
            pref_n[14] = ip4_n[2]
            pref_n[15] = ip4_n[3]
        return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))

    def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
        """
        Create IPv6 packet stream for inside network

        :param in_if: Inside interface
        :param out_if: Outside interface
        :param ttl: Hop Limit of generated packets
        :param pref: NAT64 prefix
        :param plen: NAT64 prefix length
        """
        pkts = []
        if pref is None:
            dst = ''.join(['64:ff9b::', out_if.remote_ip4])
        else:
            dst = self.compose_ip6(out_if.remote_ip4, pref, plen)

        # TCP
        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
             IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
             TCP(sport=self.tcp_port_in, dport=20))
        pkts.append(p)

        # UDP
        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
             IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
             UDP(sport=self.udp_port_in, dport=20))
        pkts.append(p)

        # ICMP
        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
             IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
             ICMPv6EchoRequest(id=self.icmp_id_in))
        pkts.append(p)

        return pkts

    def create_stream_out(self, out_if, dst_ip=None, ttl=64):
        """
        Create packet stream for outside network

        :param out_if: Outside interface
        :param dst_ip: Destination IP address (Default use global SNAT address)
        :param ttl: TTL of generated packets
        """
        if dst_ip is None:
            dst_ip = self.snat_addr
        pkts = []
        # TCP
        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
             IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
             TCP(dport=self.tcp_port_out, sport=20))
        pkts.append(p)

        # UDP
        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
             IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
             UDP(dport=self.udp_port_out, sport=20))
        pkts.append(p)

        # ICMP
        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
             IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
             ICMP(id=self.icmp_id_out, type='echo-reply'))
        pkts.append(p)

        return pkts

    def verify_capture_out(self, capture, nat_ip=None, same_port=False,
                           packet_num=3, dst_ip=None):
        """
        Verify captured packets on outside network

        :param capture: Captured packets
        :param nat_ip: Translated IP address (Default use global SNAT address)
        :param same_port: Sorce port number is not translated (Default False)
        :param packet_num: Expected number of packets (Default 3)
        :param dst_ip: Destination IP address (Default do not verify)
        """
        if nat_ip is None:
            nat_ip = self.snat_addr
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.check_ip_checksum(packet)
                self.assertEqual(packet[IP].src, nat_ip)
                if dst_ip is not None:
                    self.assertEqual(packet[IP].dst, dst_ip)
                if packet.haslayer(TCP):
                    if same_port:
                        self.assertEqual(packet[TCP].sport, self.tcp_port_in)
                    else:
                        self.assertNotEqual(
                            packet[TCP].sport, self.tcp_port_in)
                    self.tcp_port_out = packet[TCP].sport
                    self.check_tcp_checksum(packet)
                elif packet.haslayer(UDP):
                    if same_port:
                        self.assertEqual(packet[UDP].sport, self.udp_port_in)
                    else:
                        self.assertNotEqual(
                            packet[UDP].sport, self.udp_port_in)
                    self.udp_port_out = packet[UDP].sport
                else:
                    if same_port:
                        self.assertEqual(packet[ICMP].id, self.icmp_id_in)
                    else:
                        self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
                    self.icmp_id_out = packet[ICMP].id
                    self.check_icmp_checksum(packet)
            except:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise

    def verify_capture_in(self, capture, in_if, packet_num=3):
        """
        Verify captured packets on inside network

        :param capture: Captured packets
        :param in_if: Inside interface
        :param packet_num: Expected number of packets (Default 3)
        """
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.check_ip_checksum(packet)
                self.assertEqual(packet[IP].dst, in_if.remote_ip4)
                if packet.haslayer(TCP):
                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                    self.check_tcp_checksum(packet)
                elif packet.haslayer(UDP):
                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
                else:
                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
                    self.check_icmp_checksum(packet)
            except:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(inside network):", packet))
                raise

    def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
        """
        Verify captured IPv6 packets on inside network

        :param capture: Captured packets
        :param src_ip: Source IP
        :param dst_ip: Destination IP address
        :param packet_num: Expected number of packets (Default 3)
        """
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IPv6].src, src_ip)
                self.assertEqual(packet[IPv6].dst, dst_ip)
                if packet.haslayer(TCP):
                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                    self.check_tcp_checksum(packet)
                elif packet.haslayer(UDP):
                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
                    self.check_udp_checksum(packet)
                else:
                    self.assertEqual(packet[ICMPv6EchoReply].id,
                                     self.icmp_id_in)
                    self.check_icmpv6_checksum(packet)
            except:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(inside network):", packet))
                raise

    def verify_capture_no_translation(self, capture, ingress_if, egress_if):
        """
        Verify captured packet that don't have to be translated

        :param capture: Captured packets
        :param ingress_if: Ingress interface
        :param egress_if: Egress interface
        """
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
                self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
                if packet.haslayer(TCP):
                    self.assertEqual(packet[TCP].sport, self.tcp_port_in)
                elif packet.haslayer(UDP):
                    self.assertEqual(packet[UDP].sport, self.udp_port_in)
                else:
                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
            except:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(inside network):", packet))
                raise

    def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
                                            packet_num=3, icmp_type=11):
        """
        Verify captured packets with ICMP errors on outside network

        :param capture: Captured packets
        :param src_ip: Translated IP address or IP address of VPP
                       (Default use global SNAT address)
        :param packet_num: Expected number of packets (Default 3)
        :param icmp_type: Type of error ICMP packet
                          we are expecting (Default 11)
        """
        if src_ip is None:
            src_ip = self.snat_addr
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, src_ip)
                self.assertTrue(packet.haslayer(ICMP))
                icmp = packet[ICMP]
                self.assertEqual(icmp.type, icmp_type)
                self.assertTrue(icmp.haslayer(IPerror))
                inner_ip = icmp[IPerror]
                if inner_ip.haslayer(TCPerror):
                    self.assertEqual(inner_ip[TCPerror].dport,
                                     self.tcp_port_out)
                elif inner_ip.haslayer(UDPerror):
                    self.assertEqual(inner_ip[UDPerror].dport,
                                     self.udp_port_out)
                else:
                    self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
            except:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise

    def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
                                           icmp_type=11):
        """
        Verify captured packets with ICMP errors on inside network

        :param capture: Captured packets
        :param in_if: Inside interface
        :param packet_num: Expected number of packets (Default 3)
        :param icmp_type: Type of error ICMP packet
                          we are expecting (Default 11)
        """
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].dst, in_if.remote_ip4)
                self.assertTrue(packet.haslayer(ICMP))
                icmp = packet[ICMP]
                self.assertEqual(icmp.type, icmp_type)
                self.assertTrue(icmp.haslayer(IPerror))
                inner_ip = icmp[IPerror]
                if inner_ip.haslayer(TCPerror):
                    self.assertEqual(inner_ip[TCPerror].sport,
                                     self.tcp_port_in)
                elif inner_ip.haslayer(UDPerror):
                    self.assertEqual(inner_ip[UDPerror].sport,
                                     self.udp_port_in)
                else:
                    self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
            except:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(inside network):", packet))
                raise

    def verify_ipfix_nat44_ses(self, data):
        """
        Verify IPFIX NAT44 session create/delete event

        :param data: Decoded IPFIX data records
        """
        nat44_ses_create_num = 0
        nat44_ses_delete_num = 0
        self.assertEqual(6, len(data))
        for record in data:
            # natEvent
            self.assertIn(ord(record[230]), [4, 5])
            if ord(record[230]) == 4:
                nat44_ses_create_num += 1
            else:
                nat44_ses_delete_num += 1
            # sourceIPv4Address
            self.assertEqual(self.pg0.remote_ip4n, record[8])
            # postNATSourceIPv4Address
            self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
                             record[225])
            # ingressVRFID
            self.assertEqual(struct.pack("!I", 0), record[234])
            # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
            if IP_PROTOS.icmp == ord(record[4]):
                self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
                self.assertEqual(struct.pack("!H", self.icmp_id_out),
                                 record[227])
            elif IP_PROTOS.tcp == ord(record[4]):
                self.assertEqual(struct.pack("!H", self.tcp_port_in),
                                 record[7])
                self.assertEqual(struct.pack("!H", self.tcp_port_out),
                                 record[227])
            elif IP_PROTOS.udp == ord(record[4]):
                self.assertEqual(struct.pack("!H", self.udp_port_in),
                                 record[7])
                self.assertEqual(struct.pack("!H", self.udp_port_out),
                                 record[227])
            else:
                self.fail("Invalid protocol")
        self.assertEqual(3, nat44_ses_create_num)
        self.assertEqual(3, nat44_ses_delete_num)

    def verify_ipfix_addr_exhausted(self, data):
        """
        Verify IPFIX NAT addresses event

        :param data: Decoded IPFIX data records
        """
        self.assertEqual(1, len(data))
        record = data[0]
        # natEvent
        self.assertEqual(ord(record[230]), 3)
        # natPoolID
        self.assertEqual(struct.pack("!I", 0), record[283])


class TestSNAT(MethodHolder):
    """ SNAT Test Cases """

    @classmethod
    def setUpClass(cls):
        super(TestSNAT, cls).setUpClass()

        try:
            cls.tcp_port_in = 6303
            cls.tcp_port_out = 6303
            cls.udp_port_in = 6304
            cls.udp_port_out = 6304
            cls.icmp_id_in = 6305
            cls.icmp_id_out = 6305
            cls.snat_addr = '10.0.0.3'
            cls.ipfix_src_port = 4739
            cls.ipfix_domain_id = 1

            cls.create_pg_interfaces(range(9))
            cls.interfaces = list(cls.pg_interfaces[0:4])

            for i in cls.interfaces:
                i.admin_up()
                i.config_ip4()
                i.resolve_arp()

            cls.pg0.generate_remote_hosts(3)
            cls.pg0.configure_ipv4_neighbors()

            cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))

            cls.pg4._local_ip4 = "172.16.255.1"
            cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
            cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
            cls.pg4.set_table_ip4(10)
            cls.pg5._local_ip4 = "172.16.255.3"
            cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
            cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
            cls.pg5.set_table_ip4(10)
            cls.pg6._local_ip4 = "172.16.255.1"
            cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
            cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
            cls.pg6.set_table_ip4(20)
            for i in cls.overlapping_interfaces:
                i.config_ip4()
                i.admin_up()
                i.resolve_arp()

            cls.pg7.admin_up()
            cls.pg8.admin_up()

        except Exception:
            super(TestSNAT, cls).tearDownClass()
            raise

    def clear_snat(self):
        """
        Clear SNAT configuration.
        """
        # I found no elegant way to do this
        self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=self.pg7.remote_ip4n,
                                   next_hop_sw_if_index=self.pg7.sw_if_index,
                                   is_add=0)
        self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=self.pg8.remote_ip4n,
                                   next_hop_sw_if_index=self.pg8.sw_if_index,
                                   is_add=0)

        for intf in [self.pg7, self.pg8]:
            neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
            for n in neighbors:
                self.vapi.ip_neighbor_add_del(intf.sw_if_index,
                                              n.mac_address,
                                              n.ip_address,
                                              is_add=0)

        if self.pg7.has_ip4_config:
            self.pg7.unconfig_ip4()

        interfaces = self.vapi.snat_interface_addr_dump()
        for intf in interfaces:
            self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)

        self.vapi.snat_ipfix(enable=0, src_port=self.ipfix_src_port,
                             domain_id=self.ipfix_domain_id)
        self.ipfix_src_port = 4739
        self.ipfix_domain_id = 1

        interfaces = self.vapi.snat_interface_dump()
        for intf in interfaces:
            self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
                                                     intf.is_inside,
                                                     is_add=0)

        static_mappings = self.vapi.snat_static_mapping_dump()
        for sm in static_mappings:
            self.vapi.snat_add_static_mapping(sm.local_ip_address,
                                              sm.external_ip_address,
                                              local_port=sm.local_port,
                                              external_port=sm.external_port,
                                              addr_only=sm.addr_only,
                                              vrf_id=sm.vrf_id,
                                              protocol=sm.protocol,
                                              is_add=0)

        adresses = self.vapi.snat_address_dump()
        for addr in adresses:
            self.vapi.snat_add_address_range(addr.ip_address,
                                             addr.ip_address,
                                             is_add=0)

    def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
                                local_port=0, external_port=0, vrf_id=0,
                                is_add=1, external_sw_if_index=0xFFFFFFFF,
                                proto=0):
        """
        Add/delete S-NAT static mapping

        :param local_ip: Local IP address
        :param external_ip: External IP address
        :param local_port: Local port number (Optional)
        :param external_port: External port number (Optional)
        :param vrf_id: VRF ID (Default 0)
        :param is_add: 1 if add, 0 if delete (Default add)
        :param external_sw_if_index: External interface instead of IP address
        :param proto: IP protocol (Mandatory if port specified)
        """
        addr_only = 1
        if local_port and external_port:
            addr_only = 0
        l_ip = socket.inet_pton(socket.AF_INET, local_ip)
        e_ip = socket.inet_pton(socket.AF_INET, external_ip)
        self.vapi.snat_add_static_mapping(
            l_ip,
            e_ip,
            external_sw_if_index,
            local_port,
            external_port,
            addr_only,
            vrf_id,
            proto,
            is_add)

    def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
        """
        Add/delete S-NAT address

        :param ip: IP address
        :param is_add: 1 if add, 0 if delete (Default add)
        """
        snat_addr = socket.inet_pton(socket.AF_INET, ip)
        self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
                                         vrf_id=vrf_id)

    def test_dynamic(self):
        """ SNAT dynamic translation test """

        self.snat_add_address(self.snat_addr)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        # in2out
        pkts = self.create_stream_in(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture)

        # out2in
        pkts = self.create_stream_out(self.pg1)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg0)

    def test_dynamic_icmp_errors_in2out_ttl_1(self):
        """ SNAT handling of client packets with TTL=1 """

        self.snat_add_address(self.snat_addr)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        # Client side - generate traffic
        pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Client side - verify ICMP type 11 packets
        capture = self.pg0.get_capture(len(pkts))
        self.verify_capture_in_with_icmp_errors(capture, self.pg0)

    def test_dynamic_icmp_errors_out2in_ttl_1(self):
        """ SNAT handling of server packets with TTL=1 """

        self.snat_add_address(self.snat_addr)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        # Client side - create sessions
        pkts = self.create_stream_in(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Server side - generate traffic
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture)
        pkts = self.create_stream_out(self.pg1, ttl=1)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Server side - verify ICMP type 11 packets
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out_with_icmp_errors(capture,
                                                 src_ip=self.pg1.local_ip4)

    def test_dynamic_icmp_errors_in2out_ttl_2(self):
        """ SNAT handling of error responses to client packets with TTL=2 """

        self.snat_add_address(self.snat_addr)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        # Client side - generate traffic
        pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Server side - simulate ICMP type 11 response
        capture = self.pg1.get_capture(len(pkts))
        pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
                IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
                ICMP(type=11) / packet[IP] for packet in capture]
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Client side - verify ICMP type 11 packets
        capture = self.pg0.get_capture(len(pkts))
        self.verify_capture_in_with_icmp_errors(capture, self.pg0)

    def test_dynamic_icmp_errors_out2in_ttl_2(self):
        """ SNAT handling of error responses to server packets with TTL=2 """

        self.snat_add_address(self.snat_addr)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        # Client side - create sessions
        pkts = self.create_stream_in(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Server side - generate traffic
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture)
        pkts = self.create_stream_out(self.pg1, ttl=2)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Client side - simulate ICMP type 11 response
        capture = self.pg0.get_capture(len(pkts))
        pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                ICMP(type=11) / packet[IP] for packet in capture]
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Server side - verify ICMP type 11 packets
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out_with_icmp_errors(capture)

    def test_ping_out_interface_from_outside(self):
        """ Ping SNAT out interface from outside network """

        self.snat_add_address(self.snat_addr)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
             ICMP(id=self.icmp_id_out, type='echo-request'))
        pkts = [p]
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.assertEqual(1, len(capture))
        packet = capture[0]
        try:
            self.assertEqual(packet[IP].src, self.pg1.local_ip4)
            self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
            self.assertEqual(packet[ICMP].id, self.icmp_id_in)
            self.assertEqual(packet[ICMP].type, 0)  # echo reply
        except:
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise

    def test_ping_internal_host_from_outside(self):
        """ Ping internal host from outside network """

        self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        # out2in
        pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
               IP(src=self.pg1.remote_ip4, dst=self.snat_addr, ttl=64) /
               ICMP(id=self.icmp_id_out, type='echo-request'))
        self.pg1.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(1)
        self.verify_capture_in(capture, self.pg0, packet_num=1)
        self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)

        # in2out
        pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
               IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
               ICMP(id=self.icmp_id_in, type='echo-reply'))
        self.pg0.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(1)
        self.verify_capture_out(capture, same_port=True, packet_num=1)
        self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)

    def test_static_in(self):
        """ SNAT 1:1 NAT initialized from inside network """

        nat_ip = "10.0.0.10"
        self.tcp_port_out = 6303
        self.udp_port_out = 6304
        self.icmp_id_out = 6305

        self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        # in2out
        pkts = self.create_stream_in(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture, nat_ip, True)

        # out2in
        pkts = self.create_stream_out(self.pg1, nat_ip)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg0)

    def test_static_out(self):
        """ SNAT 1:1 NAT initialized from outside network """

        nat_ip = "10.0.0.20"
        self.tcp_port_out = 6303
        self.udp_port_out = 6304
        self.icmp_id_out = 6305

        self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        # out2in
        pkts = self.create_stream_out(self.pg1, nat_ip)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg0)

        # in2out
        pkts = self.create_stream_in(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture, nat_ip, True)

    def test_static_with_port_in(self):
        """ SNAT 1:1 NAT with port initialized from inside network """

        self.tcp_port_out = 3606
        self.udp_port_out = 3607
        self.icmp_id_out = 3608

        self.snat_add_address(self.snat_addr)
        self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
                                     self.tcp_port_in, self.tcp_port_out,
                                     proto=IP_PROTOS.tcp)
        self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
                                     self.udp_port_in, self.udp_port_out,
                                     proto=IP_PROTOS.udp)
        self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
                                     self.icmp_id_in, self.icmp_id_out,
                                     proto=IP_PROTOS.icmp)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        # in2out
        pkts = self.create_stream_in(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture)

        # out2in
        pkts = self.create_stream_out(self.pg1)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg0)

    def test_static_with_port_out(self):
        """ SNAT 1:1 NAT with port initialized from outside network """

        self.tcp_port_out = 30606
        self.udp_port_out = 30607
        self.icmp_id_out = 30608

        self.snat_add_address(self.snat_addr)
        self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
                                     self.tcp_port_in, self.tcp_port_out,
                                     proto=IP_PROTOS.tcp)
        self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
                                     self.udp_port_in, self.udp_port_out,
                                     proto=IP_PROTOS.udp)
        self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
                                     self.icmp_id_in, self.icmp_id_out,
                                     proto=IP_PROTOS.icmp)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        # out2in
        pkts = self.create_stream_out(self.pg1)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg0)

        # in2out
        pkts = self.create_stream_in(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture)

    def test_static_vrf_aware(self):
        """ SNAT 1:1 NAT VRF awareness """

        nat_ip1 = "10.0.0.30"
        nat_ip2 = "10.0.0.40"
        self.tcp_port_out = 6303
        self.udp_port_out = 6304
        self.icmp_id_out = 6305

        self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
                                     vrf_id=10)
        self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
                                     vrf_id=10)
        self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
                                                 is_inside=0)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)

        # inside interface VRF match SNAT static mapping VRF
        pkts = self.create_stream_in(self.pg4, self.pg3)
        self.pg4.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg3.get_capture(len(pkts))
        self.verify_capture_out(capture, nat_ip1, True)

        # inside interface VRF don't match SNAT static mapping VRF (packets
        # are dropped)
        pkts = self.create_stream_in(self.pg0, self.pg3)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg3.assert_nothing_captured()

    def test_multiple_inside_interfaces(self):
        """ SNAT multiple inside interfaces (non-overlapping address space) """

        self.snat_add_address(self.snat_addr)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
                                                 is_inside=0)

        # between two S-NAT inside interfaces (no translation)
        pkts = self.create_stream_in(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_no_translation(capture, self.pg0, self.pg1)

        # from S-NAT inside to interface without S-NAT feature (no translation)
        pkts = self.create_stream_in(self.pg0, self.pg2)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg2.get_capture(len(pkts))
        self.verify_capture_no_translation(capture, self.pg0, self.pg2)

        # in2out 1st interface
        pkts = self.create_stream_in(self.pg0, self.pg3)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg3.get_capture(len(pkts))
        self.verify_capture_out(capture)

        # out2in 1st interface
        pkts = self.create_stream_out(self.pg3)
        self.pg3.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg0)

        # in2out 2nd interface
        pkts = self.create_stream_in(self.pg1, self.pg3)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg3.get_capture(len(pkts))
        self.verify_capture_out(capture)

        # out2in 2nd interface
        pkts = self.create_stream_out(self.pg3)
        self.pg3.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg1)

    def test_inside_overlapping_interfaces(self):
        """ SNAT multiple inside interfaces with overlapping address space """

        static_nat_ip = "10.0.0.10"
        self.snat_add_address(self.snat_addr)
        self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
                                                 is_inside=0)
        self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
        self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
                                     vrf_id=20)

        # between S-NAT inside interfaces with same VRF (no translation)
        pkts = self.create_stream_in(self.pg4, self.pg5)
        self.pg4.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg5.get_capture(len(pkts))
        self.verify_capture_no_translation(capture, self.pg4, self.pg5)

        # between S-NAT inside interfaces with different VRF (hairpinning)
        p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
             IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
             TCP(sport=1234, dport=5678))
        self.pg4.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg6.get_capture(1)
        p = capture[0]
        try:
            ip = p[IP]
            tcp = p[TCP]
            self.assertEqual(ip.src, self.snat_addr)
            self.assertEqual(ip.dst, self.pg6.remote_ip4)
            self.assertNotEqual(tcp.sport, 1234)
            self.assertEqual(tcp.dport, 5678)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", p))
            raise

        # in2out 1st interface
        pkts = self.create_stream_in(self.pg4, self.pg3)
        self.pg4.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg3.get_capture(len(pkts))
        self.verify_capture_out(capture)

        # out2in 1st interface
        pkts = self.create_stream_out(self.pg3)
        self.pg3.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg4.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg4)

        # in2out 2nd interface
        pkts = self.create_stream_in(self.pg5, self.pg3)
        self.pg5.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg3.get_capture(len(pkts))
        self.verify_capture_out(capture)

        # out2in 2nd interface
        pkts = self.create_stream_out(self.pg3)
        self.pg3.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg5.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg5)

        # pg5 session dump
        addresses = self.vapi.snat_address_dump()
        self.assertEqual(len(addresses), 1)
        sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
        self.assertEqual(len(sessions), 3)
        for session in sessions:
            self.assertFalse(session.is_static)
            self.assertEqual(session.inside_ip_address[0:4],
                             self.pg5.remote_ip4n)
            self.assertEqual(session.outside_ip_address,
                             addresses[0].ip_address)
        self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
        self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
        self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
        self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
        self.assertEqual(sessions[1].inside_port, self.udp_port_in)
        self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
        self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
        self.assertEqual(sessions[1].outside_port, self.udp_port_out)
        self.assertEqual(sessions[2].outside_port, self.icmp_id_out)

        # in2out 3rd interface
        pkts = self.create_stream_in(self.pg6, self.pg3)
        self.pg6.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg3.get_capture(len(pkts))
        self.verify_capture_out(capture, static_nat_ip, True)

        # out2in 3rd interface
        pkts = self.create_stream_out(self.pg3, static_nat_ip)
        self.pg3.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg6.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg6)

        # general user and session dump verifications
        users = self.vapi.snat_user_dump()
        self.assertTrue(len(users) >= 3)
        addresses = self.vapi.snat_address_dump()
        self.assertEqual(len(addresses), 1)
        for user in users:
            sessions = self.vapi.snat_user_session_dump(user.ip_address,
                                                        user.vrf_id)
            for session in sessions:
                self.assertEqual(user.ip_address, session.inside_ip_address)
                self.assertTrue(session.total_bytes > session.total_pkts > 0)
                self.assertTrue(session.protocol in
                                [IP_PROTOS.tcp, IP_PROTOS.udp,
                                 IP_PROTOS.icmp])

        # pg4 session dump
        sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
        self.assertTrue(len(sessions) >= 4)
        for session in sessions:
            self.assertFalse(session.is_static)
            self.assertEqual(session.inside_ip_address[0:4],
                             self.pg4.remote_ip4n)
            self.assertEqual(session.outside_ip_address,
                             addresses[0].ip_address)

        # pg6 session dump
        sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
        self.assertTrue(len(sessions) >= 3)
        for session in sessions:
            self.assertTrue(session.is_static)
            self.assertEqual(session.inside_ip_address[0:4],
                             self.pg6.remote_ip4n)
            self.assertEqual(map(ord, session.outside_ip_address[0:4]),
                             map(int, static_nat_ip.split('.')))
            self.assertTrue(session.inside_port in
                            [self.tcp_port_in, self.udp_port_in,
                             self.icmp_id_in])

    def test_hairpinning(self):
        """ SNAT hairpinning - 1:1 NAT with port"""

        host = self.pg0.remote_hosts[0]
        server = self.pg0.remote_hosts[1]
        host_in_port = 1234
        host_out_port = 0
        server_in_port = 5678
        server_out_port = 8765

        self.snat_add_address(self.snat_addr)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)
        # add static mapping for server
        self.snat_add_static_mapping(server.ip4, self.snat_addr,
                                     server_in_port, server_out_port,
                                     proto=IP_PROTOS.tcp)

        # send packet from host to server
        p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
             IP(src=host.ip4, dst=self.snat_addr) /
             TCP(sport=host_in_port, dport=server_out_port))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(1)
        p = capture[0]
        try:
            ip = p[IP]
            tcp = p[TCP]
            self.assertEqual(ip.src, self.snat_addr)
            self.assertEqual(ip.dst, server.ip4)
            self.assertNotEqual(tcp.sport, host_in_port)
            self.assertEqual(tcp.dport, server_in_port)
            self.check_tcp_checksum(p)
            host_out_port = tcp.sport
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", p))
            raise

        # send reply from server to host
        p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
             IP(src=server.ip4, dst=self.snat_addr) /
             TCP(sport=server_in_port, dport=host_out_port))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(1)
        p = capture[0]
        try:
            ip = p[IP]
            tcp = p[TCP]
            self.assertEqual(ip.src, self.snat_addr)
            self.assertEqual(ip.dst, host.ip4)
            self.assertEqual(tcp.sport, server_out_port)
            self.assertEqual(tcp.dport, host_in_port)
            self.check_tcp_checksum(p)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:"), p)
            raise

    def test_hairpinning2(self):
        """ SNAT hairpinning - 1:1 NAT"""

        server1_nat_ip = "10.0.0.10"
        server2_nat_ip = "10.0.0.11"
        host = self.pg0.remote_hosts[0]
        server1 = self.pg0.remote_hosts[1]
        server2 = self.pg0.remote_hosts[2]
        server_tcp_port = 22
        server_udp_port = 20

        self.snat_add_address(self.snat_addr)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        # add static mapping for servers
        self.snat_add_static_mapping(server1.ip4, server1_nat_ip)
        self.snat_add_static_mapping(server2.ip4, server2_nat_ip)

        # host to server1
        pkts = []
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=host.ip4, dst=server1_nat_ip) /
             TCP(sport=self.tcp_port_in, dport=server_tcp_port))
        pkts.append(p)
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=host.ip4, dst=server1_nat_ip) /
             UDP(sport=self.udp_port_in, dport=server_udp_port))
        pkts.append(p)
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=host.ip4, dst=server1_nat_ip) /
             ICMP(id=self.icmp_id_in, type='echo-request'))
        pkts.append(p)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, self.snat_addr)
                self.assertEqual(packet[IP].dst, server1.ip4)
                if packet.haslayer(TCP):
                    self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
                    self.assertEqual(packet[TCP].dport, server_tcp_port)
                    self.tcp_port_out = packet[TCP].sport
                    self.check_tcp_checksum(packet)
                elif packet.haslayer(UDP):
                    self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
                    self.assertEqual(packet[UDP].dport, server_udp_port)
                    self.udp_port_out = packet[UDP].sport
                else:
                    self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
                    self.icmp_id_out = packet[ICMP].id
            except:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

        # server1 to host
        pkts = []
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=server1.ip4, dst=self.snat_addr) /
             TCP(sport=server_tcp_port, dport=self.tcp_port_out))
        pkts.append(p)
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=server1.ip4, dst=self.snat_addr) /
             UDP(sport=server_udp_port, dport=self.udp_port_out))
        pkts.append(p)
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=server1.ip4, dst=self.snat_addr) /
             ICMP(id=self.icmp_id_out, type='echo-reply'))
        pkts.append(p)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, server1_nat_ip)
                self.assertEqual(packet[IP].dst, host.ip4)
                if packet.haslayer(TCP):
                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                    self.assertEqual(packet[TCP].sport, server_tcp_port)
                    self.check_tcp_checksum(packet)
                elif packet.haslayer(UDP):
                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
                    self.assertEqual(packet[UDP].sport, server_udp_port)
                else:
                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
            except:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

        # server2 to server1
        pkts = []
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=server2.ip4, dst=server1_nat_ip) /
             TCP(sport=self.tcp_port_in, dport=server_tcp_port))
        pkts.append(p)
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=server2.ip4, dst=server1_nat_ip) /
             UDP(sport=self.udp_port_in, dport=server_udp_port))
        pkts.append(p)
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=server2.ip4, dst=server1_nat_ip) /
             ICMP(id=self.icmp_id_in, type='echo-request'))
        pkts.append(p)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, server2_nat_ip)
                self.assertEqual(packet[IP].dst, server1.ip4)
                if packet.haslayer(TCP):
                    self.assertEqual(packet[TCP].sport, self.tcp_port_in)
                    self.assertEqual(packet[TCP].dport, server_tcp_port)
                    self.tcp_port_out = packet[TCP].sport
                    self.check_tcp_checksum(packet)
                elif packet.haslayer(UDP):
                    self.assertEqual(packet[UDP].sport, self.udp_port_in)
                    self.assertEqual(packet[UDP].dport, server_udp_port)
                    self.udp_port_out = packet[UDP].sport
                else:
                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
                    self.icmp_id_out = packet[ICMP].id
            except:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

        # server1 to server2
        pkts = []
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=server1.ip4, dst=server2_nat_ip) /
             TCP(sport=server_tcp_port, dport=self.tcp_port_out))
        pkts.append(p)
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=server1.ip4, dst=server2_nat_ip) /
             UDP(sport=server_udp_port, dport=self.udp_port_out))
        pkts.append(p)
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=server1.ip4, dst=server2_nat_ip) /
             ICMP(id=self.icmp_id_out, type='echo-reply'))
        pkts.append(p)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, server1_nat_ip)
                self.assertEqual(packet[IP].dst, server2.ip4)
                if packet.haslayer(TCP):
                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                    self.assertEqual(packet[TCP].sport, server_tcp_port)
                    self.check_tcp_checksum(packet)
                elif packet.haslayer(UDP):
                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
                    self.assertEqual(packet[UDP].sport, server_udp_port)
                else:
                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
            except:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

    def test_max_translations_per_user(self):
        """ MAX translations per user - recycle the least recently used """

        self.snat_add_address(self.snat_addr)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        # get maximum number of translations per user
        snat_config = self.vapi.snat_show_config()

        # send more than maximum number of translations per user packets
        pkts_num = snat_config.max_translations_per_user + 5
        pkts = []
        for port in range(0, pkts_num):
            p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                 TCP(sport=1025 + port))
            pkts.append(p)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # verify number of translated packet
        self.pg1.get_capture(pkts_num)

    def test_interface_addr(self):
        """ Acquire SNAT addresses from interface """
        self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)

        # no address in NAT pool
        adresses = self.vapi.snat_address_dump()
        self.assertEqual(0, len(adresses))

        # configure interface address and check NAT address pool
        self.pg7.config_ip4()
        adresses = self.vapi.snat_address_dump()
        self.assertEqual(1, len(adresses))
        self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)

        # remove interface address and check NAT address pool
        self.pg7.unconfig_ip4()
        adresses = self.vapi.snat_address_dump()
        self.assertEqual(0, len(adresses))

    def test_interface_addr_static_mapping(self):
        """ Static mapping with addresses from interface """
        self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
        self.snat_add_static_mapping('1.2.3.4',
                                     external_sw_if_index=self.pg7.sw_if_index)

        # static mappings with external interface
        static_mappings = self.vapi.snat_static_mapping_dump()
        self.assertEqual(1, len(static_mappings))
        self.assertEqual(self.pg7.sw_if_index,
                         static_mappings[0].external_sw_if_index)

        # configure interface address and check static mappings
        self.pg7.config_ip4()
        static_mappings = self.vapi.snat_static_mapping_dump()
        self.assertEqual(1, len(static_mappings))
        self.assertEqual(static_mappings[0].external_ip_address[0:4],
                         self.pg7.local_ip4n)
        self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)

        # remove interface address and check static mappings
        self.pg7.unconfig_ip4()
        static_mappings = self.vapi.snat_static_mapping_dump()
        self.assertEqual(0, len(static_mappings))

    def test_ipfix_nat44_sess(self):
        """ S-NAT IPFIX logging NAT44 session created/delted """
        self.ipfix_domain_id = 10
        self.ipfix_src_port = 20202
        colector_port = 30303
        bind_layers(UDP, IPFIX, dport=30303)
        self.snat_add_address(self.snat_addr)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)
        self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                     src_address=self.pg3.local_ip4n,
                                     path_mtu=512,
                                     template_interval=10,
                                     collector_port=colector_port)
        self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
                             src_port=self.ipfix_src_port)

        pkts = self.create_stream_in(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture)
        self.snat_add_address(self.snat_addr, is_add=0)
        self.vapi.cli("ipfix flush")  # FIXME this should be an API call
        capture = self.pg3.get_capture(3)
        ipfix = IPFIXDecoder()
        # first load template
        for p in capture:
            self.assertTrue(p.haslayer(IPFIX))
            self.assertEqual(p[IP].src, self.pg3.local_ip4)
            self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
            self.assertEqual(p[UDP].sport, self.ipfix_src_port)
            self.assertEqual(p[UDP].dport, colector_port)
            self.assertEqual(p[IPFIX].observationDomainID,
                             self.ipfix_domain_id)
            if p.haslayer(Template):
                ipfix.add_template(p.getlayer(Template))
        # verify events in data set
        for p in capture:
            if p.haslayer(Data):
                data = ipfix.decode_data_set(p.getlayer(Set))
                self.verify_ipfix_nat44_ses(data)

    def test_ipfix_addr_exhausted(self):
        """ S-NAT IPFIX logging NAT addresses exhausted """
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)
        self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                     src_address=self.pg3.local_ip4n,
                                     path_mtu=512,
                                     template_interval=10)
        self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
                             src_port=self.ipfix_src_port)

        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
             TCP(sport=3025))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(0)
        self.vapi.cli("ipfix flush")  # FIXME this should be an API call
        capture = self.pg3.get_capture(3)
        ipfix = IPFIXDecoder()
        # first load template
        for p in capture:
            self.assertTrue(p.haslayer(IPFIX))
            self.assertEqual(p[IP].src, self.pg3.local_ip4)
            self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
            self.assertEqual(p[UDP].sport, self.ipfix_src_port)
            self.assertEqual(p[UDP].dport, 4739)
            self.assertEqual(p[IPFIX].observationDomainID,
                             self.ipfix_domain_id)
            if p.haslayer(Template):
                ipfix.add_template(p.getlayer(Template))
        # verify events in data set
        for p in capture:
            if p.haslayer(Data):
                data = ipfix.decode_data_set(p.getlayer(Set))
                self.verify_ipfix_addr_exhausted(data)

    def test_pool_addr_fib(self):
        """ S-NAT add pool addresses to FIB """
        static_addr = '10.0.0.10'
        self.snat_add_address(self.snat_addr)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)
        self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)

        # SNAT address
        p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
             ARP(op=ARP.who_has, pdst=self.snat_addr,
                 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(1)
        self.assertTrue(capture[0].haslayer(ARP))
        self.assertTrue(capture[0][ARP].op, ARP.is_at)

        # 1:1 NAT address
        p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
             ARP(op=ARP.who_has, pdst=static_addr,
                 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(1)
        self.assertTrue(capture[0].haslayer(ARP))
        self.assertTrue(capture[0][ARP].op, ARP.is_at)

        # send ARP to non-SNAT interface
        p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
             ARP(op=ARP.who_has, pdst=self.snat_addr,
                 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
        self.pg2.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(0)

        # remove addresses and verify
        self.snat_add_address(self.snat_addr, is_add=0)
        self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
                                     is_add=0)

        p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
             ARP(op=ARP.who_has, pdst=self.snat_addr,
                 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(0)

        p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
             ARP(op=ARP.who_has, pdst=static_addr,
                 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(0)

    def test_vrf_mode(self):
        """ S-NAT tenant VRF aware address pool mode """

        vrf_id1 = 1
        vrf_id2 = 2
        nat_ip1 = "10.0.0.10"
        nat_ip2 = "10.0.0.11"

        self.pg0.unconfig_ip4()
        self.pg1.unconfig_ip4()
        self.pg0.set_table_ip4(vrf_id1)
        self.pg1.set_table_ip4(vrf_id2)
        self.pg0.config_ip4()
        self.pg1.config_ip4()

        self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
        self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
                                                 is_inside=0)

        # first VRF
        pkts = self.create_stream_in(self.pg0, self.pg2)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg2.get_capture(len(pkts))
        self.verify_capture_out(capture, nat_ip1)

        # second VRF
        pkts = self.create_stream_in(self.pg1, self.pg2)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg2.get_capture(len(pkts))
        self.verify_capture_out(capture, nat_ip2)

    def test_vrf_feature_independent(self):
        """ S-NAT tenant VRF independent address pool mode """

        nat_ip1 = "10.0.0.10"
        nat_ip2 = "10.0.0.11"

        self.snat_add_address(nat_ip1)
        self.snat_add_address(nat_ip2)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
                                                 is_inside=0)

        # first VRF
        pkts = self.create_stream_in(self.pg0, self.pg2)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg2.get_capture(len(pkts))
        self.verify_capture_out(capture, nat_ip1)

        # second VRF
        pkts = self.create_stream_in(self.pg1, self.pg2)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg2.get_capture(len(pkts))
        self.verify_capture_out(capture, nat_ip1)

    def test_dynamic_ipless_interfaces(self):
        """ SNAT interfaces without configured ip dynamic map """

        self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
                                      self.pg7.remote_mac,
                                      self.pg7.remote_ip4n,
                                      is_static=1)
        self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
                                      self.pg8.remote_mac,
                                      self.pg8.remote_ip4n,
                                      is_static=1)

        self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=self.pg7.remote_ip4n,
                                   next_hop_sw_if_index=self.pg7.sw_if_index)
        self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=self.pg8.remote_ip4n,
                                   next_hop_sw_if_index=self.pg8.sw_if_index)

        self.snat_add_address(self.snat_addr)
        self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
                                                 is_inside=0)

        # in2out
        pkts = self.create_stream_in(self.pg7, self.pg8)
        self.pg7.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg8.get_capture(len(pkts))
        self.verify_capture_out(capture)

        # out2in
        pkts = self.create_stream_out(self.pg8, self.snat_addr)
        self.pg8.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg7.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg7)

    def test_static_ipless_interfaces(self):
        """ SNAT 1:1 NAT interfaces without configured ip """

        self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
                                      self.pg7.remote_mac,
                                      self.pg7.remote_ip4n,
                                      is_static=1)
        self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
                                      self.pg8.remote_mac,
                                      self.pg8.remote_ip4n,
                                      is_static=1)

        self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=self.pg7.remote_ip4n,
                                   next_hop_sw_if_index=self.pg7.sw_if_index)
        self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=self.pg8.remote_ip4n,
                                   next_hop_sw_if_index=self.pg8.sw_if_index)

        self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr)
        self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
                                                 is_inside=0)

        # out2in
        pkts = self.create_stream_out(self.pg8)
        self.pg8.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg7.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg7)

        # in2out
        pkts = self.create_stream_in(self.pg7, self.pg8)
        self.pg7.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg8.get_capture(len(pkts))
        self.verify_capture_out(capture, self.snat_addr, True)

    def test_static_with_port_ipless_interfaces(self):
        """ SNAT 1:1 NAT with port interfaces without configured ip """

        self.tcp_port_out = 30606
        self.udp_port_out = 30607
        self.icmp_id_out = 30608

        self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
                                      self.pg7.remote_mac,
                                      self.pg7.remote_ip4n,
                                      is_static=1)
        self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
                                      self.pg8.remote_mac,
                                      self.pg8.remote_ip4n,
                                      is_static=1)

        self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=self.pg7.remote_ip4n,
                                   next_hop_sw_if_index=self.pg7.sw_if_index)
        self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=self.pg8.remote_ip4n,
                                   next_hop_sw_if_index=self.pg8.sw_if_index)

        self.snat_add_address(self.snat_addr)
        self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
                                     self.tcp_port_in, self.tcp_port_out,
                                     proto=IP_PROTOS.tcp)
        self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
                                     self.udp_port_in, self.udp_port_out,
                                     proto=IP_PROTOS.udp)
        self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
                                     self.icmp_id_in, self.icmp_id_out,
                                     proto=IP_PROTOS.icmp)
        self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
                                                 is_inside=0)

        # out2in
        pkts = self.create_stream_out(self.pg8)
        self.pg8.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg7.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg7)

        # in2out
        pkts = self.create_stream_in(self.pg7, self.pg8)
        self.pg7.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg8.get_capture(len(pkts))
        self.verify_capture_out(capture)

    def test_static_unknown_proto(self):
        """ 1:1 NAT translate packet with unknown protocol """
        nat_ip = "10.0.0.10"
        self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        # in2out
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
             GRE() /
             IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
             TCP(sport=1234, dport=1234))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        p = self.pg1.get_capture(1)
        packet = p[0]
        try:
            self.assertEqual(packet[IP].src, nat_ip)
            self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
            self.assertTrue(packet.haslayer(GRE))
            self.check_ip_checksum(packet)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

        # out2in
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IP(src=self.pg1.remote_ip4, dst=nat_ip) /
             GRE() /
             IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
             TCP(sport=1234, dport=1234))
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        p = self.pg0.get_capture(1)
        packet = p[0]
        try:
            self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
            self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
            self.assertTrue(packet.haslayer(GRE))
            self.check_ip_checksum(packet)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    def test_hairpinning_static_unknown_proto(self):
        """ 1:1 NAT translate packet with unknown protocol - hairpinning """

        host = self.pg0.remote_hosts[0]
        server = self.pg0.remote_hosts[1]

        host_nat_ip = "10.0.0.10"
        server_nat_ip = "10.0.0.11"

        self.snat_add_static_mapping(host.ip4, host_nat_ip)
        self.snat_add_static_mapping(server.ip4, server_nat_ip)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        # host to server
        p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
             IP(src=host.ip4, dst=server_nat_ip) /
             GRE() /
             IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
             TCP(sport=1234, dport=1234))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        p = self.pg0.get_capture(1)
        packet = p[0]
        try:
            self.assertEqual(packet[IP].src, host_nat_ip)
            self.assertEqual(packet[IP].dst, server.ip4)
            self.assertTrue(packet.haslayer(GRE))
            self.check_ip_checksum(packet)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

        # server to host
        p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
             IP(src=server.ip4, dst=host_nat_ip) /
             GRE() /
             IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
             TCP(sport=1234, dport=1234))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        p = self.pg0.get_capture(1)
        packet = p[0]
        try:
            self.assertEqual(packet[IP].src, server_nat_ip)
            self.assertEqual(packet[IP].dst, host.ip4)
            self.assertTrue(packet.haslayer(GRE))
            self.check_ip_checksum(packet)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    def test_unknown_proto(self):
        """ SNAT translate packet with unknown protocol """
        self.snat_add_address(self.snat_addr)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        # in2out
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
             TCP(sport=self.tcp_port_in, dport=20))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        p = self.pg1.get_capture(1)

        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
             GRE() /
             IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
             TCP(sport=1234, dport=1234))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        p = self.pg1.get_capture(1)
        packet = p[0]
        try:
            self.assertEqual(packet[IP].src, self.snat_addr)
            self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
            self.assertTrue(packet.haslayer(GRE))
            self.check_ip_checksum(packet)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

        # out2in
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
             GRE() /
             IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
             TCP(sport=1234, dport=1234))
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        p = self.pg0.get_capture(1)
        packet = p[0]
        try:
            self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
            self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
            self.assertTrue(packet.haslayer(GRE))
            self.check_ip_checksum(packet)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    def test_hairpinning_unknown_proto(self):
        """ SNAT translate packet with unknown protocol - hairpinning """
        host = self.pg0.remote_hosts[0]
        server = self.pg0.remote_hosts[1]
        host_in_port = 1234
        host_out_port = 0
        server_in_port = 5678
        server_out_port = 8765
        server_nat_ip = "10.0.0.11"

        self.snat_add_address(self.snat_addr)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        # add static mapping for server
        self.snat_add_static_mapping(server.ip4, server_nat_ip)

        # host to server
        p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
             IP(src=host.ip4, dst=server_nat_ip) /
             TCP(sport=host_in_port, dport=server_out_port))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(1)

        p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
             IP(src=host.ip4, dst=server_nat_ip) /
             GRE() /
             IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
             TCP(sport=1234, dport=1234))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        p = self.pg0.get_capture(1)
        packet = p[0]
        try:
            self.assertEqual(packet[IP].src, self.snat_addr)
            self.assertEqual(packet[IP].dst, server.ip4)
            self.assertTrue(packet.haslayer(GRE))
            self.check_ip_checksum(packet)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

        # server to host
        p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
             IP(src=server.ip4, dst=self.snat_addr) /
             GRE() /
             IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
             TCP(sport=1234, dport=1234))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        p = self.pg0.get_capture(1)
        packet = p[0]
        try:
            self.assertEqual(packet[IP].src, server_nat_ip)
            self.assertEqual(packet[IP].dst, host.ip4)
            self.assertTrue(packet.haslayer(GRE))
            self.check_ip_checksum(packet)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    def tearDown(self):
        super(TestSNAT, self).tearDown()
        if not self.vpp_dead:
            self.logger.info(self.vapi.cli("show snat verbose"))
            self.clear_snat()


class TestDeterministicNAT(MethodHolder):
    """ Deterministic NAT Test Cases """

    @classmethod
    def setUpConstants(cls):
        super(TestDeterministicNAT, cls).setUpConstants()
        cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])

    @classmethod
    def setUpClass(cls):
        super(TestDeterministicNAT, cls).setUpClass()

        try:
            cls.tcp_port_in = 6303
            cls.tcp_external_port = 6303
            cls.udp_port_in = 6304
            cls.udp_external_port = 6304
            cls.icmp_id_in = 6305
            cls.snat_addr = '10.0.0.3'

            cls.create_pg_interfaces(range(3))
            cls.interfaces = list(cls.pg_interfaces)

            for i in cls.interfaces:
                i.admin_up()
                i.config_ip4()
                i.resolve_arp()

            cls.pg0.generate_remote_hosts(2)
            cls.pg0.configure_ipv4_neighbors()

        except Exception:
            super(TestDeterministicNAT, cls).tearDownClass()
            raise

    def create_stream_in(self, in_if, out_if, ttl=64):
        """
        Create packet stream for inside network

        :param in_if: Inside interface
        :param out_if: Outside interface
        :param ttl: TTL of generated packets
        """
        pkts = []
        # TCP
        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
             TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
        pkts.append(p)

        # UDP
        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
             UDP(sport=self.udp_port_in, dport=self.udp_external_port))
        pkts.append(p)

        # ICMP
        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
             ICMP(id=self.icmp_id_in, type='echo-request'))
        pkts.append(p)

        return pkts

    def create_stream_out(self, out_if, dst_ip=None, ttl=64):
        """
        Create packet stream for outside network

        :param out_if: Outside interface
        :param dst_ip: Destination IP address (Default use global SNAT address)
        :param ttl: TTL of generated packets
        """
        if dst_ip is None:
            dst_ip = self.snat_addr
        pkts = []
        # TCP
        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
             IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
             TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
        pkts.append(p)

        # UDP
        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
             IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
             UDP(dport=self.udp_port_out, sport=self.udp_external_port))
        pkts.append(p)

        # ICMP
        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
             IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
             ICMP(id=self.icmp_external_id, type='echo-reply'))
        pkts.append(p)

        return pkts

    def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
        """
        Verify captured packets on outside network

        :param capture: Captured packets
        :param nat_ip: Translated IP address (Default use global SNAT address)
        :param same_port: Sorce port number is not translated (Default False)
        :param packet_num: Expected number of packets (Default 3)
        """
        if nat_ip is None:
            nat_ip = self.snat_addr
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, nat_ip)
                if packet.haslayer(TCP):
                    self.tcp_port_out = packet[TCP].sport
                elif packet.haslayer(UDP):
                    self.udp_port_out = packet[UDP].sport
                else:
                    self.icmp_external_id = packet[ICMP].id
            except:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise

    def initiate_tcp_session(self, in_if, out_if):
        """
        Initiates TCP session

        :param in_if: Inside interface
        :param out_if: Outside interface
        """
        try:
            # SYN packet in->out
            p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
                 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                     flags="S"))
            in_if.add_stream(p)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            capture = out_if.get_capture(1)
            p = capture[0]
            self.tcp_port_out = p[TCP].sport

            # SYN + ACK packet out->in
            p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
                 IP(src=out_if.remote_ip4, dst=self.snat_addr) /
                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                     flags="SA"))
            out_if.add_stream(p)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            in_if.get_capture(1)

            # ACK packet in->out
            p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
                 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                     flags="A"))
            in_if.add_stream(p)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            out_if.get_capture(1)

        except:
            self.logger.error("TCP 3 way handshake failed")
            raise

    def verify_ipfix_max_entries_per_user(self, data):
        """
        Verify IPFIX maximum entries per user exceeded event

        :param data: Decoded IPFIX data records
        """
        self.assertEqual(1, len(data))
        record = data[0]
        # natEvent
        self.assertEqual(ord(record[230]), 13)
        # natQuotaExceededEvent
        self.assertEqual('\x03\x00\x00\x00', record[466])
        # sourceIPv4Address
        self.assertEqual(self.pg0.remote_ip4n, record[8])

    def test_deterministic_mode(self):
        """ S-NAT run deterministic mode """
        in_addr = '172.16.255.0'
        out_addr = '172.17.255.50'
        in_addr_t = '172.16.255.20'
        in_addr_n = socket.inet_aton(in_addr)
        out_addr_n = socket.inet_aton(out_addr)
        in_addr_t_n = socket.inet_aton(in_addr_t)
        in_plen = 24
        out_plen = 32

        snat_config = self.vapi.snat_show_config()
        self.assertEqual(1, snat_config.deterministic)

        self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)

        rep1 = self.vapi.snat_det_forward(in_addr_t_n)
        self.assertEqual(rep1.out_addr[:4], out_addr_n)
        rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
        self.assertEqual(rep2.in_addr[:4], in_addr_t_n)

        deterministic_mappings = self.vapi.snat_det_map_dump()
        self.assertEqual(len(deterministic_mappings), 1)
        dsm = deterministic_mappings[0]
        self.assertEqual(in_addr_n, dsm.in_addr[:4])
        self.assertEqual(in_plen, dsm.in_plen)
        self.assertEqual(out_addr_n, dsm.out_addr[:4])
        self.assertEqual(out_plen, dsm.out_plen)

        self.clear_snat()
        deterministic_mappings = self.vapi.snat_det_map_dump()
        self.assertEqual(len(deterministic_mappings), 0)

    def test_set_timeouts(self):
        """ Set deterministic NAT timeouts """
        timeouts_before = self.vapi.snat_det_get_timeouts()

        self.vapi.snat_det_set_timeouts(timeouts_before.udp + 10,
                                        timeouts_before.tcp_established + 10,
                                        timeouts_before.tcp_transitory + 10,
                                        timeouts_before.icmp + 10)

        timeouts_after = self.vapi.snat_det_get_timeouts()

        self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
        self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
        self.assertNotEqual(timeouts_before.tcp_established,
                            timeouts_after.tcp_established)
        self.assertNotEqual(timeouts_before.tcp_transitory,
                            timeouts_after.tcp_transitory)

    def test_det_in(self):
        """ CGNAT translation test (TCP, UDP, ICMP) """

        nat_ip = "10.0.0.10"

        self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
                                   32,
                                   socket.inet_aton(nat_ip),
                                   32)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        # in2out
        pkts = self.create_stream_in(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture, nat_ip)

        # out2in
        pkts = self.create_stream_out(self.pg1, nat_ip)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg0)

        # session dump test
        sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n)
        self.assertEqual(len(sessions), 3)

        # TCP session
        s = sessions[0]
        self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
        self.assertEqual(s.in_port, self.tcp_port_in)
        self.assertEqual(s.out_port, self.tcp_port_out)
        self.assertEqual(s.ext_port, self.tcp_external_port)

        # UDP session
        s = sessions[1]
        self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
        self.assertEqual(s.in_port, self.udp_port_in)
        self.assertEqual(s.out_port, self.udp_port_out)
        self.assertEqual(s.ext_port, self.udp_external_port)

        # ICMP session
        s = sessions[2]
        self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
        self.assertEqual(s.in_port, self.icmp_id_in)
        self.assertEqual(s.out_port, self.icmp_external_id)

    def test_multiple_users(self):
        """ CGNAT multiple users """

        nat_ip = "10.0.0.10"
        port_in = 80
        external_port = 6303

        host0 = self.pg0.remote_hosts[0]
        host1 = self.pg0.remote_hosts[1]

        self.vapi.snat_add_det_map(host0.ip4n,
                                   24,
                                   socket.inet_aton(nat_ip),
                                   32)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        # host0 to out
        p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
             IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
             TCP(sport=port_in, dport=external_port))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(1)
        p = capture[0]
        try:
            ip = p[IP]
            tcp = p[TCP]
            self.assertEqual(ip.src, nat_ip)
            self.assertEqual(ip.dst, self.pg1.remote_ip4)
            self.assertEqual(tcp.dport, external_port)
            port_out0 = tcp.sport
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", p))
            raise

        # host1 to out
        p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
             IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
             TCP(sport=port_in, dport=external_port))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(1)
        p = capture[0]
        try:
            ip = p[IP]
            tcp = p[TCP]
            self.assertEqual(ip.src, nat_ip)
            self.assertEqual(ip.dst, self.pg1.remote_ip4)
            self.assertEqual(tcp.dport, external_port)
            port_out1 = tcp.sport
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", p))
            raise

        dms = self.vapi.snat_det_map_dump()
        self.assertEqual(1, len(dms))
        self.assertEqual(2, dms[0].ses_num)

        # out to host0
        p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
             IP(src=self.pg1.remote_ip4, dst=nat_ip) /
             TCP(sport=external_port, dport=port_out0))
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(1)
        p = capture[0]
        try:
            ip = p[IP]
            tcp = p[TCP]
            self.assertEqual(ip.src, self.pg1.remote_ip4)
            self.assertEqual(ip.dst, host0.ip4)
            self.assertEqual(tcp.dport, port_in)
            self.assertEqual(tcp.sport, external_port)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", p))
            raise

        # out to host1
        p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
             IP(src=self.pg1.remote_ip4, dst=nat_ip) /
             TCP(sport=external_port, dport=port_out1))
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(1)
        p = capture[0]
        try:
            ip = p[IP]
            tcp = p[TCP]
            self.assertEqual(ip.src, self.pg1.remote_ip4)
            self.assertEqual(ip.dst, host1.ip4)
            self.assertEqual(tcp.dport, port_in)
            self.assertEqual(tcp.sport, external_port)
        except:
            self.logger.error(ppp("Unexpected or invalid packet", p))
            raise

        # session close api test
        self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip),
                                             port_out1,
                                             self.pg1.remote_ip4n,
                                             external_port)
        dms = self.vapi.snat_det_map_dump()
        self.assertEqual(dms[0].ses_num, 1)

        self.vapi.snat_det_close_session_in(host0.ip4n,
                                            port_in,
                                            self.pg1.remote_ip4n,
                                            external_port)
        dms = self.vapi.snat_det_map_dump()
        self.assertEqual(dms[0].ses_num, 0)

    def test_tcp_session_close_detection_in(self):
        """ CGNAT TCP session close initiated from inside network """
        self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
                                   32,
                                   socket.inet_aton(self.snat_addr),
                                   32)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        self.initiate_tcp_session(self.pg0, self.pg1)

        # close the session from inside
        try:
            # FIN packet in -> out
            p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                     flags="F"))
            self.pg0.add_stream(p)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            self.pg1.get_capture(1)

            pkts = []

            # ACK packet out -> in
            p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                     flags="A"))
            pkts.append(p)

            # FIN packet out -> in
            p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                     flags="F"))
            pkts.append(p)

            self.pg1.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            self.pg0.get_capture(2)

            # ACK packet in -> out
            p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                     flags="A"))
            self.pg0.add_stream(p)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            self.pg1.get_capture(1)

            # Check if snat closed the session
            dms = self.vapi.snat_det_map_dump()
            self.assertEqual(0, dms[0].ses_num)
        except:
            self.logger.error("TCP session termination failed")
            raise

    def test_tcp_session_close_detection_out(self):
        """ CGNAT TCP session close initiated from outside network """
        self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
                                   32,
                                   socket.inet_aton(self.snat_addr),
                                   32)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        self.initiate_tcp_session(self.pg0, self.pg1)

        # close the session from outside
        try:
            # FIN packet out -> in
            p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                     flags="F"))
            self.pg1.add_stream(p)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            self.pg0.get_capture(1)

            pkts = []

            # ACK packet in -> out
            p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                     flags="A"))
            pkts.append(p)

            # ACK packet in -> out
            p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                     flags="F"))
            pkts.append(p)

            self.pg0.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            self.pg1.get_capture(2)

            # ACK packet out -> in
            p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                     flags="A"))
            self.pg1.add_stream(p)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            self.pg0.get_capture(1)

            # Check if snat closed the session
            dms = self.vapi.snat_det_map_dump()
            self.assertEqual(0, dms[0].ses_num)
        except:
            self.logger.error("TCP session termination failed")
            raise

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_session_timeout(self):
        """ CGNAT session timeouts """
        self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
                                   32,
                                   socket.inet_aton(self.snat_addr),
                                   32)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        self.initiate_tcp_session(self.pg0, self.pg1)
        self.vapi.snat_det_set_timeouts(5, 5, 5, 5)
        pkts = self.create_stream_in(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        sleep(15)

        dms = self.vapi.snat_det_map_dump()
        self.assertEqual(0, dms[0].ses_num)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_session_limit_per_user(self):
        """ CGNAT maximum 1000 sessions per user should be created """
        self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
                                   32,
                                   socket.inet_aton(self.snat_addr),
                                   32)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)
        self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
                                     src_address=self.pg2.local_ip4n,
                                     path_mtu=512,
                                     template_interval=10)
        self.vapi.snat_ipfix()

        pkts = []
        for port in range(1025, 2025):
            p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                 UDP(sport=port, dport=port))
            pkts.append(p)

        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))

        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
             UDP(sport=3001, dport=3002))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.assert_nothing_captured()

        # verify ICMP error packet
        capture = self.pg0.get_capture(1)
        p = capture[0]
        self.assertTrue(p.haslayer(ICMP))
        icmp = p[ICMP]
        self.assertEqual(icmp.type, 3)
        self.assertEqual(icmp.code, 1)
        self.assertTrue(icmp.haslayer(IPerror))
        inner_ip = icmp[IPerror]
        self.assertEqual(inner_ip[UDPerror].sport, 3001)
        self.assertEqual(inner_ip[UDPerror].dport, 3002)

        dms = self.vapi.snat_det_map_dump()

        self.assertEqual(1000, dms[0].ses_num)

        # verify IPFIX logging
        self.vapi.cli("ipfix flush")  # FIXME this should be an API call
        sleep(1)
        capture = self.pg2.get_capture(2)
        ipfix = IPFIXDecoder()
        # first load template
        for p in capture:
            self.assertTrue(p.haslayer(IPFIX))
            if p.haslayer(Template):
                ipfix.add_template(p.getlayer(Template))
        # verify events in data set
        for p in capture:
            if p.haslayer(Data):
                data = ipfix.decode_data_set(p.getlayer(Set))
                self.verify_ipfix_max_entries_per_user(data)

    def clear_snat(self):
        """
        Clear SNAT configuration.
        """
        self.vapi.snat_ipfix(enable=0)
        self.vapi.snat_det_set_timeouts()
        deterministic_mappings = self.vapi.snat_det_map_dump()
        for dsm in deterministic_mappings:
            self.vapi.snat_add_det_map(dsm.in_addr,
                                       dsm.in_plen,
                                       dsm.out_addr,
                                       dsm.out_plen,
                                       is_add=0)

        interfaces = self.vapi.snat_interface_dump()
        for intf in interfaces:
            self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
                                                     intf.is_inside,
                                                     is_add=0)

    def tearDown(self):
        super(TestDeterministicNAT, self).tearDown()
        if not self.vpp_dead:
            self.logger.info(self.vapi.cli("show snat detail"))
            self.clear_snat()


class TestNAT64(MethodHolder):
    """ NAT64 Test Cases """

    @classmethod
    def setUpClass(cls):
        super(TestNAT64, cls).setUpClass()

        try:
            cls.tcp_port_in = 6303
            cls.tcp_port_out = 6303
            cls.udp_port_in = 6304
            cls.udp_port_out = 6304
            cls.icmp_id_in = 6305
            cls.icmp_id_out = 6305
            cls.nat_addr = '10.0.0.3'
            cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
            cls.vrf1_id = 10
            cls.vrf1_nat_addr = '10.0.10.3'
            cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
                                                   cls.vrf1_nat_addr)

            cls.create_pg_interfaces(range(3))
            cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
            cls.ip6_interfaces.append(cls.pg_interfaces[2])
            cls.ip4_interfaces = list(cls.pg_interfaces[1:2])

            cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)

            cls.pg0.generate_remote_hosts(2)

            for i in cls.ip6_interfaces:
                i.admin_up()
                i.config_ip6()
                i.configure_ipv6_neighbors()

            for i in cls.ip4_interfaces:
                i.admin_up()
                i.config_ip4()
                i.resolve_arp()

        except Exception:
            super(TestNAT64, cls).tearDownClass()
            raise

    def test_pool(self):
        """ Add/delete address to NAT64 pool """
        nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')

        self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)

        addresses = self.vapi.nat64_pool_addr_dump()
        self.assertEqual(len(addresses), 1)
        self.assertEqual(addresses[0].address, nat_addr)

        self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)

        addresses = self.vapi.nat64_pool_addr_dump()
        self.assertEqual(len(addresses), 0)

    def test_interface(self):
        """ Enable/disable NAT64 feature on the interface """
        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

        interfaces = self.vapi.nat64_interface_dump()
        self.assertEqual(len(interfaces), 2)
        pg0_found = False
        pg1_found = False
        for intf in interfaces:
            if intf.sw_if_index == self.pg0.sw_if_index:
                self.assertEqual(intf.is_inside, 1)
                pg0_found = True
            elif intf.sw_if_index == self.pg1.sw_if_index:
                self.assertEqual(intf.is_inside, 0)
                pg1_found = True
        self.assertTrue(pg0_found)
        self.assertTrue(pg1_found)

        features = self.vapi.cli("show interface features pg0")
        self.assertNotEqual(features.find('nat64-in2out'), -1)
        features = self.vapi.cli("show interface features pg1")
        self.assertNotEqual(features.find('nat64-out2in'), -1)

        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)

        interfaces = self.vapi.nat64_interface_dump()
        self.assertEqual(len(interfaces), 0)

    def test_static_bib(self):
        """ Add/delete static BIB entry """
        in_addr = socket.inet_pton(socket.AF_INET6,
                                   '2001:db8:85a3::8a2e:370:7334')
        out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
        in_port = 1234
        out_port = 5678
        proto = IP_PROTOS.tcp

        self.vapi.nat64_add_del_static_bib(in_addr,
                                           out_addr,
                                           in_port,
                                           out_port,
                                           proto)
        bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
        static_bib_num = 0
        for bibe in bib:
            if bibe.is_static:
                static_bib_num += 1
                self.assertEqual(bibe.i_addr, in_addr)
                self.assertEqual(bibe.o_addr, out_addr)
                self.assertEqual(bibe.i_port, in_port)
                self.assertEqual(bibe.o_port, out_port)
        self.assertEqual(static_bib_num, 1)

        self.vapi.nat64_add_del_static_bib(in_addr,
                                           out_addr,
                                           in_port,
                                           out_port,
                                           proto,
                                           is_add=0)
        bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
        static_bib_num = 0
        for bibe in bib:
            if bibe.is_static:
                static_bib_num += 1
        self.assertEqual(static_bib_num, 0)

    def test_set_timeouts(self):
        """ Set NAT64 timeouts """
        # verify default values
        timeouts = self.vapi.nat64_get_timeouts()
        self.assertEqual(timeouts.udp, 300)
        self.assertEqual(timeouts.icmp, 60)
        self.assertEqual(timeouts.tcp_trans, 240)
        self.assertEqual(timeouts.tcp_est, 7440)
        self.assertEqual(timeouts.tcp_incoming_syn, 6)

        # set and verify custom values
        self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
                                     tcp_est=7450, tcp_incoming_syn=10)
        timeouts = self.vapi.nat64_get_timeouts()
        self.assertEqual(timeouts.udp, 200)
        self.assertEqual(timeouts.icmp, 30)
        self.assertEqual(timeouts.tcp_trans, 250)
        self.assertEqual(timeouts.tcp_est, 7450)
        self.assertEqual(timeouts.tcp_incoming_syn, 10)

    def test_dynamic(self):
        """ NAT64 dynamic translation test """
        self.tcp_port_in = 6303
        self.udp_port_in = 6304
        self.icmp_id_in = 6305

        ses_num_start = self.nat64_get_ses_num()

        self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                                self.nat_addr_n)
        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

        # in2out
        pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture, nat_ip=self.nat_addr,
                                dst_ip=self.pg1.remote_ip4)

        # out2in
        pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
        self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)

        # in2out
        pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture, nat_ip=self.nat_addr,
                                dst_ip=self.pg1.remote_ip4)

        # out2in
        pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)

        ses_num_end = self.nat64_get_ses_num()

        self.assertEqual(ses_num_end - ses_num_start, 3)

        # tenant with specific VRF
        self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
                                                self.vrf1_nat_addr_n,
                                                vrf_id=self.vrf1_id)
        self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)

        pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
        self.pg2.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
                                dst_ip=self.pg1.remote_ip4)

        pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg2.get_capture(len(pkts))
        self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)

    def test_static(self):
        """ NAT64 static translation test """
        self.tcp_port_in = 60303
        self.udp_port_in = 60304
        self.icmp_id_in = 60305
        self.tcp_port_out = 60303
        self.udp_port_out = 60304
        self.icmp_id_out = 60305

        ses_num_start = self.nat64_get_ses_num()

        self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                                self.nat_addr_n)
        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

        self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
                                           self.nat_addr_n,
                                           self.tcp_port_in,
                                           self.tcp_port_out,
                                           IP_PROTOS.tcp)
        self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
                                           self.nat_addr_n,
                                           self.udp_port_in,
                                           self.udp_port_out,
                                           IP_PROTOS.udp)
        self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
                                           self.nat_addr_n,
                                           self.icmp_id_in,
                                           self.icmp_id_out,
                                           IP_PROTOS.icmp)

        # in2out
        pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture, nat_ip=self.nat_addr,
                                dst_ip=self.pg1.remote_ip4, same_port=True)

        # out2in
        pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
        self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)

        ses_num_end = self.nat64_get_ses_num()

        self.assertEqual(ses_num_end - ses_num_start, 3)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_session_timeout(self):
        """ NAT64 session timeout """
        self.icmp_id_in = 1234
        self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                                self.nat_addr_n)
        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
        self.vapi.nat64_set_timeouts(icmp=5)

        pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))

        ses_num_before_timeout = self.nat64_get_ses_num()

        sleep(15)

        # ICMP session after timeout
        ses_num_after_timeout = self.nat64_get_ses_num()
        self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)

    def test_icmp_error(self):
        """ NAT64 ICMP Error message translation """
        self.tcp_port_in = 6303
        self.udp_port_in = 6304
        self.icmp_id_in = 6305

        ses_num_start = self.nat64_get_ses_num()

        self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                                self.nat_addr_n)
        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

        # send some packets to create sessions
        pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture_ip4 = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture_ip4,
                                nat_ip=self.nat_addr,
                                dst_ip=self.pg1.remote_ip4)

        pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture_ip6 = self.pg0.get_capture(len(pkts))
        ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
        self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
                                   self.pg0.remote_ip6)

        # in2out
        pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
                ICMPv6DestUnreach(code=1) /
                packet[IPv6] for packet in capture_ip6]
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, self.nat_addr)
                self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
                self.assertEqual(packet[ICMP].type, 3)
                self.assertEqual(packet[ICMP].code, 13)
                inner = packet[IPerror]
                self.assertEqual(inner.src, self.pg1.remote_ip4)
                self.assertEqual(inner.dst, self.nat_addr)
                self.check_icmp_checksum(packet)
                if inner.haslayer(TCPerror):
                    self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
                elif inner.haslayer(UDPerror):
                    self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
                else:
                    self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
            except:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

        # out2in
        pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
                IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
                ICMP(type=3, code=13) /
                packet[IP] for packet in capture_ip4]
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        for packet in capture:
            try:
                self.assertEqual(packet[IPv6].src, ip.src)
                self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
                icmp = packet[ICMPv6DestUnreach]
                self.assertEqual(icmp.code, 1)
                inner = icmp[IPerror6]
                self.assertEqual(inner.src, self.pg0.remote_ip6)
                self.assertEqual(inner.dst, ip.src)
                self.check_icmpv6_checksum(packet)
                if inner.haslayer(TCPerror):
                    self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
                elif inner.haslayer(UDPerror):
                    self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
                else:
                    self.assertEqual(inner[ICMPv6EchoRequest].id,
                                     self.icmp_id_in)
            except:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

    def test_hairpinning(self):
        """ NAT64 hairpinning """

        client = self.pg0.remote_hosts[0]
        server = self.pg0.remote_hosts[1]
        server_tcp_in_port = 22
        server_tcp_out_port = 4022
        server_udp_in_port = 23
        server_udp_out_port = 4023
        client_tcp_in_port = 1234
        client_udp_in_port = 1235
        client_tcp_out_port = 0
        client_udp_out_port = 0
        ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
        nat_addr_ip6 = ip.src

        self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                                self.nat_addr_n)
        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

        self.vapi.nat64_add_del_static_bib(server.ip6n,
                                           self.nat_addr_n,
                                           server_tcp_in_port,
                                           server_tcp_out_port,
                                           IP_PROTOS.tcp)
        self.vapi.nat64_add_del_static_bib(server.ip6n,
                                           self.nat_addr_n,
                                           server_udp_in_port,
                                           server_udp_out_port,
                                           IP_PROTOS.udp)

        # client to server
        pkts = []
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IPv6(src=client.ip6, dst=nat_addr_ip6) /
             TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
        pkts.append(p)
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IPv6(src=client.ip6, dst=nat_addr_ip6) /
             UDP(sport=client_udp_in_port, dport=server_udp_out_port))
        pkts.append(p)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        for packet in capture:
            try:
                self.assertEqual(packet[IPv6].src, nat_addr_ip6)
                self.assertEqual(packet[IPv6].dst, server.ip6)
                if packet.haslayer(TCP):
                    self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
                    self.assertEqual(packet[TCP].dport, server_tcp_in_port)
                    self.check_tcp_checksum(packet)
                    client_tcp_out_port = packet[TCP].sport
                else:
                    self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
                    self.assertEqual(packet[UDP].dport, server_udp_in_port)
                    self.check_udp_checksum(packet)
                    client_udp_out_port = packet[UDP].sport
            except:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

        # server to client
        pkts = []
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IPv6(src=server.ip6, dst=nat_addr_ip6) /
             TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
        pkts.append(p)
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IPv6(src=server.ip6, dst=nat_addr_ip6) /
             UDP(sport=server_udp_in_port, dport=client_udp_out_port))
        pkts.append(p)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        for packet in capture:
            try:
                self.assertEqual(packet[IPv6].src, nat_addr_ip6)
                self.assertEqual(packet[IPv6].dst, client.ip6)
                if packet.haslayer(TCP):
                    self.assertEqual(packet[TCP].sport, server_tcp_out_port)
                    self.assertEqual(packet[TCP].dport, client_tcp_in_port)
                    self.check_tcp_checksum(packet)
                else:
                    self.assertEqual(packet[UDP].sport, server_udp_out_port)
                    self.assertEqual(packet[UDP].dport, client_udp_in_port)
                    self.check_udp_checksum(packet)
            except:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

        # ICMP error
        pkts = []
        pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                IPv6(src=client.ip6, dst=nat_addr_ip6) /
                ICMPv6DestUnreach(code=1) /
                packet[IPv6] for packet in capture]
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        for packet in capture:
            try:
                self.assertEqual(packet[IPv6].src, nat_addr_ip6)
                self.assertEqual(packet[IPv6].dst, server.ip6)
                icmp = packet[ICMPv6DestUnreach]
                self.assertEqual(icmp.code, 1)
                inner = icmp[IPerror6]
                self.assertEqual(inner.src, server.ip6)
                self.assertEqual(inner.dst, nat_addr_ip6)
                self.check_icmpv6_checksum(packet)
                if inner.haslayer(TCPerror):
                    self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
                    self.assertEqual(inner[TCPerror].dport,
                                     client_tcp_out_port)
                else:
                    self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
                    self.assertEqual(inner[UDPerror].dport,
                                     client_udp_out_port)
            except:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

    def test_prefix(self):
        """ NAT64 Network-Specific Prefix """

        self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                                self.nat_addr_n)
        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
        self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
                                                self.vrf1_nat_addr_n,
                                                vrf_id=self.vrf1_id)
        self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)

        # Add global prefix
        global_pref64 = "2001:db8::"
        global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
        global_pref64_len = 32
        self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)

        prefix = self.vapi.nat64_prefix_dump()
        self.assertEqual(len(prefix), 1)
        self.assertEqual(prefix[0].prefix, global_pref64_n)
        self.assertEqual(prefix[0].prefix_len, global_pref64_len)
        self.assertEqual(prefix[0].vrf_id, 0)

        # Add tenant specific prefix
        vrf1_pref64 = "2001:db8:122:300::"
        vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
        vrf1_pref64_len = 56
        self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
                                       vrf1_pref64_len,
                                       vrf_id=self.vrf1_id)
        prefix = self.vapi.nat64_prefix_dump()
        self.assertEqual(len(prefix), 2)

        # Global prefix
        pkts = self.create_stream_in_ip6(self.pg0,
                                         self.pg1,
                                         pref=global_pref64,
                                         plen=global_pref64_len)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture, nat_ip=self.nat_addr,
                                dst_ip=self.pg1.remote_ip4)

        pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        dst_ip = self.compose_ip6(self.pg1.remote_ip4,
                                  global_pref64,
                                  global_pref64_len)
        self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)

        # Tenant specific prefix
        pkts = self.create_stream_in_ip6(self.pg2,
                                         self.pg1,
                                         pref=vrf1_pref64,
                                         plen=vrf1_pref64_len)
        self.pg2.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
                                dst_ip=self.pg1.remote_ip4)

        pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg2.get_capture(len(pkts))
        dst_ip = self.compose_ip6(self.pg1.remote_ip4,
                                  vrf1_pref64,
                                  vrf1_pref64_len)
        self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)

    def _test_unknown_proto(self):
        """ NAT64 translate packet with unknown protocol """

        self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                                self.nat_addr_n)
        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
        remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)

        # in2out
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
             TCP(sport=self.tcp_port_in, dport=20))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        p = self.pg1.get_capture(1)

        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
             GRE() /
             IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
             TCP(sport=1234, dport=1234))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        p = self.pg1.get_capture(1)
        packet = p[0]
        try:
            self.assertEqual(packet[IP].src, self.nat_addr)
            self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
            self.assertTrue(packet.haslayer(GRE))
            self.check_ip_checksum(packet)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

        # out2in
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
             GRE() /
             IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
             TCP(sport=1234, dport=1234))
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        p = self.pg0.get_capture(1)
        packet = p[0]
        try:
            self.assertEqual(packet[IPv6].src, remote_ip6)
            self.assertEqual(packet[IPv6].dst, self.pgi0.remote_ip6)
            self.assertTrue(packet.haslayer(GRE))
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    def _test_hairpinning_unknown_proto(self):
        """ NAT64 translate packet with unknown protocol - hairpinning """

        client = self.pg0.remote_hosts[0]
        server = self.pg0.remote_hosts[1]
        server_tcp_in_port = 22
        server_tcp_out_port = 4022
        client_tcp_in_port = 1234
        client_udp_in_port = 1235
        nat_addr_ip6 = self.compose_ip6(self.nat_addr, '64:ff9b::', 96)

        self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                                self.nat_addr_n)
        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

        self.vapi.nat64_add_del_static_bib(server.ip6n,
                                           self.nat_addr_n,
                                           server_tcp_in_port,
                                           server_tcp_out_port,
                                           IP_PROTOS.tcp)

        # client to server
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IPv6(src=client.ip6, dst=nat_addr_ip6) /
             TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        p = self.pg0.get_capture(1)

        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IPv6(src=client.ip6, dst=nat_addr_ip6) /
             GRE() /
             IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
             TCP(sport=1234, dport=1234))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        p = self.pg0.get_capture(1)
        packet = p[0]
        try:
            self.assertEqual(packet[IPv6].src, nat_addr_ip6)
            self.assertEqual(packet[IPv6].dst, server.ip6)
            self.assertTrue(packet.haslayer(GRE))
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

        # server to client
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IPv6(src=server.ip6, dst=nat_addr_ip6) /
             GRE() /
             IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
             TCP(sport=1234, dport=1234))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        p = self.pg0.get_capture(1)
        packet = p[0]
        try:
            self.assertEqual(packet[IPv6].src, nat_addr_ip6)
            self.assertEqual(packet[IPv6].dst, client.ip6)
            self.assertTrue(packet.haslayer(GRE))
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    def nat64_get_ses_num(self):
        """
        Return number of active NAT64 sessions.
        """
        ses_num = 0
        st = self.vapi.nat64_st_dump(IP_PROTOS.tcp)
        ses_num += len(st)
        st = self.vapi.nat64_st_dump(IP_PROTOS.udp)
        ses_num += len(st)
        st = self.vapi.nat64_st_dump(IP_PROTOS.icmp)
        ses_num += len(st)
        return ses_num

    def clear_nat64(self):
        """
        Clear NAT64 configuration.
        """
        self.vapi.nat64_set_timeouts()

        interfaces = self.vapi.nat64_interface_dump()
        for intf in interfaces:
            self.vapi.nat64_add_del_interface(intf.sw_if_index,
                                              intf.is_inside,
                                              is_add=0)

        bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
        for bibe in bib:
            if bibe.is_static:
                self.vapi.nat64_add_del_static_bib(bibe.i_addr,
                                                   bibe.o_addr,
                                                   bibe.i_port,
                                                   bibe.o_port,
                                                   bibe.proto,
                                                   bibe.vrf_id,
                                                   is_add=0)

        bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
        for bibe in bib:
            if bibe.is_static:
                self.vapi.nat64_add_del_static_bib(bibe.i_addr,
                                                   bibe.o_addr,
                                                   bibe.i_port,
                                                   bibe.o_port,
                                                   bibe.proto,
                                                   bibe.vrf_id,
                                                   is_add=0)

        bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
        for bibe in bib:
            if bibe.is_static:
                self.vapi.nat64_add_del_static_bib(bibe.i_addr,
                                                   bibe.o_addr,
                                                   bibe.i_port,
                                                   bibe.o_port,
                                                   bibe.proto,
                                                   bibe.vrf_id,
                                                   is_add=0)

        adresses = self.vapi.nat64_pool_addr_dump()
        for addr in adresses:
            self.vapi.nat64_add_del_pool_addr_range(addr.address,
                                                    addr.address,
                                                    vrf_id=addr.vrf_id,
                                                    is_add=0)

        prefixes = self.vapi.nat64_prefix_dump()
        for prefix in prefixes:
            self.vapi.nat64_add_del_prefix(prefix.prefix,
                                           prefix.prefix_len,
                                           vrf_id=prefix.vrf_id,
                                           is_add=0)

    def tearDown(self):
        super(TestNAT64, self).tearDown()
        if not self.vpp_dead:
            self.logger.info(self.vapi.cli("show nat64 pool"))
            self.logger.info(self.vapi.cli("show nat64 interfaces"))
            self.logger.info(self.vapi.cli("show nat64 prefix"))
            self.logger.info(self.vapi.cli("show nat64 bib tcp"))
            self.logger.info(self.vapi.cli("show nat64 bib udp"))
            self.logger.info(self.vapi.cli("show nat64 bib icmp"))
            self.logger.info(self.vapi.cli("show nat64 session table tcp"))
            self.logger.info(self.vapi.cli("show nat64 session table udp"))
            self.logger.info(self.vapi.cli("show nat64 session table icmp"))
            self.clear_nat64()

if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)