path: root/scripts/automation/regression/functional_tests/scapy_server_test.py
# scapy server unit test

import sys,os
scapy_server_path = os.path.abspath(os.path.join(os.pardir, 'trex_control_plane', 'stl', 'examples'))
print scapy_server_path
stl_pathname = os.path.abspath(os.path.join(os.pardir, os.pardir, 'trex_control_plane','stl'))
sys.path.append(scapy_server_path)
sys.path.append(stl_pathname)



#import stl_path
import trex_stl_lib
from trex_stl_lib.api import *
from copy import deepcopy

import tempfile
import md5

import outer_packages
from platform_cmd_link import *
import functional_general_test
from nose.tools import assert_equal
from nose.tools import assert_not_equal
from nose.tools import nottest
from nose.plugins.attrib import attr

from scapy_server import *
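
# The tests below exercise the scapy_server helpers - get_all(), check_update(),
# build_pkt() and get_all_pkt_offsets() - presumably pulled in by the star
# import above. Each helper returns a JSON string, so every test json.loads()
# the reply before asserting on it.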


class scapy_server_tester(functional_general_test.CGeneralFunctional_Test):
    def setUp(self):
        pass

    def tearDown(self):
        pass

    '''
    Test for DB and field update - exercises check_update() with the MD5
    values returned by get_all(), both valid and deliberately invalid.
    '''
    def test_check_update(self):
        allData = get_all()
        allDataParsed = json.loads(allData)
        dbMD5 = allDataParsed['DB_md5']
        fieldMD5 = allDataParsed['fields_md5']
        result = check_update(dbMD5,fieldMD5)
        result = json.loads(result)
        '''
        if result[0][0] == 'Success' and result[1][0] == 'Success':
            print 'check_update_test [md5 comparison test]: Success'
        else:
            print 'check_update_test [md5 comparison test]: md5s of fields or db do not match the source'
        '''
        resT1 = (result[0][0] == 'Success' and result[1][0] == 'Success')
        assert_equal(resT1,True)

        result = check_update(json.dumps('falseMD5'),json.dumps('falseMD5'))
        result = json.loads(result)
        '''
        if result[0][0] == 'Fail' and result[1][0] == 'Fail':
            print 'check_update_test [wrong md5s return failure]: Success'
        else:
            print 'check_update_test [wrong md5s return failure]: md5s of fields or db return Success for invalid value'
        '''
        resT2 = (result[0][0] == 'Fail' and result[1][0] == 'Fail')
        assert_equal(resT2,True)

        result = check_update(dbMD5,json.dumps('falseMD5'))
        result = json.loads(result)
        '''
        if result[0][0] == 'Fail' and result[1][0] == 'Success':
            print 'check_update_test [wrong field md5 returns error, correct db md5]: Success'
        else:
            print 'md5 of field return Success for invalid value'
        '''
        resT3 = (result[0][0] == 'Fail' and result[1][0] == 'Success')
        assert_equal(resT3,True)

        result = check_update(json.dumps('falseMD5'),fieldMD5)
        result = json.loads(result)
        '''
        if result[0][0] == 'Success' and result[1][0] == 'Fail':
            print 'check_update_test [wrong db md5 returns error, correct field md5]: Success'
        else:
            print 'md5 of db return Success for invalid value'
        '''
        resT4 = (result[0][0] == 'Success' and result[1][0] == 'Fail')
        assert_equal(resT4,True)


    def test_check_updating_db(self):
        # Assume the client holds an old DB: invalid MD5s should be reported as out of date
        result = check_update(json.dumps('falseMD5'),json.dumps('falseMD5'))
        result = json.loads(result)
        if result[0][0] == 'Fail' or result[1][0] == 'Fail':
            newAllData = get_all()
            allDataParsed = json.loads(newAllData)
            dbMD5 = allDataParsed['DB_md5']
            fieldMD5 = allDataParsed['fields_md5']
            result = check_update(dbMD5,fieldMD5)
            result = json.loads(result)
            '''
            if result[0][0] == 'Success' and result[1][0] == 'Success':
                print 'check_updating_db [got old db and updated it]: Success'
            else:
                print'check_updating_db [got old db and updated it]: FAILED'
            '''
            resT1 = (result[0][0] == 'Success' and result[1][0] == 'Success')
            assert_equal(resT1,True)
        else:
            raise Exception("scapy_server_test: check_updating_db failed")


    # testing pkt = Ether()/IP()/TCP()/"test" by default
    def test_build_packet(self,original_pkt = json.dumps('Ether()/IP()/TCP()/"test"')):
        test_pkt = original_pkt
        original_pkt = eval(json.loads(original_pkt))
        test_res = build_pkt(test_pkt)
        test_res = json.loads(test_res)
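        # as used by this test, the third element of build_pkt()'s reply holds
        # the assembled packet bytes, base64-encoded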
        test_pkt_buffer = json.loads(test_res[2])
        test_pkt_buffer = test_pkt_buffer.decode('base64')
        '''
        if test_pkt_buffer == str(original_pkt):
            print 'build_pkt test [scapy packet and string-defined packet comparison]: Success'
        else:
            print 'build_pkt test [scapy packet and string-defined packet comparison]: FAILED'
        '''
        resT1 = (test_pkt_buffer == str(original_pkt))
        assert_equal(resT1,True)


    # testing offsets of packet IP() by default
    def test_get_all_offsets(self,original_pkt = json.dumps('IP()')):
        test_pkt = original_pkt
        original_pkt = eval(json.loads(original_pkt))
        tested_offsets_by_layers = get_all_pkt_offsets(test_pkt)
        tested_offsets_by_layers = json.loads(tested_offsets_by_layers)
        layers = json.loads(test_pkt).split('/')
        offsets_by_layers = {}
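        # rebuild the expected per-layer offsets straight from scapy's
        # fields_desc and compare them against the server's reply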
        for layer in layers:
            fields_list = []
            for f in original_pkt.fields_desc:
                size = f.get_size_bytes()
                if f.name == 'load':
                    size = len(original_pkt)
                fields_list.append([f.name, f.offset, size])
            original_pkt = original_pkt.payload
            offsets_by_layers[layer] = fields_list
        '''
        if tested_offsets_by_layers == offsets_by_layers:
            print 'Success'
        else:
            print 'test_get_all_offsets[comparison of offsets in given packet]: FAILED'
        '''
        resT1 = (tested_offsets_by_layers == offsets_by_layers)
        assert_equal(resT1,True)

    def test_multi_packet(self):
        e0 = json.dumps('Ether()')
        e1 = json.dumps('Ether()/IP()')
        e2 = json.dumps('TCP()')
        e3 = json.dumps('UDP()')
        e4 = json.dumps('Ether()/IP()/TCP()/"test"')
        e5 = json.dumps('Ether()/IP()/UDP()')
        packets = [e0,e1,e2,e3,e4,e5]

        for packet in packets:
            self.test_get_all_offsets(packet)

        for packet in packets:
            self.test_build_packet(packet)
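

# A minimal sketch of how this file is usually run - the exact runner flags and
# working directory are an assumption based on the nose imports above and the
# file's location, not a documented invocation:
#
#   cd scripts/automation/regression
#   nosetests functional_tests/scapy_server_test.py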