diff options
author | Maxime Peim <mpeim@cisco.com> | 2022-12-22 11:26:57 +0000 |
---|---|---|
committer | Beno�t Ganne <bganne@cisco.com> | 2023-10-30 15:23:13 +0000 |
commit | 0e2f188f7c9872d7c946c14d785c6dc7c7c68847 (patch) | |
tree | 1adc39db5e2e0e243811c8ce001d0bd056c0402e /test | |
parent | 21922cec7339f48989f230248de36a98816c4b1b (diff) |
ipsec: huge anti-replay window support
Type: improvement
Since RFC4303 does not specify the anti-replay window size, VPP should
support multiple window size. It is done through a clib_bitmap.
Signed-off-by: Maxime Peim <mpeim@cisco.com>
Change-Id: I3dfe30efd20018e345418bef298ec7cec19b1cfc
Diffstat (limited to 'test')
-rw-r--r-- | test/template_ipsec.py | 823 | ||||
-rw-r--r-- | test/test_ipsec_api.py | 2 | ||||
-rw-r--r-- | test/test_ipsec_esp.py | 22 | ||||
-rw-r--r-- | test/vpp_ipsec.py | 7 |
4 files changed, 827 insertions, 27 deletions
diff --git a/test/template_ipsec.py b/test/template_ipsec.py index 5d835104ec4..c438fbb7680 100644 --- a/test/template_ipsec.py +++ b/test/template_ipsec.py @@ -51,6 +51,8 @@ class IPsecIPv4Params: self.outer_flow_label = 0 self.inner_flow_label = 0x12345 + self.anti_replay_window_size = 64 + self.auth_algo_vpp_id = ( VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96 ) @@ -98,6 +100,8 @@ class IPsecIPv6Params: self.outer_flow_label = 0 self.inner_flow_label = 0x12345 + self.anti_replay_window_size = 64 + self.auth_algo_vpp_id = ( VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96 ) @@ -625,22 +629,34 @@ class IpsecTra4(object): def verify_tra_anti_replay(self): p = self.params[socket.AF_INET] esn_en = p.vpp_tra_sa.esn_en + anti_replay_window_size = p.anti_replay_window_size seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name replay_count = self.get_replay_counts(p) + initial_sa_node_replay_diff = replay_count - p.tra_sa_in.get_err("replay") hash_failed_count = self.get_hash_failed_counts(p) seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name) + initial_sa_node_cycled_diff = seq_cycle_count - p.tra_sa_in.get_err( + "seq_cycled" + ) hash_err = "integ_error" if ESP == self.encryption_type: undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0] undersize_count = self.statistics.get_err_counter(undersize_node_name) + initial_sa_node_undersize_diff = undersize_count - p.tra_sa_in.get_err( + "runt" + ) # For AES-GCM an error in the hash is reported as a decryption failure if p.crypt_algo in ("AES-GCM", "AES-NULL-GMAC"): hash_err = "decryption_failed" # In async mode, we don't report errors in the hash. if p.async_mode: hash_err = "" + else: + initial_sa_node_hash_diff = hash_failed_count - p.tra_sa_in.get_err( + hash_err + ) # # send packets with seq numbers 1->34 @@ -666,7 +682,7 @@ class IpsecTra4(object): self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) replay_count += len(pkts) self.assertEqual(self.get_replay_counts(p), replay_count) - err = p.tra_sa_in.get_err("replay") + err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff self.assertEqual(err, replay_count) # @@ -684,29 +700,31 @@ class IpsecTra4(object): recv_pkts = self.send_and_expect(self.tra_if, pkts * 8, self.tra_if, n_rx=1) replay_count += 7 self.assertEqual(self.get_replay_counts(p), replay_count) - err = p.tra_sa_in.get_err("replay") + err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff self.assertEqual(err, replay_count) # - # now move the window over to 257 (more than one byte) and into Case A + # now move the window over to anti_replay_window_size + 100 and into Case A # self.vapi.cli("clear error") pkt = Ether( src=self.tra_if.remote_mac, dst=self.tra_if.local_mac ) / p.scapy_tra_sa.encrypt( IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), - seq_num=257, + seq_num=anti_replay_window_size + 100, ) recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if) + self.logger.info(self.vapi.ppcli("show ipsec sa 1")) + # replayed packets are dropped self.send_and_assert_no_replies(self.tra_if, pkt * 3, timeout=0.2) replay_count += 3 self.assertEqual(self.get_replay_counts(p), replay_count) - err = p.tra_sa_in.get_err("replay") + err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff self.assertEqual(err, replay_count) - # the window size is 64 packets + # the window size is anti_replay_window_size packets # in window are still accepted pkt = Ether( src=self.tra_if.remote_mac, dst=self.tra_if.local_mac @@ -714,7 +732,6 @@ class IpsecTra4(object): IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), seq_num=200, ) - recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if) # a packet that does not decrypt does not move the window forward bogus_sa = SecurityAssociation( @@ -729,14 +746,14 @@ class IpsecTra4(object): src=self.tra_if.remote_mac, dst=self.tra_if.local_mac ) / bogus_sa.encrypt( IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), - seq_num=350, + seq_num=anti_replay_window_size + 200, ) self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2) hash_failed_count += 17 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) if hash_err != "": - err = p.tra_sa_in.get_err(hash_err) + err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff self.assertEqual(err, hash_failed_count) # a malformed 'runt' packet @@ -747,13 +764,13 @@ class IpsecTra4(object): src=self.tra_if.remote_mac, dst=self.tra_if.local_mac ) / bogus_sa.encrypt( IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), - seq_num=350, + seq_num=anti_replay_window_size + 200, ) self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2) undersize_count += 17 self.assert_error_counter_equal(undersize_node_name, undersize_count) - err = p.tra_sa_in.get_err("runt") + err = p.tra_sa_in.get_err("runt") + initial_sa_node_undersize_diff self.assertEqual(err, undersize_count) # which we can determine since this packet is still in the window @@ -784,21 +801,21 @@ class IpsecTra4(object): hash_failed_count += 17 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) if hash_err != "": - err = p.tra_sa_in.get_err(hash_err) + err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff self.assertEqual(err, hash_failed_count) else: replay_count += 17 self.assertEqual(self.get_replay_counts(p), replay_count) - err = p.tra_sa_in.get_err("replay") + err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff self.assertEqual(err, replay_count) - # valid packet moves the window over to 258 + # valid packet moves the window over to anti_replay_window_size + 258 pkt = Ether( src=self.tra_if.remote_mac, dst=self.tra_if.local_mac ) / p.scapy_tra_sa.encrypt( IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), - seq_num=258, + seq_num=anti_replay_window_size + 258, ) rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if) decrypted = p.vpp_tra_sa.decrypt(rx[0][IP]) @@ -852,7 +869,7 @@ class IpsecTra4(object): decrypted = p.vpp_tra_sa.decrypt(rx[0][IP]) # - # A packet that has seq num between (2^32-64) and 5 is within + # A packet that has seq num between (2^32-anti_replay_window_size)+4 and 5 is within # the window # p.scapy_tra_sa.seq_num = 0xFFFFFFFD @@ -883,19 +900,19 @@ class IpsecTra4(object): hash_failed_count += 1 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) if hash_err != "": - err = p.tra_sa_in.get_err(hash_err) + err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff self.assertEqual(err, hash_failed_count) # # but if we move the window forward to case B, then we can wrap # again # - p.scapy_tra_sa.seq_num = 0x100000555 + p.scapy_tra_sa.seq_num = 0x100000000 + anti_replay_window_size + 0x555 pkt = Ether( src=self.tra_if.remote_mac, dst=self.tra_if.local_mac ) / p.scapy_tra_sa.encrypt( IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), - seq_num=0x100000555, + seq_num=p.scapy_tra_sa.seq_num, ) rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if) decrypted = p.vpp_tra_sa.decrypt(rx[0][IP]) @@ -918,7 +935,7 @@ class IpsecTra4(object): self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) seq_cycle_count += len(pkts) self.assert_error_counter_equal(seq_cycle_node_name, seq_cycle_count) - err = p.tra_sa_out.get_err("seq_cycled") + err = p.tra_sa_out.get_err("seq_cycled") + initial_sa_node_cycled_diff self.assertEqual(err, seq_cycle_count) # move the security-associations seq number on to the last we used @@ -1068,6 +1085,768 @@ class IpsecTra4(object): self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count) self.assert_packet_counter_equal(self.tra4_decrypt_node_name[0], count) + def _verify_tra_anti_replay_algorithm_esn(self): + def seq_num(seqh, seql): + return (seqh << 32) | (seql & 0xFFFF_FFFF) + + p = self.params[socket.AF_INET] + anti_replay_window_size = p.anti_replay_window_size + + seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name + replay_count = self.get_replay_counts(p) + hash_failed_count = self.get_hash_failed_counts(p) + seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name) + + if ESP == self.encryption_type: + undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0] + undersize_count = self.statistics.get_err_counter(undersize_node_name) + + # reset the TX SA to avoid conflict with left configuration + self.vapi.cli(f"test ipsec sa {p.vpp_tra_sa_id} seq 0x0") + + """ + RFC 4303 Appendix A2. Case A + + |: new Th marker + a-i: possible seq num received + +: Bl, Tl, Bl', Tl' + [BT]l(sign) = [BT]l (sign) 2^32 mod 2^32 (Th inc/dec-remented by 1) + + Th - 1 Th Th + 1 + --|--a--+---b---+-c--|--d--+---e---+-f--|--g--+---h---+--i-|-- + ========= ========= ========= + Bl- Tl- Bl Tl Bl+ Tl+ + + Case A implies Tl >= W - 1 + """ + + Th = 1 + Tl = anti_replay_window_size + 40 + Bl = Tl - anti_replay_window_size + 1 + + # move VPP's RX AR window to Case A + self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}") + p.scapy_tra_sa.seq_num = seq_num(Th, Tl) + + """ + case a: Seql < Bl + - pre-crypto check: algorithm predicts that the packet wrap the window + -> Seqh = Th + 1 + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th - 1, Bl - 20), seq_num(Th - 1, Bl - 5)) + ] + + # out-of-window packets fail integrity check + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + hash_failed_count += len(pkts) + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + case b: Bl <= Seql <= Tl + - pre-crypto check: algorithm predicts that the packet is in the window + -> Seqh = Th + -> check for a replayed packet with Seql + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, Tl - 10), seq_num(Th, Tl - 5)) + ] + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + p.scapy_tra_sa.seq_num = seq_num(Th - 1, Tl) + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th - 1, Tl - 35), seq_num(Th - 1, Tl - 5)) + ] + + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + + # some packets are rejected by the pre-crypto check + replay_count += 5 + self.assertEqual(self.get_replay_counts(p), replay_count) + + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) - 5 + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + case c: Seql > Tl + - pre-crypto check: algorithm predicts that the packet does not wrap the window + -> Seqh = Th + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th - 1, Tl + 5), seq_num(Th - 1, Tl + 20)) + ] + + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + case d: Seql < Bl + - pre-crypto check: algorithm predicts that the packet wrap the window + -> Seqh = Th + 1 + - integrity check: should fail + - post-crypto check: ... + """ + p.scapy_tra_sa.seq_num = seq_num(Th, Tl) + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, Bl - 20), seq_num(Th, Bl - 5)) + ] + + # out-of-window packets fail integrity check + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + hash_failed_count += len(pkts) + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + case e: Bl <= Seql <= Tl + - pre-crypto check: algorithm predicts that the packet is in the window + -> Seqh = Th + -> check for a replayed packet with Seql + - integrity check: should pass + - post-crypto check: should pass + -> Seql is marked in the AR window + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, Bl + 10), seq_num(Th, Bl + 30)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + """ + case f: Seql > Tl + - pre-crypto check: algorithm predicts that the packet does not wrap the window + -> Seqh = Th + - integrity check: should pass + - post-crypto check: should pass + -> AR window shift (the window stays Case A) + -> Seql is marked in the AR window + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, Tl + 50), seq_num(Th, Tl + 60)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + """ + case g: Seql < Bl + - pre-crypto check: algorithm predicts that the packet wrap the window + -> Seqh = Th + 1 + - integrity check: should pass + - post-crypto check: should pass + -> AR window shift (may set the window in Case B) + -> Seql is marked in the AR window + """ + p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl) + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + # set the window in Case B (the minimum window size is 64 + # so we are sure to overlap) + for seq in range(seq_num(Th + 1, 10), seq_num(Th + 1, 20)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + # reset the VPP's RX AR window to Case A + Th = 1 + Tl = 2 * anti_replay_window_size + 40 + Bl = Tl - anti_replay_window_size + 1 + + self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}") + + p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl) + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + # the AR will stay in Case A + for seq in range( + seq_num(Th + 1, anti_replay_window_size + 10), + seq_num(Th + 1, anti_replay_window_size + 20), + ) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + """ + case h: Bl <= Seql <= Tl + - pre-crypto check: algorithm predicts that the packet is in the window + -> Seqh = Th + -> check for a replayed packet with Seql + - integrity check: the wrap is not detected, should fail + - post-crypto check: ... + """ + Th += 1 + Tl = anti_replay_window_size + 20 + Bl = Tl - anti_replay_window_size + 1 + + p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl) + + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th + 1, Tl - 20), seq_num(Th + 1, Tl - 5)) + ] + + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + + # some packets are rejected by the pre-crypto check + replay_count += 5 + self.assertEqual(self.get_replay_counts(p), replay_count) + + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) - 5 + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + case i: Seql > Tl + - pre-crypto check: algorithm predicts that the packet does not wrap the window + -> Seqh = Th + - integrity check: the wrap is not detected, shoud fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th + 1, Tl + 5), seq_num(Th + 1, Tl + 15)) + ] + + # out-of-window packets fail integrity check + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + hash_failed_count += len(pkts) + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + RFC 4303 Appendix A2. Case B + + Th - 1 Th Th + 1 + ----|-a-+-----b----+--c--|-d-+----e-----+--f--|-g-+--h--- + ========= =========== =========== + Tl- Bl Tl Bl+ Tl+ + + Case B implies Tl < W - 1 + """ + + # reset the VPP's RX AR window to Case B + Th = 2 + Tl = 30 # minimum window size of 64, we are sure to overlap + Bl = (Tl - anti_replay_window_size + 1) % (1 << 32) + + self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}") + p.scapy_tra_sa.seq_num = seq_num(Th, Tl) + + """ + case a: Seql <= Tl < Bl + - pre-crypto check: algorithm predicts that the packet is in the window + -> Seqh = Th + -> check for replayed packet + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, 5), seq_num(Th, 10)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + p.scapy_tra_sa.seq_num = seq_num(Th - 1, Tl) + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th - 1, 0), seq_num(Th - 1, 15)) + ] + + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + + # some packets are rejected by the pre-crypto check + replay_count += 5 + self.assertEqual(self.get_replay_counts(p), replay_count) + + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) - 5 + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + case b: Tl < Seql < Bl + - pre-crypto check: algorithm predicts that the packet will shift the window + -> Seqh = Th + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th - 1, Tl + 10), seq_num(Th - 1, Tl + 20)) + ] + + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + case c: Tl < Bl <= Seql + - pre-crypto check: algorithm predicts that the packet is in the window + -> Seqh = Th - 1 + -> check for a replayed packet with Seql + - integrity check: should pass + - post-crypto check: should pass + -> Seql is marked in the AR window + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th - 1, Bl + 10), seq_num(Th - 1, Bl + 20)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + """ + case d: Seql <= Tl < Bl + - pre-crypto check: algorithm predicts that the packet is the window + -> Seqh = Th + -> check for replayed packet + - integrity check: should pass + - post-crypto check: should pass + -> Seql is marked in the AR window + """ + p.scapy_tra_sa.seq_num = seq_num(Th, Tl) + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, 15), seq_num(Th, 25)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + """ + case e: Tl < Seql < Bl + - pre-crypto check: algorithm predicts that the packet is in the window + -> Seqh = Th + -> check for a replayed packet with Seql + - integrity check: should pass + - post-crypto check: should pass + -> AR window shift (may set the window in Case A) + -> Seql is marked in the AR window + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, Tl + 5), seq_num(Th, Tl + 15)) + ] + + # the window stays in Case B + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range( + seq_num(Th, Tl + anti_replay_window_size + 5), + seq_num(Th, Tl + anti_replay_window_size + 15), + ) + ] + + # the window moves to Case A + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + # reset the VPP's RX AR window to Case B + Th = 2 + Tl = 30 # minimum window size of 64, we are sure to overlap + Bl = (Tl - anti_replay_window_size + 1) % (1 << 32) + + self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}") + p.scapy_tra_sa.seq_num = seq_num(Th, Tl) + + """ + case f: Tl < Bl <= Seql + - pre-crypto check: algorithm predicts that the packet is in the previous window + -> Seqh = Th - 1 + -> check for a replayed packet with Seql + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, Bl + 10), seq_num(Th, Bl + 20)) + ] + + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + case g: Seql <= Tl < Bl + - pre-crypto check: algorithm predicts that the packet is the window + -> Seqh = Th + -> check for replayed packet + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, 10), seq_num(Th, 15)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl) + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th + 1, 0), seq_num(Th + 1, 15)) + ] + + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + + # some packets are rejected by the pre-crypto check + replay_count += 5 + self.assertEqual(self.get_replay_counts(p), replay_count) + + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) - 5 + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + case h: Tl < Seql < Bl + - pre-crypto check: algorithm predicts that the packet will shift the window + -> Seqh = Th + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th + 1, Tl + 10), seq_num(Th + 1, Tl + 20)) + ] + + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + def _verify_tra_anti_replay_algorithm_no_esn(self): + def seq_num(seql): + return seql & 0xFFFF_FFFF + + p = self.params[socket.AF_INET] + anti_replay_window_size = p.anti_replay_window_size + + seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name + replay_count = self.get_replay_counts(p) + hash_failed_count = self.get_hash_failed_counts(p) + seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name) + + if ESP == self.encryption_type: + undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0] + undersize_count = self.statistics.get_err_counter(undersize_node_name) + + # reset the TX SA to avoid conflict with left configuration + self.vapi.cli(f"test ipsec sa {p.vpp_tra_sa_id} seq 0x0") + + """ + RFC 4303 Appendix A2. Case A + + a-c: possible seq num received + +: Bl, Tl + + |--a--+---b---+-c--| + ========= + Bl Tl + + No ESN implies Th = 0 + Case A implies Tl >= W - 1 + """ + + Tl = anti_replay_window_size + 40 + Bl = Tl - anti_replay_window_size + 1 + + # move VPP's RX AR window to Case A + self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Tl):#x}") + p.scapy_tra_sa.seq_num = seq_num(Tl) + + """ + case a: Seql < Bl + - pre-crypto check: algorithm predicts that the packet is out of window + -> packet should be dropped + - integrity check: ... + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Bl - 20), seq_num(Bl - 5)) + ] + + # out-of-window packets + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + replay_count += len(pkts) + self.assertEqual(self.get_replay_counts(p), replay_count) + + """ + case b: Bl <= Seql <= Tl + - pre-crypto check: algorithm predicts that the packet is in the window + -> check for a replayed packet with Seql + - integrity check: should pass + - post-crypto check: + -> check for a replayed packet with Seql + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Tl - 50), seq_num(Tl - 30)) + ] + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Tl - 35), seq_num(Tl - 30)) + ] + + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + + # replayed packets + replay_count += 5 + self.assertEqual(self.get_replay_counts(p), replay_count) + + """ + case c: Seql > Tl + - pre-crypto check: algorithm predicts that the packet will shift the window + - integrity check: should pass + - post-crypto check: should pass + -> AR window is shifted + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Tl + 5), seq_num(Tl + 20)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + """ + RFC 4303 Appendix A2. Case B + + |-a-----+------b-----| + ========= + Tl + + Case B implies Tl < W - 1 + """ + + # reset the VPP's RX AR window to Case B + Tl = 30 # minimum window size of 64, we are sure to overlap + Bl = seq_num(Tl - anti_replay_window_size + 1) + + self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Tl):#x}") + + """ + case a: Seql <= Tl < Bl + - pre-crypto check: algorithm predicts that the packet is in the window + -> check for replayed packet + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(5), seq_num(10)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + """ + case b: Tl < Seql < Bl + - pre-crypto check: algorithm predicts that the packet will shift the window + - integrity check: should pass + - post-crypto check: should pass + -> AR window is shifted + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(-50), seq_num(-20)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + def verify_tra_anti_replay_algorithm(self): + if self.params[socket.AF_INET].vpp_tra_sa.esn_en: + self._verify_tra_anti_replay_algorithm_esn() + else: + self._verify_tra_anti_replay_algorithm_no_esn() + class IpsecTra4Tests(IpsecTra4): """UT test methods for Transport v4""" @@ -1076,6 +1855,10 @@ class IpsecTra4Tests(IpsecTra4): """ipsec v4 transport anti-replay test""" self.verify_tra_anti_replay() + def test_tra_anti_replay_algorithm(self): + """ipsec v4 transport anti-replay algorithm test""" + self.verify_tra_anti_replay_algorithm() + def test_tra_lost(self): """ipsec v4 transport lost packet test""" self.verify_tra_lost() diff --git a/test/test_ipsec_api.py b/test/test_ipsec_api.py index 521762b8181..6e246f681a9 100644 --- a/test/test_ipsec_api.py +++ b/test/test_ipsec_api.py @@ -121,7 +121,7 @@ class IpsecApiTestCase(VppTestCase): def __check_sa_binding(self, sa_id, thread_index): found_sa = False - sa_dumps = self.vapi.ipsec_sa_v4_dump() + sa_dumps = self.vapi.ipsec_sa_v5_dump() for dump in sa_dumps: if dump.entry.sad_id == sa_id: self.assertEqual(dump.thread_index, thread_index) diff --git a/test/test_ipsec_esp.py b/test/test_ipsec_esp.py index 927863c80a1..fdd7eb8af15 100644 --- a/test/test_ipsec_esp.py +++ b/test/test_ipsec_esp.py @@ -62,10 +62,11 @@ class ConfigIpsecESP(TemplateIpsec): def tearDown(self): super(ConfigIpsecESP, self).tearDown() - def config_anti_replay(self, params): + def config_anti_replay(self, params, anti_replay_window_size=64): saf = VppEnum.vl_api_ipsec_sad_flags_t for p in params: p.flags |= saf.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY + p.anti_replay_window_size = anti_replay_window_size def config_network(self, params): self.net_objs = [] @@ -134,6 +135,7 @@ class ConfigIpsecESP(TemplateIpsec): flags = params.flags tun_flags = params.tun_flags salt = params.salt + anti_replay_window_size = params.anti_replay_window_size objs = [] params.tun_sa_in = VppIpsecSA( @@ -152,6 +154,7 @@ class ConfigIpsecESP(TemplateIpsec): flags=flags, salt=salt, hop_limit=params.outer_hop_limit, + anti_replay_window_size=anti_replay_window_size, ) params.tun_sa_out = VppIpsecSA( self, @@ -169,6 +172,7 @@ class ConfigIpsecESP(TemplateIpsec): flags=flags, salt=salt, hop_limit=params.outer_hop_limit, + anti_replay_window_size=anti_replay_window_size, ) objs.append(params.tun_sa_in) objs.append(params.tun_sa_out) @@ -274,6 +278,7 @@ class ConfigIpsecESP(TemplateIpsec): e = VppEnum.vl_api_ipsec_spd_action_t flags = params.flags salt = params.salt + anti_replay_window_size = params.anti_replay_window_size objs = [] params.tra_sa_in = VppIpsecSA( @@ -287,6 +292,7 @@ class ConfigIpsecESP(TemplateIpsec): self.vpp_esp_protocol, flags=flags, salt=salt, + anti_replay_window_size=anti_replay_window_size, ) params.tra_sa_out = VppIpsecSA( self, @@ -299,6 +305,7 @@ class ConfigIpsecESP(TemplateIpsec): self.vpp_esp_protocol, flags=flags, salt=salt, + anti_replay_window_size=anti_replay_window_size, ) objs.append(params.tra_sa_in) objs.append(params.tra_sa_out) @@ -1184,9 +1191,16 @@ class RunTestIpsecEspAll(ConfigIpsecESP, IpsecTra4, IpsecTra6, IpsecTun4, IpsecT # saf = VppEnum.vl_api_ipsec_sad_flags_t if flag & saf.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY: - self.unconfig_network() - self.config_network(self.params.values()) - self.verify_tra_anti_replay() + for anti_replay_window_size in ( + 64, + 131072, + ): + self.unconfig_network() + self.config_anti_replay(self.params.values(), anti_replay_window_size) + self.config_network(self.params.values()) + self.verify_tra_anti_replay() + self.verify_tra_anti_replay_algorithm() + self.config_anti_replay(self.params.values()) self.unconfig_network() self.config_network(self.params.values()) diff --git a/test/vpp_ipsec.py b/test/vpp_ipsec.py index 7a5a95a457a..e354cfc8ac6 100644 --- a/test/vpp_ipsec.py +++ b/test/vpp_ipsec.py @@ -218,6 +218,7 @@ class VppIpsecSA(VppObject): udp_src=None, udp_dst=None, hop_limit=None, + anti_replay_window_size=0, ): e = VppEnum.vl_api_ipsec_sad_flags_t self.test = test @@ -229,6 +230,7 @@ class VppIpsecSA(VppObject): self.crypto_key = crypto_key self.proto = proto self.salt = salt + self.anti_replay_window_size = anti_replay_window_size self.table_id = 0 self.tun_src = tun_src @@ -284,13 +286,14 @@ class VppIpsecSA(VppObject): "tunnel": self.tunnel_encode(), "flags": self.flags, "salt": self.salt, + "anti_replay_window_size": self.anti_replay_window_size, } # don't explicitly send the defaults, let papi fill them in if self.udp_src: entry["udp_src_port"] = self.udp_src if self.udp_dst: entry["udp_dst_port"] = self.udp_dst - r = self.test.vapi.ipsec_sad_entry_add(entry=entry) + r = self.test.vapi.ipsec_sad_entry_add_v2(entry=entry) self.stat_index = r.stat_index self.test.registry.register(self, self.test.logger) return self @@ -324,7 +327,7 @@ class VppIpsecSA(VppObject): def query_vpp_config(self): e = VppEnum.vl_api_ipsec_sad_flags_t - bs = self.test.vapi.ipsec_sa_v3_dump() + bs = self.test.vapi.ipsec_sa_v5_dump() for b in bs: if b.entry.sad_id == self.id: # if udp encap is configured then the ports should match |