diff options
author | Anton Kiselev <anton.kisel@gmail.com> | 2016-10-18 19:46:06 +0700 |
---|---|---|
committer | Anton Kiselev <anton.kisel@gmail.com> | 2016-10-18 19:46:06 +0700 |
commit | 4b49ef2f81941eb403fa7b39024365493a9afb63 (patch) | |
tree | 518c78028891d6bab063e3b412e7a9c8b684b58b /scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py | |
parent | b1264cf858e129ab4e131122a0ec9e2d75ac179b (diff) |
add unit tests for scapy service
Signed-off-by: Anton Kiselev <anton.kisel@gmail.com>
Diffstat (limited to 'scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py')
-rw-r--r-- | scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py | 105 |
1 files changed, 105 insertions, 0 deletions
diff --git a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py new file mode 100644 index 00000000..5ea8cf07 --- /dev/null +++ b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py @@ -0,0 +1,105 @@ +# +# run with 'nosetests' utility + +import tempfile +from basetest import * + +TEST_MAC_1 = "10:10:10:10:10:10" +# Test scapy structure +TEST_PKT = Ether(dst=TEST_MAC_1)/IP(src='127.0.0.1')/TCP(sport=443) + +# Corresponding JSON-like structure +TEST_PKT_DEF = [ + layer_def("Ether", dst=TEST_MAC_1), + layer_def("IP", dst="127.0.0.1"), + layer_def("TCP", sport="443") + ] + +def test_build_pkt(): + pkt = build_pkt_get_scapy(TEST_PKT_DEF) + assert(pkt[TCP].sport == 443) + +def test_build_invalid_structure_pkt(): + ether_fields = {"dst": TEST_MAC_1, "type": "LOOP"} + pkt = build_pkt_get_scapy([ + layer_def("Ether", **ether_fields), + layer_def("IP"), + layer_def("TCP", sport=8080) + ]) + assert(pkt[Ether].dst == TEST_MAC_1) + assert(isinstance(pkt[Ether].payload, Raw)) + +def test_reconstruct_pkt(): + res = reconstruct_pkt(base64.b64encode(bytes(TEST_PKT)), None) + pkt = build_pkt_to_scapy(res) + assert(pkt[TCP].sport == 443) + +def test_layer_del(): + modif = [ + {"id": "Ether"}, + {"id": "IP"}, + {"id": "TCP", "delete": True}, + ] + res = reconstruct_pkt(base64.b64encode(bytes(TEST_PKT)), modif) + pkt = build_pkt_to_scapy(res) + assert(not pkt[IP].payload) + +def test_layer_field_edit(): + modif = [ + {"id": "Ether"}, + {"id": "IP"}, + {"id": "TCP", "fields": [{"id": "dport", "value": 777}]}, + ] + res = reconstruct_pkt(base64.b64encode(bytes(TEST_PKT)), modif) + pkt = build_pkt_to_scapy(res) + assert(pkt[TCP].dport == 777) + assert(pkt[TCP].sport == 443) + +def test_layer_add(): + modif = [ + {"id": "Ether"}, + {"id": "IP"}, + {"id": "TCP"}, + {"id": "Raw", "fields": [{"id": "load", "value": "GET /helloworld HTTP/1.0\n\n"}]}, + ] + res = reconstruct_pkt(base64.b64encode(bytes(TEST_PKT)), modif) + pkt = build_pkt_to_scapy(res) + assert("GET /helloworld" in str(pkt[TCP].payload.load)) + +def test_build_Raw(): + pkt = build_pkt_get_scapy([ + layer_def("Ether"), + layer_def("IP"), + layer_def("TCP"), + layer_def("Raw", load={"vtype": "BYTES", "base64": bytes_to_b64(b"hi")}) + ]) + assert(str(pkt[Raw].load == "hi")) + +def test_get_all(): + service.get_all(v_handler) + +def test_get_definitions_all(): + get_definitions(None) + def_classnames = [pdef['id'] for pdef in get_definitions(None)['protocols']] + assert("IP" in def_classnames) + assert("Dot1Q" in def_classnames) + assert("TCP" in def_classnames) + +def test_get_definitions_ether(): + res = get_definitions(["Ether"]) + assert(len(res) == 1) + assert(res['protocols'][0]['id'] == "Ether") + +def test_get_payload_classes(): + eth_payloads = get_payload_classes([{"id":"Ether"}]) + assert("IP" in eth_payloads) + assert("Dot1Q" in eth_payloads) + assert("TCP" not in eth_payloads) + +def test_pcap_read_and_write(): + pkts_to_write = [bytes_to_b64(bytes(TEST_PKT))] + pcap_b64 = service.write_pcap(v_handler, pkts_to_write) + array_pkt = service.read_pcap(v_handler, pcap_b64) + pkt = build_pkt_to_scapy(array_pkt[0]) + assert(pkt[Ether].dst == TEST_MAC_1) + |