summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests')
-rw-r--r--scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/basetest.py3
-rw-r--r--scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py96
-rw-r--r--scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_utils.py69
3 files changed, 168 insertions, 0 deletions
diff --git a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/basetest.py b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/basetest.py
index 17dd304a..1db2c62b 100644
--- a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/basetest.py
+++ b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/basetest.py
@@ -62,6 +62,9 @@ def reconstruct_pkt(bytes_b64, model_def):
def get_definitions(def_filter):
return pass_result(service.get_definitions(v_handler, def_filter))
+def get_definition_of(scapy_classname):
+ return pass_result(service.get_definitions(v_handler, [scapy_classname]))['protocols'][0]
+
def get_payload_classes(def_filter):
return pass_result(service.get_payload_classes(v_handler, def_filter))
diff --git a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py
index 9cd473d7..d1207ca5 100644
--- a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py
+++ b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py
@@ -78,6 +78,35 @@ def test_build_Raw():
])
assert(str(pkt[Raw].load == "hi"))
+def test_build_fixed_pkt_size_bytes_gen():
+ pkt = build_pkt_get_scapy([
+ layer_def("Ether"),
+ layer_def("IP"),
+ layer_def("TCP"),
+ layer_def("Raw", load={
+ "vtype": "BYTES",
+ "generate": "template",
+ "total_size": 64,
+ "template_base64": bytes_to_b64(b"hi")
+ })
+ ])
+ print(len(pkt))
+ assert(len(pkt) == 64)
+
+def test_build_fixed_pkt_size_bytes_gen():
+ pkt = build_pkt_get_scapy([
+ layer_def("Ether"),
+ layer_def("IP"),
+ layer_def("TCP"),
+ layer_def("Raw", load={
+ "vtype": "BYTES",
+ "generate": "random_ascii",
+ "total_size": 256
+ })
+ ])
+ print(len(pkt))
+ assert(len(pkt) == 256)
+
def test_get_all():
service.get_all(v_handler)
@@ -98,6 +127,16 @@ def test_get_payload_classes():
assert("IP" in eth_payloads)
assert("Dot1Q" in eth_payloads)
assert("TCP" not in eth_payloads)
+ assert(eth_payloads[0] == "IP") # order(based on prococols.json)
+
+def test_get_tcp_payload_classes():
+ payloads = get_payload_classes([{"id":"TCP"}])
+ assert("Raw" in payloads)
+
+def test_get_dot1q_payload_classes():
+ payloads = get_payload_classes([{"id":"Dot1Q"}])
+ assert("Dot1Q" in payloads)
+ assert("IP" in payloads)
def test_pcap_read_and_write():
pkts_to_write = [bytes_to_b64(bytes(TEST_PKT))]
@@ -120,6 +159,28 @@ def test_layer_random_value():
ether_fields = fields_to_map(res['data'][0]['fields'])
assert(re.match(RE_MAC, ether_fields['src']['value']))
+def test_IP_options():
+ options_expr = "[IPOption_SSRR(copy_flag=0, routers=['1.2.3.4', '5.6.7.8'])]"
+ res = build_pkt([
+ layer_def("Ether"),
+ layer_def("IP", options={"vtype": "EXPRESSION", "expr": options_expr}),
+ ])
+ pkt = build_pkt_to_scapy(res)
+ options = pkt[IP].options
+ assert(options[0].__class__.__name__ == 'IPOption_SSRR')
+ assert(options[0].copy_flag == 0)
+ assert(options[0].routers == ['1.2.3.4', '5.6.7.8'])
+
+def test_TCP_options():
+ options_expr = "[('MSS', 1460), ('NOP', None), ('NOP', None), ('SAckOK', b'')]"
+ pkt = build_pkt_get_scapy([
+ layer_def("Ether"),
+ layer_def("IP"),
+ layer_def("TCP", options={"vtype": "EXPRESSION", "expr": options_expr}),
+ ])
+ options = pkt[TCP].options
+ assert(options[0] == ('MSS', 1460) )
+
def test_layer_wrong_structure():
payload = [
layer_def("Ether"),
@@ -153,3 +214,38 @@ def test_layer_wrong_structure():
assert(real_structure == ["Ether", "IP", "Raw", None, None])
assert(valid_structure_flags == [True, True, True, False, False])
+def test_ether_definitions():
+ etherDef = get_definition_of("Ether")
+ assert(etherDef['name'] == "Ethernet II")
+ etherFields = etherDef['fields']
+ assert(etherFields[0]['id'] == 'dst')
+ assert(etherFields[0]['name'] == 'Destination')
+ assert(etherFields[1]['id'] == 'src')
+ assert(etherFields[1]['name'] == 'Source')
+ assert(etherFields[2]['id'] == 'type')
+ assert(etherFields[2]['name'] == 'Type')
+
+def test_ether_definitions():
+ pdef = get_definition_of("ICMP")
+ assert(pdef['id'] == "ICMP")
+ assert(pdef['name'])
+ assert(pdef['fields'])
+
+def test_ip_definitions():
+ pdef = get_definition_of("IP")
+ fields = pdef['fields']
+ assert(fields[0]['id'] == 'version')
+
+ assert(fields[1]['id'] == 'ihl')
+ assert(fields[1]['auto'] == True)
+
+ assert(fields[3]['id'] == 'len')
+ assert(fields[3]['auto'] == True)
+
+ assert(fields[5]['id'] == 'flags')
+ assert(fields[5]['type'] == 'BITMASK')
+ assert(fields[5]['bits'][0]['name'] == 'Reserved')
+
+ assert(fields[9]['id'] == 'chksum')
+ assert(fields[9]['auto'] == True)
+
diff --git a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_utils.py b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_utils.py
new file mode 100644
index 00000000..ceb88b47
--- /dev/null
+++ b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_utils.py
@@ -0,0 +1,69 @@
+# run with 'nosetests' utility
+
+from basetest import *
+from scapy_service import *
+
+def test_generate_random_bytes():
+ res = generate_random_bytes(10, 333, ord('0'), ord('9'))
+ print(res)
+ assert(len(res) == 10)
+ assert(res == b'5390532937') # random value with this seed
+
+def test_generate_bytes_from_template_empty():
+ res = generate_bytes_from_template(5, b"")
+ print(res)
+ assert(res == b"")
+
+def test_generate_bytes_from_template_neg():
+ res = generate_bytes_from_template(-5, b"qwe")
+ assert(res == b"")
+
+def test_generate_bytes_from_template_less():
+ res = generate_bytes_from_template(5, b"qwe")
+ print(res)
+ assert(res == b"qweqw")
+
+def test_generate_bytes_from_template_same():
+ res = generate_bytes_from_template(5, b"qwert")
+ print(res)
+ assert(res == b"qwert")
+
+def test_generate_bytes_from_template_more():
+ res = generate_bytes_from_template(5, b"qwerty")
+ print(res)
+ assert(res == b"qwert")
+
+def test_parse_template_code_with_trash():
+ res = parse_template_code("0xDE AD\n be ef \t0xDEAD")
+ print(res)
+ assert(res == bytearray.fromhex('DEADBEEFDEAD'))
+
+def test_generate_bytes():
+ res = generate_bytes({"generate":"random_bytes", "seed": 123, "size": 12})
+ print(res)
+ assert(len(res) == 12)
+
+def test_generate_ascii_default_seed():
+ res = generate_bytes({"generate":"random_ascii", "size": 14})
+ print(res)
+ assert(len(res) == 14)
+
+
+def test_generate_template_code_no_size():
+ res = generate_bytes({"generate":"template_code", "template_code": "BE EF"})
+ assert(res == bytearray.fromhex('BE EF'))
+
+def test_generate_template_code_less():
+ res = generate_bytes({"generate":"template_code", "template_code": "DE AD BE EF", "size": 2})
+ assert(res == bytearray.fromhex('DE AD'))
+
+def test_generate_template_code_more():
+ res = generate_bytes({"generate":"template_code", "template_code": "0xDEAD 0xBEEF", "size": 6})
+ assert(res == bytearray.fromhex('DE AD BE EF DE AD'))
+
+def test_generate_template_base64():
+ res = generate_bytes({"generate":"template", "template_base64": bytes_to_b64(b'hi'), "size": 5})
+ print(res)
+ assert(res == b'hihih')
+
+