summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/basetest.py
blob: 0e95fa321707259023fa46f46490eb48199b79c8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import os
import sys
import json
import base64
import inspect
from inspect import getcallargs
# add paths to scapy_service and trex_stl_lib.api
sys.path.append(os.path.abspath(os.pardir))
sys.path.append(os.path.abspath(os.path.join(os.pardir, os.pardir, os.pardir)))

from scapy_service import *
from scapy.all import *

# Single Scapy_service instance shared by every helper in this module.
service = Scapy_service()
# Version handler requested once at import time (arguments '1' and '01');
# the helpers below pass it as the first argument of each service call.
v_handler = service.get_version_handler('1','01')

def pretty_json(obj):
    """Return obj serialized as a JSON string indented by four spaces."""
    formatted = json.dumps(obj, indent=4)
    return formatted

def pprint(obj):
    """Print obj to stdout as indented JSON (formatting done by pretty_json)."""
    text = pretty_json(obj)
    print(text)

def is_verbose():
    """Tell the helpers whether to print debug output; currently always on."""
    verbose = True
    return verbose

def pass_result(result, *args):
    """Return result unchanged, printing debug info first when verbose.

    When is_verbose() is true, the source line of the outermost stack frame
    is printed (if available), followed by the result. Printing is strictly
    best-effort: it must never turn a passing call into a failure.

    result -- value to display and hand back to the caller
    *args  -- accepted for call-site compatibility; not used
    """
    if is_verbose():
        # inspect.stack()[-1] is the outermost frame; index 4 is its
        # code_context, which is None when no source is available
        # (e.g. frozen modules, `python -c`, exec'd code).
        context = inspect.stack()[-1][4]
        if context:
            print(context[0])
        try:
            pprint(result)
        except (TypeError, ValueError):
            # result is not JSON-serializable (e.g. bytes on Python 3);
            # fall back to repr so the debug output cannot break the test.
            print(repr(result))
    return result

def pass_pkt(result):
    """Return the packet unchanged; when verbose and not None, call show2() on it first."""
    should_show = is_verbose() and result is not None
    if should_show:
        result.show2()
    return result

# utility functions for tests

def layer_def(layerId, **layerfields):
    """Build a JSON-like protocol definition object for scapy.

    Example result:
    { "id": "Ether", "fields": [ { "id": "dst", "value": "10:10:10:10:10:10" } ] }

    The "fields" key is only present when at least one keyword argument
    is supplied.
    """
    definition = {"id": layerId}
    if not layerfields:
        return definition
    field_entries = []
    for name, value in layerfields.items():
        field_entries.append({"id": name, "value": value})
    definition["fields"] = field_entries
    return definition

def get_version_handler():
    """Request a version handler from the service with arguments "1" and "01"; return it via pass_result."""
    handler = service.get_version_handler("1", "01")
    return pass_result(handler)

def build_pkt(model_def):
    """Call service.build_pkt with the module's version handler and model_def; return the result via pass_result."""
    built = service.build_pkt(v_handler, model_def)
    return pass_result(built)

def build_pkt_ex(model_def, instructions_def):
    """Call service.build_pkt_ex with the version handler, model_def and instructions_def; return the result via pass_result."""
    built = service.build_pkt_ex(v_handler, model_def, instructions_def)
    return pass_result(built)

def build_pkt_get_scapy(model_def):
    """Build a packet from model_def and convert the build result with build_pkt_to_scapy."""
    build_result = build_pkt(model_def)
    return build_pkt_to_scapy(build_result)

def reconstruct_pkt(bytes_b64, model_def):
    """Call service.reconstruct_pkt with the version handler, bytes_b64 and model_def; return the result via pass_result."""
    reconstructed = service.reconstruct_pkt(v_handler, bytes_b64, model_def)
    return pass_result(reconstructed)

def get_definitions(def_filter):
    """Call service.get_definitions with the version handler and def_filter; return the result via pass_result."""
    definitions = service.get_definitions(v_handler, def_filter)
    return pass_result(definitions)

def get_definition_of(scapy_classname):
    """Return the first entry under 'protocols' of the definitions fetched for a single class name."""
    class_filter = [scapy_classname]
    definitions = pass_result(service.get_definitions(v_handler, class_filter))
    protocols = definitions['protocols']
    return protocols[0]

def get_payload_classes(def_filter):
    """Call service.get_payload_classes with the version handler and def_filter; return the result via pass_result."""
    payload_classes = service.get_payload_classes(v_handler, def_filter)
    return pass_result(payload_classes)

def build_pkt_to_scapy(buildpkt_result):
    """Decode the 'binary' entry of a build result and wrap it in an Ether packet.

    The packet is returned through pass_pkt, which may display it.
    """
    # b64_to_bytes is not defined in this file; presumably it comes from
    # one of the star-imports above.
    raw = b64_to_bytes(buildpkt_result['binary'])
    packet = Ether(raw)
    return pass_pkt(packet)

def fields_to_map(field_array):
    """Index a list of field dicts by their "id".

    [{id, value, hvalue, offset}, ...] becomes a map id -> the same field
    dict. A falsy field_array (None, empty list) yields an empty map; when
    ids repeat, the last field wins.
    """
    if not field_array:
        return {}
    return {field["id"]: field for field in field_array}

def adapt_json_protocol_fields(protocols_array):
    """Replace each protocol's "fields" list with an id-keyed map, in place.

    Makes field lookup by id easier in tests. Protocols whose "fields" entry
    is missing or empty are left untouched. Nothing is returned.
    """
    for layer in protocols_array:
        field_list = layer.get("fields")
        if not field_list:
            continue
        layer["fields"] = fields_to_map(field_list)

def get_templates():
    """Call service.get_templates with the module's version handler; return the result via pass_result."""
    templates = service.get_templates(v_handler)
    return pass_result(templates)

def get_template_by_id(templateId):
    """Fetch the template with the given id from the service and return it base64-decoded (via pass_result)."""
    encoded = service.get_template(v_handler, {"id": templateId})
    decoded = base64.b64decode(encoded)
    return pass_result(decoded)