diff options
Diffstat (limited to 'test/template_ipsec.py')
-rw-r--r-- | test/template_ipsec.py | 213 |
1 files changed, 204 insertions, 9 deletions
diff --git a/test/template_ipsec.py b/test/template_ipsec.py index fd9d91f3cfb..57c84342cb7 100644 --- a/test/template_ipsec.py +++ b/test/template_ipsec.py @@ -324,6 +324,199 @@ class IpsecTra4(object): return count + def verify_hi_seq_num(self): + p = self.params[socket.AF_INET] + saf = VppEnum.vl_api_ipsec_sad_flags_t + esn_on = p.vpp_tra_sa.esn_en + ar_on = p.flags & saf.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY + + seq_cycle_node_name = \ + ('/err/%s/sequence number cycled (packet dropped)' % + self.tra4_encrypt_node_name) + replay_count = self.get_replay_counts(p) + hash_failed_count = self.get_hash_failed_counts(p) + seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name) + + # a few packets so we get the rx seq number above the window size and + # thus can simulate a wrap with an out of window packet + pkts = [(Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=seq)) + for seq in range(63, 80)] + recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if) + + # these 4 packets will all choose seq-num 0 to decrpyt since none + # are out of window when first checked. however, once #200 has + # decrypted it will move the window to 200 and has #81 is out of + # window. this packet should be dropped. + pkts = [(Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=200)), + (Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=81)), + (Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=201)), + (Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=202))] + + # if anti-replay is off then we won't drop #81 + n_rx = 3 if ar_on else 4 + self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=n_rx) + # this packet is one before the wrap + pkts = [(Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=203))] + recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if) + + # move the window over half way to a wrap + pkts = [(Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=0x80000001))] + recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if) + + # anti-replay will drop old packets, no anti-replay will not + pkts = [(Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=0x44000001))] + + if ar_on: + self.send_and_assert_no_replies(self.tra_if, pkts) + else: + recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if) + + if esn_on: + # + # validate wrapping the ESN + # + + # wrap scapy's TX SA SN + p.scapy_tra_sa.seq_num = 0x100000005 + + # send a packet that wraps the window for both AR and no AR + pkts = [(Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=0x100000005))] + + rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if) + for rx in rxs: + decrypted = p.vpp_tra_sa.decrypt(rx[0][IP]) + + # move the window forward to half way to the next wrap + pkts = [(Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=0x180000005))] + + rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if) + + # a packet less than 2^30 from the current position is: + # - AR: out of window and dropped + # - non-AR: accepted + pkts = [(Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=0x170000005))] + + if ar_on: + self.send_and_assert_no_replies(self.tra_if, pkts) + else: + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + # a packet more than 2^30 from the current position is: + # - AR: out of window and dropped + # - non-AR: considered a wrap, but since it's not a wrap + # it won't decrpyt and so will be dropped + pkts = [(Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=0x130000005))] + + self.send_and_assert_no_replies(self.tra_if, pkts) + + # a packet less than 2^30 from the current position and is a + # wrap; (the seq is currently at 0x180000005). + # - AR: out of window so considered a wrap, so accepted + # - non-AR: not considered a wrap, so won't decrypt + p.scapy_tra_sa.seq_num = 0x260000005 + pkts = [(Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=0x260000005))] + if ar_on: + self.send_and_expect(self.tra_if, pkts, self.tra_if) + else: + self.send_and_assert_no_replies(self.tra_if, pkts) + + # + # window positions are different now for AR/non-AR + # move non-AR forward + # + if not ar_on: + # a packet more than 2^30 from the current position and is a + # wrap; (the seq is currently at 0x180000005). + # - AR: accepted + # - non-AR: not considered a wrap, so won't decrypt + + pkts = [(Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=0x200000005)), + (Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=0x200000006))] + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + pkts = [(Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=0x260000005))] + self.send_and_expect(self.tra_if, pkts, self.tra_if) + def verify_tra_anti_replay(self): p = self.params[socket.AF_INET] esn_en = p.vpp_tra_sa.esn_en @@ -359,7 +552,7 @@ class IpsecTra4(object): recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if) # replayed packets are dropped - self.send_and_assert_no_replies(self.tra_if, pkts) + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) replay_count += len(pkts) self.assertEqual(self.get_replay_counts(p), replay_count) @@ -393,7 +586,7 @@ class IpsecTra4(object): recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if) # replayed packets are dropped - self.send_and_assert_no_replies(self.tra_if, pkt * 3) + self.send_and_assert_no_replies(self.tra_if, pkt * 3, timeout=0.2) replay_count += 3 self.assertEqual(self.get_replay_counts(p), replay_count) @@ -420,7 +613,7 @@ class IpsecTra4(object): dst=self.tra_if.local_ip4) / ICMP(), seq_num=350)) - self.send_and_assert_no_replies(self.tra_if, pkt * 17) + self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2) hash_failed_count += 17 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) @@ -436,7 +629,7 @@ class IpsecTra4(object): dst=self.tra_if.local_ip4) / ICMP(), seq_num=350)) - self.send_and_assert_no_replies(self.tra_if, pkt * 17) + self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2) undersize_count += 17 self.assert_error_counter_equal(undersize_node_name, @@ -462,7 +655,7 @@ class IpsecTra4(object): dst=self.tra_if.local_ip4) / ICMP(), seq_num=17)) - self.send_and_assert_no_replies(self.tra_if, pkt * 17) + self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2) if esn_en: # an out of window error with ESN looks like a high sequence @@ -526,6 +719,7 @@ class IpsecTra4(object): ICMP(), seq_num=0x100000005)) rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if) + decrypted = p.vpp_tra_sa.decrypt(rx[0][IP]) # @@ -544,7 +738,7 @@ class IpsecTra4(object): # # While in case A we cannot wrap the high sequence number again - # becuase VPP will consider this packet to be one that moves the + # because VPP will consider this packet to be one that moves the # window forward # pkt = (Ether(src=self.tra_if.remote_mac, @@ -553,13 +747,14 @@ class IpsecTra4(object): dst=self.tra_if.local_ip4) / ICMP(), seq_num=0x200000999)) - self.send_and_assert_no_replies(self.tra_if, [pkt], self.tra_if) + self.send_and_assert_no_replies(self.tra_if, [pkt], self.tra_if, + timeout=0.2) hash_failed_count += 1 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) # - # but if we move the wondow forward to case B, then we can wrap + # but if we move the window forward to case B, then we can wrap # again # p.scapy_tra_sa.seq_num = 0x100000555 @@ -587,7 +782,7 @@ class IpsecTra4(object): # without ESN TX sequence numbers can't wrap and packets are # dropped from here on out. # - self.send_and_assert_no_replies(self.tra_if, pkts) + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) seq_cycle_count += len(pkts) self.assert_error_counter_equal(seq_cycle_node_name, seq_cycle_count) |