diff options
Diffstat (limited to 'scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py')
-rw-r--r-- | scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py | 96 |
1 files changed, 96 insertions, 0 deletions
diff --git a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py index 9cd473d7..d1207ca5 100644 --- a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py +++ b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py @@ -78,6 +78,35 @@ def test_build_Raw(): ]) assert(str(pkt[Raw].load == "hi")) +def test_build_fixed_pkt_size_bytes_gen(): + pkt = build_pkt_get_scapy([ + layer_def("Ether"), + layer_def("IP"), + layer_def("TCP"), + layer_def("Raw", load={ + "vtype": "BYTES", + "generate": "template", + "total_size": 64, + "template_base64": bytes_to_b64(b"hi") + }) + ]) + print(len(pkt)) + assert(len(pkt) == 64) + +def test_build_fixed_pkt_size_bytes_gen(): + pkt = build_pkt_get_scapy([ + layer_def("Ether"), + layer_def("IP"), + layer_def("TCP"), + layer_def("Raw", load={ + "vtype": "BYTES", + "generate": "random_ascii", + "total_size": 256 + }) + ]) + print(len(pkt)) + assert(len(pkt) == 256) + def test_get_all(): service.get_all(v_handler) @@ -98,6 +127,16 @@ def test_get_payload_classes(): assert("IP" in eth_payloads) assert("Dot1Q" in eth_payloads) assert("TCP" not in eth_payloads) + assert(eth_payloads[0] == "IP") # order(based on prococols.json) + +def test_get_tcp_payload_classes(): + payloads = get_payload_classes([{"id":"TCP"}]) + assert("Raw" in payloads) + +def test_get_dot1q_payload_classes(): + payloads = get_payload_classes([{"id":"Dot1Q"}]) + assert("Dot1Q" in payloads) + assert("IP" in payloads) def test_pcap_read_and_write(): pkts_to_write = [bytes_to_b64(bytes(TEST_PKT))] @@ -120,6 +159,28 @@ def test_layer_random_value(): ether_fields = fields_to_map(res['data'][0]['fields']) assert(re.match(RE_MAC, ether_fields['src']['value'])) +def test_IP_options(): + options_expr = "[IPOption_SSRR(copy_flag=0, routers=['1.2.3.4', '5.6.7.8'])]" + res = build_pkt([ + layer_def("Ether"), + layer_def("IP", options={"vtype": "EXPRESSION", "expr": options_expr}), + ]) + pkt = build_pkt_to_scapy(res) + options = pkt[IP].options + assert(options[0].__class__.__name__ == 'IPOption_SSRR') + assert(options[0].copy_flag == 0) + assert(options[0].routers == ['1.2.3.4', '5.6.7.8']) + +def test_TCP_options(): + options_expr = "[('MSS', 1460), ('NOP', None), ('NOP', None), ('SAckOK', b'')]" + pkt = build_pkt_get_scapy([ + layer_def("Ether"), + layer_def("IP"), + layer_def("TCP", options={"vtype": "EXPRESSION", "expr": options_expr}), + ]) + options = pkt[TCP].options + assert(options[0] == ('MSS', 1460) ) + def test_layer_wrong_structure(): payload = [ layer_def("Ether"), @@ -153,3 +214,38 @@ def test_layer_wrong_structure(): assert(real_structure == ["Ether", "IP", "Raw", None, None]) assert(valid_structure_flags == [True, True, True, False, False]) +def test_ether_definitions(): + etherDef = get_definition_of("Ether") + assert(etherDef['name'] == "Ethernet II") + etherFields = etherDef['fields'] + assert(etherFields[0]['id'] == 'dst') + assert(etherFields[0]['name'] == 'Destination') + assert(etherFields[1]['id'] == 'src') + assert(etherFields[1]['name'] == 'Source') + assert(etherFields[2]['id'] == 'type') + assert(etherFields[2]['name'] == 'Type') + +def test_ether_definitions(): + pdef = get_definition_of("ICMP") + assert(pdef['id'] == "ICMP") + assert(pdef['name']) + assert(pdef['fields']) + +def test_ip_definitions(): + pdef = get_definition_of("IP") + fields = pdef['fields'] + assert(fields[0]['id'] == 'version') + + assert(fields[1]['id'] == 'ihl') + assert(fields[1]['auto'] == True) + + assert(fields[3]['id'] == 'len') + assert(fields[3]['auto'] == True) + + assert(fields[5]['id'] == 'flags') + assert(fields[5]['type'] == 'BITMASK') + assert(fields[5]['bits'][0]['name'] == 'Reserved') + + assert(fields[9]['id'] == 'chksum') + assert(fields[9]['auto'] == True) + |