summaryrefslogtreecommitdiffstats
path: root/scripts/automation/regression/functional_tests/scapy_server_test.py
blob: 55daae77544ed9a35265a3e982816ff0471aac6e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# scapy server unit test

import sys,os
scapy_server_path = os.path.abspath(os.path.join(os.pardir, 'trex_control_plane', 'stl', 'services','scapy_server'))
stl_pathname = os.path.abspath(os.path.join(os.pardir, os.pardir, 'trex_control_plane','stl'))
sys.path.append(scapy_server_path)
sys.path.append(stl_pathname)


#import stl_path
import trex_stl_lib
from trex_stl_lib.api import *
from copy import deepcopy

import tempfile
import hashlib
from platform_cmd_link import *
import functional_general_test
from nose.tools import assert_equal
from nose.tools import assert_not_equal
from nose.tools import nottest
from nose.plugins.attrib import attr
import binascii
from scapy_server import *


class scapy_server_tester(functional_general_test.CGeneralFunctional_Test):
    def setUp(self):
        self.s = scapy_server()

    def tearDown(self):
        pass

    ''' 
    test for db and field update - checking check_update_test()
    '''
    def test_check_update(self):
        allData = self.s.get_all()
        allDataParsed = allData
        dbMD5 = allDataParsed['db_md5']
        fieldMD5 = allDataParsed['fields_md5']
        resT1 = self.s.check_update(dbMD5,fieldMD5)
        assert_equal(resT1,True)
        try:
            resT2 = False
            resT2 = self.s.check_update('falseMD5','falseMD5')
        except Exception as e:
            if e.message == "Fields DB is not up to date":
                resT2 = True
            else:
                raise
        assert_equal(resT2,True)
        try:
            resT3 = False
            resT3 = self.s.check_update(dbMD5,'falseMD5')
        except Exception as e:
            if e.message == "Fields DB is not up to date":
                resT3 = True
            else:
                raise
        assert_equal(resT3,True)
        try:
            resT4 = False
            resT4 = self.s.check_update('falseMD5',fieldMD5)
        except Exception as e:
            if e.message == "Protocol DB is not up to date":
                resT4 = True
            else:
                raise
        assert_equal(resT4,True)


    def test_check_updating_db(self):
        #assume i got old db
        try:
            result = self.s.check_update('falseMD5','falseMD5')
        except:
            newAllData = self.s.get_all()
            dbMD5 = newAllData['db_md5']
            fieldMD5 = newAllData['fields_md5']
            result = self.s.check_update(dbMD5,fieldMD5)
            assert_equal(result,True)
        else:
            raise Exception("scapy_server_test: check_updating_db failed")


# testing pkt = Ether()/IP()/TCP()/"test" by defualt
    def test_build_packet(self,original_pkt='Ether()/IP()/TCP()/"test"'):
        test_pkt = original_pkt
        original_pkt = eval(original_pkt)
        test_res = self.s.build_pkt(test_pkt)
        test_pkt_buffer = test_res[1]
        resT1 = (test_pkt_buffer == binascii.b2a_base64(str(original_pkt)))
        assert_equal(resT1,True)


#testing offsets of packet IP() by default
    def test_get_all_offsets(self,original_pkt = 'IP()'):
        test_pkt = original_pkt
        original_pkt = eval(original_pkt)
        tested_offsets_by_layers = self.s.get_all_pkt_offsets(test_pkt)
        layers = (test_pkt).split('/')
        offsets_by_layers = {}
        for layer in layers:
            fields_list = []
            for f in original_pkt.fields_desc:
                size = f.get_size_bytes()
                if f.name is 'load':
                    size = len(original_pkt)
                fields_list.append([f.name, f.offset, size])
            original_pkt = original_pkt.payload
            layer_name = layer.partition('(')[0] #clear layer name to include only alpha-numeric
            layer_name = re.sub(r'\W+', '',layer_name)
            offsets_by_layers[layer_name] = fields_list
        resT1 = (tested_offsets_by_layers == offsets_by_layers)
        assert_equal(resT1,True)

    def test_multi_packet(self):
        e0 = 'Ether()'
        e1 = 'Ether()/IP()'
        e2 = 'TCP()'
        e3 = 'UDP()'
        e4 = 'Ether()/IP()/TCP()/"test"'
        e5 = 'Ether()/IP()/UDP()'
        packets = [e0,e1,e2,e3,e4,e5]
        for packet in packets:
            self.test_get_all_offsets(packet)

        for packet in packets:
            self.test_build_packet(packet)