diff options
Diffstat (limited to 'vpp-api/python/tests/test_cli.py')
0 files changed, 0 insertions, 0 deletions
![]() |
index : vpp | |
Vector Packet Processing | Grokmirror user |
aboutsummaryrefslogtreecommitdiffstats |
#!/router/bin/python
from .stl_general_test import CStlGeneral_Test, CTRexScenario
from trex_stl_lib.api import *
import os, sys
import pprint
def ip2num (ip_str):
return struct.unpack('>L', socket.inet_pton(socket.AF_INET, ip_str))[0]
def num2ip (ip_num):
return socket.inet_ntoa(struct.pack('>L', ip_num))
def ip_add (ip_str, cnt):
return num2ip(ip2num(ip_str) + cnt)
class STLCapture_Test(CStlGeneral_Test):
"""Tests for capture packets"""
def setUp(self):
CStlGeneral_Test.setUp(self)
if not self.is_loopback:
self.skip('capture tests are skipped on a non-loopback machine')
assert 'bi' in CTRexScenario.stl_ports_map
self.c = CTRexScenario.stl_trex
self.tx_port, self.rx_port = CTRexScenario.stl_ports_map['bi'][0]
self.c.connect()
self.c.reset(ports = [self.tx_port, self.rx_port])
self.pkt = STLPktBuilder(pkt = Ether()/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)/IP()/'a_payload_example')
self.percentage = 5 if self.is_virt_nics else 50
# some setups (enic) might add VLAN always
self.nic_adds_vlan = CTRexScenario.setup_name in ['trex11']
@classmethod
def tearDownClass(cls):
if CTRexScenario.stl_init_error:
return
# connect back at end of tests
if not cls.is_connected():
CTRexScenario.stl_trex.connect()
def __compare_captures (self, tx_pkt_list, rx_pkt_list):
# make sure we have the same binaries in both lists
tx_pkt_list_bin = {pkt['binary'] for pkt in tx_pkt_list}
rx_pkt_list_bin = {pkt['binary'] for pkt in rx_pkt_list}
if tx_pkt_list_bin != rx_pkt_list_bin:
# if the NIC does not add VLAN - a simple binary compare will do
if not self.nic_adds_vlan:
assert 0, "TX and RX captures do not match"
# the NIC adds VLAN - compare IP level
tx_pkt_list_ip = { bytes((Ether(pkt))['IP']) for pkt in tx_pkt_list_bin}
rx_pkt_list_ip = { bytes((Ether(pkt))['IP']) for pkt in rx_pkt_list_bin}
if tx_pkt_list_ip != rx_pkt_list_ip:
assert 0, "TX and RX captures do not match"
# a simple capture test - inject packets and see the packets arrived the same
def test_basic_capture (self):
pkt_count = 100
try:
# move to service mode
self.c.set_service_mode(ports = [self.tx_port, self.rx_port])
# start a capture
txc = self.c.start_capture(tx_ports = self.tx_port, limit = pkt_count)
rxc = self.c.start_capture(rx_ports = self.rx_port, limit = pkt_count)
# inject few packets with a VM
vm = STLScVmRaw( [STLVmFlowVar ( "ip_src", min_value="16.0.0.0", max_value="16.255.255.255", size=4, step = 7, op = "inc"),
STLVmWrFlowVar (fv_name="ip_src", pkt_offset= "IP.src"),
STLVmFixIpv4(offset = "IP")
]
);
pkt = STLPktBuilder(pkt = Ether()/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)/IP()/'a_payload_example',
vm = vm)
stream = STLStream(name = 'burst',
packet = pkt,
mode = STLTXSingleBurst(total_pkts = pkt_count,
percentage = self.percentage)
)
self.c.add_streams(ports = self.tx_port, streams = [stream])
self.c.start(ports = self.tx_port, force = True)
self.c.wait_on_traffic(ports = self.tx_port)
tx_pkt_list = []
rx_pkt_list = []
self.c.stop_capture(txc['id'], output = tx_pkt_list)
self.c.stop_capture(rxc['id'], output = rx_pkt_list)
assert (len(tx_pkt_list) == len(rx_pkt_list) == pkt_count)
# make sure we have the same binaries in both lists
self.__compare_captures(tx_pkt_list, rx_pkt_list)
# generate all the values that should be
expected_src_ips = {ip_add('16.0.0.0', i * 7) for i in range(pkt_count)}
got_src_ips = {(Ether(pkt['binary']))['IP'].src for pkt in rx_pkt_list}
if expected_src_ips != got_src_ips:
assert 0, "recieved packets do not match expected packets"
except STLError as e:
assert False , '{0}'.format(e)
finally:
self.c.remove_all_captures()
self.c.set_service_mode(ports = self.rx_port, enabled = False)
# in this test we apply captures under traffic multiple times
def test_stress_capture (self):
pkt_count = 100
try:
# move to service mode
self.c.set_service_mode(ports = self.rx_port)
# start heavy traffic
pkt = STLPktBuilder(pkt = Ether()/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)/IP()/'a_payload_example')
stream = STLStream(name = 'burst',
packet = pkt,
mode = STLTXCont(percentage = self.percentage)
)
self.c.add_streams(ports = self.tx_port, streams = [stream])
self.c.start(ports = self.tx_port, force = True)
captures = [{'capture_id': None, 'limit': 50}, {'capture_id': None, 'limit': 80}, {'capture_id': None, 'limit': 100}]
for i in range(0, 100):
# start a few captures
for capture in captures:
capture['capture_id'] = self.c.start_capture(rx_ports = [self.rx_port], limit = capture['limit'])['id']
# a little time to wait for captures to be full
wait_iterations = 0
while True:
server_captures = self.c.get_capture_status()
counts = ([c['count'] for c in server_captures.values()])
if {50, 80, 100} == set(counts):
break
time.sleep(0.1)
wait_iterations += 1
assert(wait_iterations <= 5)
for capture in captures:
capture_id = capture['capture_id']
# make sure the server registers us and we are full
assert(capture['capture_id'] in server_captures.keys())
assert(server_captures[capture_id]['count'] == capture['limit'])
# fetch packets
pkt_list = []
self.c.stop_capture(capture['capture_id'], pkt_list)
assert (len(pkt_list) == capture['limit'])
# a little sanity per packet
for pkt in pkt_list:
scapy_pkt = Ether(pkt['binary'])
assert(scapy_pkt['IP'].src == '16.0.0.1')
assert(scapy_pkt['IP'].dst == '48.0.0.1')
except STLError as e:
assert False , '{0}'.format(e)
finally:
self.c.remove_all_captures()
self.c.set_service_mode(ports = self.rx_port, enabled = False)
# in this test we capture and analyze the ARP request / response
def test_arp_capture (self):
if self.c.get_port_attr(self.tx_port)['layer_mode'] != 'IPv4':
return self.skip('skipping ARP capture test for non-ipv4 config on port {0}'.format(self.tx_port))
if self.c.get_port_attr(self.rx_port)['layer_mode'] != 'IPv4':
return self.skip('skipping ARP capture test for non-ipv4 config on port {0}'.format(self.rx_port))
try:
# move to service mode
self.c.set_service_mode(ports = [self.tx_port, self.rx_port])
# start a capture
capture_info = self.c.start_capture(rx_ports = [self.tx_port, self.rx_port], limit = 2)
# generate an ARP request
self.c.arp(ports = self.tx_port)
pkts = []
self.c.stop_capture(capture_info['id'], output = pkts)
assert len(pkts) == 2
# find the correct order
if pkts[0]['port'] == self.rx_port:
request = pkts[0]
response = pkts[1]
else:
request = pkts[1]
response = pkts[0]
assert request['port'] == self.rx_port
assert response['port'] == self.tx_port
arp_request, arp_response = Ether(request['binary']), Ether(response['binary'])
assert 'ARP' in arp_request
assert 'ARP' in arp_response
assert arp_request['ARP'].op == 1
assert arp_response['ARP'].op == 2
assert arp_request['ARP'].pdst == arp_response['ARP'].psrc
except STLError as e:
assert False , '{0}'.format(e)
finally:
self.c.remove_all_captures()
self.c.set_service_mode(ports = [self.tx_port, self.rx_port], enabled = False)
# test PING
def test_ping_capture (self):
if self.c.get_port_attr(self.tx_port)['layer_mode'] != 'IPv4':
return self.skip('skipping ARP capture test for non-ipv4 config on port {0}'.format(self.tx_port))
if self.c.get_port_attr(self.rx_port)['layer_mode'] != 'IPv4':
return self.skip('skipping ARP capture test for non-ipv4 config on port {0}'.format(self.rx_port))
try:
# move to service mode
self.c.set_service_mode(ports = [self.tx_port, self.rx_port])
# start a capture
capture_info = self.c.start_capture(rx_ports = [self.tx_port, self.rx_port], limit = 100)
# generate an ARP request
tx_ipv4 = self.c.get_port_attr(port = self.tx_port)['src_ipv4']
rx_ipv4 = self.c.get_port_attr(port = self.rx_port)['src_ipv4']
count = 50
self.c.ping_ip(src_port = self.tx_port, dst_ip = rx_ipv4, pkt_size = 1500, count = count, interval_sec = 0.01)
pkts = []
self.c.stop_capture(capture_info['id'], output = pkts)
req_pkts = [Ether(pkt['binary']) for pkt in pkts if pkt['port'] == self.rx_port]
res_pkts = [Ether(pkt['binary']) for pkt in pkts if pkt['port'] == self.tx_port]
assert len(req_pkts) == count
assert len(res_pkts) == count
for req_pkt in req_pkts:
assert 'ICMP' in req_pkt
assert req_pkt['IP'].src == tx_ipv4
assert req_pkt['IP'].dst == rx_ipv4
assert req_pkt['ICMP'].type == 8
assert len(req_pkt) == 1500
for res_pkt in res_pkts:
assert 'ICMP' in res_pkt
assert res_pkt['IP'].src == rx_ipv4
assert res_pkt['IP'].dst == tx_ipv4
assert res_pkt['ICMP'].type == 0
assert len(res_pkt) == 1500
except STLError as e:
assert False , '{0}'.format(e)
finally:
self.c.remove_all_captures()
self.c.set_service_mode(ports = [self.tx_port, self.rx_port], enabled = False)
# in this test we stress TX & RX captures in parallel
def test_stress_tx_rx (self):
pkt_count = 100
try:
# move to service mode
self.c.set_service_mode(ports = [self.rx_port, self.tx_port])
# start heavy traffic
pkt = STLPktBuilder(pkt = Ether()/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)/IP()/'a_payload_example')
stream = STLStream(name = 'burst',
packet = pkt,
mode = STLTXCont(percentage = self.percentage)
)
self.c.add_streams(ports = self.tx_port, streams = [stream])
self.c.start(ports = self.tx_port, mult = "50%", force = True)
# start a capture on the RX port
capture_rx = self.c.start_capture(rx_ports = self.rx_port, limit = 1000)
# now under traffic start/stop the TX capture
for i in range(0, 1000):
# start a common capture
capture_txrx = self.c.start_capture(rx_ports = self.rx_port, tx_ports = self.tx_port, limit = 1000)
self.c.stop_capture(capture_txrx['id'])
except STLError as e:
assert False , '{0}'.format(e)
finally:
self.c.remove_all_captures()
self.c.set_service_mode(ports = [self.rx_port, self.tx_port], enabled = False)