summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/basetest.py
blob: 79773f98eacbcb7ed30212de564e1f81ff5a1631 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import os
import sys
import json
import base64
import inspect
from inspect import getcallargs
# add paths to scapy_service and trex_stl_lib.api
sys.path.append(os.path.abspath(os.pardir))
sys.path.append(os.path.abspath(os.path.join(os.pardir, os.pardir, os.pardir)))

from scapy_service import *
from scapy.all import *

# Single Scapy_service instance created at import time and shared by the
# service wrapper helpers in this module.
service = Scapy_service()
# Version handler requested once for version ('1', '01'); the wrapper helpers
# pass it as the first argument of their service calls.
v_handler = service.get_version_handler('1','01')

def pretty_json(obj):
    """Serialize ``obj`` to a JSON string indented with four spaces."""
    rendered = json.dumps(obj, indent=4)
    return rendered

def pprint(obj):
    """Print ``obj`` to stdout in the format produced by ``pretty_json``."""
    formatted = pretty_json(obj)
    print(formatted)

def is_verbose():
    """Tell the pass-through helpers whether to print debug output.

    Currently hard-wired to ``True``.
    """
    verbose = True
    return verbose

def pass_result(result, *args):
    """Return ``result`` unchanged, printing debug info when verbose.

    When ``is_verbose()`` is true, the first source-context line of the
    outermost stack frame is printed (if one is available), followed by
    ``result`` rendered through ``pprint``.

    Args:
        result: Value to echo and hand back to the caller.
        *args: Accepted for call-site compatibility; not used.

    Returns:
        The same ``result`` object that was passed in.
    """
    if is_verbose():
        # Index 4 of a stack record is its code_context, which is None when
        # the frame's source is unavailable; skip the line instead of
        # crashing a helper that only exists to display debug output.
        context_lines = inspect.stack()[-1][4]
        if context_lines:
            print(context_lines[0])
        pprint(result)
    return result

def pass_pkt(result):
    """Return the packet ``result`` unchanged, showing it when verbose.

    ``result.show2()`` is called only when ``is_verbose()`` is true and
    ``result`` is not ``None``.
    """
    if not is_verbose() or result is None:
        return result
    result.show2()
    return result

# utility functions for tests

def layer_def(layerId, **layerfields):
    """Build a JSON-like protocol definition object for scapy tests.

    Example result:
        { "id": "Ether", "fields": [ { "id": "dst", "value": "10:10:10:10:10:10" } ] }

    Args:
        layerId: Value stored under the "id" key of the definition.
        **layerfields: Field name/value pairs; each becomes one
            ``{"id": name, "value": value}`` entry in "fields".

    Returns:
        A dict with "id" and, only when fields were supplied, "fields".
    """
    definition = {"id": layerId}
    if not layerfields:
        # No fields given: the "fields" key is left out entirely.
        return definition
    entries = []
    for name, value in layerfields.items():
        entries.append({"id": name, "value": value})
    definition["fields"] = entries
    return definition

def get_version_handler():
    """Request a handler for version ("1", "01") from the shared service.

    The value is routed through ``pass_result`` before being returned.
    """
    handler = service.get_version_handler("1", "01")
    return pass_result(handler)

def build_pkt(model_def):
    """Call ``service.build_pkt`` with the module-level handler and ``model_def``.

    The service response is routed through ``pass_result`` before being
    returned.
    """
    response = service.build_pkt(v_handler, model_def)
    return pass_result(response)

def build_pkt_get_scapy(model_def):
    """Run ``build_pkt`` on ``model_def`` and convert the result with ``build_pkt_to_scapy``."""
    build_result = build_pkt(model_def)
    return build_pkt_to_scapy(build_result)

def reconstruct_pkt(bytes_b64, model_def):
    """Call ``service.reconstruct_pkt`` with the module-level handler.

    ``bytes_b64`` and ``model_def`` are forwarded as given; the service
    response is routed through ``pass_result`` before being returned.
    """
    response = service.reconstruct_pkt(v_handler, bytes_b64, model_def)
    return pass_result(response)

def get_definitions(def_filter):
    """Call ``service.get_definitions`` with the module-level handler and ``def_filter``.

    The service response is routed through ``pass_result`` before being
    returned.
    """
    response = service.get_definitions(v_handler, def_filter)
    return pass_result(response)

def get_payload_classes(def_filter):
    """Call ``service.get_payload_classes`` with the module-level handler and ``def_filter``.

    The service response is routed through ``pass_result`` before being
    returned.
    """
    response = service.get_payload_classes(v_handler, def_filter)
    return pass_result(response)

def build_pkt_to_scapy(buildpkt_result):
    """Construct an ``Ether`` packet from the 'binary' entry of a build result.

    The 'binary' value is converted with ``b64_to_bytes`` (presumably a
    base64 decode -- confirm against scapy_service), wrapped in ``Ether``,
    and routed through ``pass_pkt`` before being returned.
    """
    encoded = buildpkt_result['binary']
    raw_bytes = b64_to_bytes(encoded)
    packet = Ether(raw_bytes)
    return pass_pkt(packet)