1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
#
# run with 'nosetests' utility
import tempfile
from basetest import *
# MAC address used as the Ether destination in the fixtures and assertions below
TEST_MAC_1 = "10:10:10:10:10:10"
# Test scapy structure
TEST_PKT = Ether(dst=TEST_MAC_1)/IP(src='127.0.0.1')/TCP(sport=443)
# Corresponding JSON-like structure
# NOTE(review): this definition sets IP `dst` while TEST_PKT sets IP `src`,
# and passes sport as the string "443" rather than the int 443 -- confirm
# that both differences are intentional.
TEST_PKT_DEF = [
layer_def("Ether", dst=TEST_MAC_1),
layer_def("IP", dst="127.0.0.1"),
layer_def("TCP", sport="443")
]
def test_build_pkt():
    """Build TEST_PKT_DEF into a scapy packet and check its TCP source port."""
    built = build_pkt_get_scapy(TEST_PKT_DEF)
    tcp_layer = built[TCP]
    assert tcp_layer.sport == 443
def test_build_invalid_structure_pkt():
    """Build an Ether(type="LOOP")/IP/TCP definition and inspect the Ether layer.

    The built packet must keep the requested destination MAC, and the
    payload directly under Ether must be a Raw layer.
    """
    layers = [
        layer_def("Ether", dst=TEST_MAC_1, type="LOOP"),
        layer_def("IP"),
        layer_def("TCP", sport=8080),
    ]
    built = build_pkt_get_scapy(layers)
    assert built[Ether].dst == TEST_MAC_1
    assert isinstance(built[Ether].payload, Raw)
def test_reconstruct_pkt():
    """Reconstruct TEST_PKT from its base64 bytes with no modifications.

    The reconstructed result is converted back to a scapy packet and the
    TCP source port is checked against the fixture value.
    """
    encoded = base64.b64encode(bytes(TEST_PKT))
    reconstructed = reconstruct_pkt(encoded, None)
    rebuilt = build_pkt_to_scapy(reconstructed)
    assert rebuilt[TCP].sport == 443
def test_layer_del():
    """Reconstruct TEST_PKT with the TCP layer marked for deletion.

    After reconstruction the IP layer must have no (truthy) payload.
    """
    modifications = [
        {"id": "Ether"},
        {"id": "IP"},
        {"id": "TCP", "delete": True},
    ]
    encoded = base64.b64encode(bytes(TEST_PKT))
    rebuilt = build_pkt_to_scapy(reconstruct_pkt(encoded, modifications))
    assert not rebuilt[IP].payload
def test_layer_field_edit():
    """Reconstruct TEST_PKT while setting the TCP dport field to 777.

    The edited field must carry the new value and the untouched sport
    must still equal the fixture value.
    """
    tcp_edit = {"id": "TCP", "fields": [{"id": "dport", "value": 777}]}
    modifications = [{"id": "Ether"}, {"id": "IP"}, tcp_edit]
    encoded = base64.b64encode(bytes(TEST_PKT))
    rebuilt = build_pkt_to_scapy(reconstruct_pkt(encoded, modifications))
    assert rebuilt[TCP].dport == 777
    assert rebuilt[TCP].sport == 443
def test_layer_add():
    """Reconstruct TEST_PKT with an extra Raw layer appended after TCP.

    The Raw layer's load is supplied in the modification list, and the
    TCP payload of the rebuilt packet must contain that text.
    """
    raw_layer = {
        "id": "Raw",
        "fields": [{"id": "load", "value": "GET /helloworld HTTP/1.0\n\n"}],
    }
    modifications = [{"id": "Ether"}, {"id": "IP"}, {"id": "TCP"}, raw_layer]
    encoded = base64.b64encode(bytes(TEST_PKT))
    rebuilt = build_pkt_to_scapy(reconstruct_pkt(encoded, modifications))
    load_text = str(rebuilt[TCP].payload.load)
    assert "GET /helloworld" in load_text
def test_build_Raw():
    """Build Ether/IP/TCP/Raw where the Raw load is given as base64 BYTES.

    The load is passed as a {"vtype": "BYTES", "base64": ...} value and the
    built packet's Raw load must equal the original bytes.
    """
    pkt = build_pkt_get_scapy([
        layer_def("Ether"),
        layer_def("IP"),
        layer_def("TCP"),
        layer_def("Raw", load={"vtype": "BYTES", "base64": bytes_to_b64(b"hi")}),
    ])
    # Compare the load itself. The previous form asserted
    # str(load == "hi"), which is the non-empty string "True" or "False"
    # and therefore always passed, even when the load was wrong.
    assert bytes(pkt[Raw].load) == b"hi"
def test_get_all():
    """Smoke test: call service.get_all with the shared handler; no result is checked."""
    handler = v_handler
    service.get_all(handler)
def test_get_definitions_all():
    """Request all definitions and check that well-known protocol ids are listed."""
    # The result of this first call is discarded; only the second call's
    # result is inspected.
    get_definitions(None)
    protocols = get_definitions(None)['protocols']
    def_classnames = [entry['id'] for entry in protocols]
    for expected_id in ("IP", "Dot1Q", "TCP"):
        assert expected_id in def_classnames
def test_get_definitions_ether():
    """Request definitions for Ether only and check exactly that protocol is returned."""
    res = get_definitions(["Ether"])
    protocols = res['protocols']
    # Count the returned protocols rather than the top-level keys of the
    # result dict: len(res) says nothing about how many protocol
    # definitions came back.
    assert len(protocols) == 1
    assert protocols[0]['id'] == "Ether"
def test_get_payload_classes():
    """Check which layer classes are offered as payloads for an Ether layer.

    IP and Dot1Q must be offered; TCP must not be.
    """
    ether_only = [{"id": "Ether"}]
    eth_payloads = get_payload_classes(ether_only)
    for allowed in ("IP", "Dot1Q"):
        assert allowed in eth_payloads
    assert "TCP" not in eth_payloads
def test_pcap_read_and_write():
    """Round-trip TEST_PKT through service.write_pcap and service.read_pcap.

    The first packet read back is converted to scapy and its Ether
    destination must match the fixture MAC.
    """
    encoded_pkt = bytes_to_b64(bytes(TEST_PKT))
    pcap_b64 = service.write_pcap(v_handler, [encoded_pkt])
    packets_read = service.read_pcap(v_handler, pcap_b64)
    first_pkt = build_pkt_to_scapy(packets_read[0])
    assert first_pkt[Ether].dst == TEST_MAC_1
|