summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py')
-rw-r--r--scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py96
1 files changed, 96 insertions, 0 deletions
diff --git a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py
index 9cd473d7..d1207ca5 100644
--- a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py
+++ b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py
@@ -78,6 +78,35 @@ def test_build_Raw():
])
assert(str(pkt[Raw].load == "hi"))
+def test_build_fixed_pkt_size_bytes_gen():
+ pkt = build_pkt_get_scapy([
+ layer_def("Ether"),
+ layer_def("IP"),
+ layer_def("TCP"),
+ layer_def("Raw", load={
+ "vtype": "BYTES",
+ "generate": "template",
+ "total_size": 64,
+ "template_base64": bytes_to_b64(b"hi")
+ })
+ ])
+ print(len(pkt))
+ assert(len(pkt) == 64)
+
+def test_build_fixed_pkt_size_bytes_gen():
+ pkt = build_pkt_get_scapy([
+ layer_def("Ether"),
+ layer_def("IP"),
+ layer_def("TCP"),
+ layer_def("Raw", load={
+ "vtype": "BYTES",
+ "generate": "random_ascii",
+ "total_size": 256
+ })
+ ])
+ print(len(pkt))
+ assert(len(pkt) == 256)
+
def test_get_all():
service.get_all(v_handler)
@@ -98,6 +127,16 @@ def test_get_payload_classes():
assert("IP" in eth_payloads)
assert("Dot1Q" in eth_payloads)
assert("TCP" not in eth_payloads)
+ assert(eth_payloads[0] == "IP") # order(based on prococols.json)
+
+def test_get_tcp_payload_classes():
+ payloads = get_payload_classes([{"id":"TCP"}])
+ assert("Raw" in payloads)
+
+def test_get_dot1q_payload_classes():
+ payloads = get_payload_classes([{"id":"Dot1Q"}])
+ assert("Dot1Q" in payloads)
+ assert("IP" in payloads)
def test_pcap_read_and_write():
pkts_to_write = [bytes_to_b64(bytes(TEST_PKT))]
@@ -120,6 +159,28 @@ def test_layer_random_value():
ether_fields = fields_to_map(res['data'][0]['fields'])
assert(re.match(RE_MAC, ether_fields['src']['value']))
+def test_IP_options():
+ options_expr = "[IPOption_SSRR(copy_flag=0, routers=['1.2.3.4', '5.6.7.8'])]"
+ res = build_pkt([
+ layer_def("Ether"),
+ layer_def("IP", options={"vtype": "EXPRESSION", "expr": options_expr}),
+ ])
+ pkt = build_pkt_to_scapy(res)
+ options = pkt[IP].options
+ assert(options[0].__class__.__name__ == 'IPOption_SSRR')
+ assert(options[0].copy_flag == 0)
+ assert(options[0].routers == ['1.2.3.4', '5.6.7.8'])
+
+def test_TCP_options():
+ options_expr = "[('MSS', 1460), ('NOP', None), ('NOP', None), ('SAckOK', b'')]"
+ pkt = build_pkt_get_scapy([
+ layer_def("Ether"),
+ layer_def("IP"),
+ layer_def("TCP", options={"vtype": "EXPRESSION", "expr": options_expr}),
+ ])
+ options = pkt[TCP].options
+ assert(options[0] == ('MSS', 1460) )
+
def test_layer_wrong_structure():
payload = [
layer_def("Ether"),
@@ -153,3 +214,38 @@ def test_layer_wrong_structure():
assert(real_structure == ["Ether", "IP", "Raw", None, None])
assert(valid_structure_flags == [True, True, True, False, False])
+def test_ether_definitions():
+ etherDef = get_definition_of("Ether")
+ assert(etherDef['name'] == "Ethernet II")
+ etherFields = etherDef['fields']
+ assert(etherFields[0]['id'] == 'dst')
+ assert(etherFields[0]['name'] == 'Destination')
+ assert(etherFields[1]['id'] == 'src')
+ assert(etherFields[1]['name'] == 'Source')
+ assert(etherFields[2]['id'] == 'type')
+ assert(etherFields[2]['name'] == 'Type')
+
+def test_ether_definitions():
+ pdef = get_definition_of("ICMP")
+ assert(pdef['id'] == "ICMP")
+ assert(pdef['name'])
+ assert(pdef['fields'])
+
+def test_ip_definitions():
+ pdef = get_definition_of("IP")
+ fields = pdef['fields']
+ assert(fields[0]['id'] == 'version')
+
+ assert(fields[1]['id'] == 'ihl')
+ assert(fields[1]['auto'] == True)
+
+ assert(fields[3]['id'] == 'len')
+ assert(fields[3]['auto'] == True)
+
+ assert(fields[5]['id'] == 'flags')
+ assert(fields[5]['type'] == 'BITMASK')
+ assert(fields[5]['bits'][0]['name'] == 'Reserved')
+
+ assert(fields[9]['id'] == 'chksum')
+ assert(fields[9]['auto'] == True)
+