aboutsummaryrefslogtreecommitdiffstats
path: root/GPL/traffic_scripts/policer.py
blob: 1df3def011d75ea960d24d8350b1689c5f841ced (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#!/usr/bin/env python3

# Copyright (c) 2020 Cisco and/or its affiliates.
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
#
# Licensed under the Apache License 2.0 or
# GNU General Public License v2.0 or later;  you may not use this file
# except in compliance with one of these Licenses. You
# may obtain a copy of the Licenses at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#     https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
#
# Note: If this file is linked with Scapy, which is GPLv2+, your use of it
# must be under GPLv2+.  If at any point in the future it is no longer linked
# with Scapy (or other GPLv2+ licensed software), you are free to choose Apache 2.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Traffic script for IPsec verification."""

import sys
import logging

from ipaddress import ip_address
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, TCP
from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
from scapy.packet import Raw

from .TrafficScriptArg import TrafficScriptArg
from .PacketVerifier import RxQueue, TxQueue


def check_ipv4(pkt_recv, dscp):
    """Check received IPv4 IPsec packet.

    :param pkt_recv: Received packet to verify.
    :param dscp: DSCP value to check.
    :type pkt_recv: scapy.Ether
    :type dscp: int
    :raises RuntimeError: If received packet is invalid.
    """
    if not pkt_recv.haslayer(IP):
        raise RuntimeError(f"Not an IPv4 packet received: {pkt_recv!r}")

    rx_dscp = pkt_recv[IP].tos >> 2
    if rx_dscp != dscp:
        raise RuntimeError(f"Invalid DSCP {rx_dscp} should be {dscp}")

    if not pkt_recv.haslayer(TCP):
        raise RuntimeError(f"Not a TCP packet received: {pkt_recv!r}")


def check_ipv6(pkt_recv, dscp):
    """Check received IPv6 IPsec packet.

    :param pkt_recv: Received packet to verify.
    :param dscp: DSCP value to check.
    :type pkt_recv: scapy.Ether
    :type dscp: int
    :raises RuntimeError: If received packet is invalid.
    """
    if not pkt_recv.haslayer(IPv6):
        raise RuntimeError(f"Not an IPv6 packet received: {pkt_recv!r}")

    rx_dscp = pkt_recv[IPv6].tc >> 2
    if rx_dscp != dscp:
        raise RuntimeError(f"Invalid DSCP {rx_dscp} should be {dscp}")

    if not pkt_recv.haslayer(TCP):
        raise RuntimeError(f"Not a TCP packet received: {pkt_recv!r}")


# TODO: Pylint says too-many-locals and too-many-statements. Refactor!
def main():
    """Send and receive TCP packet."""
    args = TrafficScriptArg(
        [u"src_mac", u"dst_mac", u"src_ip", u"dst_ip", u"dscp"]
    )

    rxq = RxQueue(args.get_arg(u"rx_if"))
    txq = TxQueue(args.get_arg(u"tx_if"))

    src_mac = args.get_arg(u"src_mac")
    dst_mac = args.get_arg(u"dst_mac")
    src_ip = args.get_arg(u"src_ip")
    dst_ip = args.get_arg(u"dst_ip")
    dscp = int(args.get_arg(u"dscp"))

    ip_layer = IPv6 if ip_address(src_ip).version == 6 else IP

    sent_packets = list()
    pkt_send = (Ether(src=src_mac, dst=dst_mac) /
                ip_layer(src=src_ip, dst=dst_ip) /
                TCP())

    pkt_send /= Raw()
    sent_packets.append(pkt_send)
    txq.send(pkt_send)

    while True:
        pkt_recv = rxq.recv(2, sent_packets)
        if pkt_recv is None:
            raise RuntimeError(u"ICMPv6 echo reply Rx timeout")

        if pkt_recv.haslayer(ICMPv6ND_NS):
            # read another packet in the queue if the current one is ICMPv6ND_NS
            continue
        else:
            # otherwise process the current packet
            break

    if pkt_recv is None:
        raise RuntimeError(u"Rx timeout")

    if ip_layer == IP:
        check_ipv4(pkt_recv, dscp)
    else:
        check_ipv6(pkt_recv, dscp)

    sys.exit(0)


if __name__ == u"__main__":
    main()