aboutsummaryrefslogtreecommitdiffstats
path: root/src/vnet/pipeline.h
blob: ec2ac0b96f3ff78f110dd619fe1c8d68e693eead (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
/*
 * vnet/pipeline.h: software pipeline
 *
 * Copyright (c) 2012 Cisco and/or its affiliates.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/*
 * Usage example.
 *
 * #define NSTAGES 3 or whatever
 *
 * If using an aux data vector - to hold bihash keys or some such:
 *
 * #define AUX_DATA_TYPE my_aux_data_t
 *
 * <Define pipeline stages>
 *
 * #include <vnet/pipeline.h>
 *
 * static uword my_node_fn (vlib_main_t * vm,
 *                               vlib_node_runtime_t * node,
 *                               vlib_frame_t * frame)
 * {
 *     return dispatch_pipeline (vm, node, frame);
 * }
 *
 */

#ifndef NSTAGES
#error files which #include <vnet/pipeline.h> must define NSTAGES
#endif

#ifndef STAGE_INLINE
#define STAGE_INLINE inline
#endif

/* Unless the user wants the aux data scheme, don't configure it */
#ifndef AUX_DATA_TYPE
#define AUX_DATA_ARG
#define AUX_DATA_DECL
#define AUX_DATA_PTR(pi)
#else
#define AUX_DATA_ARG ,##AUX_DATA_TYPE *ap
#define AUX_DATA_DECL AUX_DATA_TYPE aux_data[VLIB_FRAME_SIZE]
#define AUX_DATA_PTR(pi) ,aux_data +(pi)
#endif

/*
 * A prefetch stride of 2 is quasi-equivalent to doubling the number
 * of stages with every other pipeline stage empty.
 */

/*
 * This is a typical first pipeline stage, which prefetches
 * buffer metadata and the first line of pkt data.
 *
 * To use it:
 *  #define stage0 generic_stage0
 *
 * This implementation won't use the aux data argument
 */
static STAGE_INLINE void
generic_stage0 (vlib_main_t * vm,
		vlib_node_runtime_t * node, vlib_buffer_t * b AUX_DATA_ARG)
{
  vlib_prefetch_buffer_header (b, STORE);
  CLIB_PREFETCH (b->data, CLIB_CACHE_LINE_BYTES, STORE);
}

#if NSTAGES == 2

static STAGE_INLINE uword
dispatch_pipeline (vlib_main_t * vm,
		   vlib_node_runtime_t * node, vlib_frame_t * frame)
{
  u32 *from;
  u32 n_left_from;
  int pi;
  vlib_buffer_t *bufs[VLIB_FRAME_SIZE];
  u16 nexts[VLIB_FRAME_SIZE];
  AUX_DATA_DECL;

  n_left_from = frame->n_vectors;
  from = vlib_frame_args (frame);
  vlib_get_buffers (vm, from, bufs, n_left_from);

  for (pi = 0; pi < NSTAGES - 1; pi++)
    {
      if (pi == n_left_from)
	break;
      stage0 (vm, node, bufs[pi] AUX_DATA_PTR (pi));
    }

  for (; pi < n_left_from; pi++)
    {
      stage0 (vm, node, bufs[pi]);
      nexts[pi - 1] =
	last_stage (vm, node, bufs[pi - 1] AUX_DATA_PTR (pi - 1));
    }

  for (; pi < (n_left_from + (NSTAGES - 1)); pi++)
    {
      if (((pi - 1) >= 0) && ((pi - 1) < n_left_from))
	nexts[pi - 1] =
	  last_stage (vm, node, bufs[pi - 1] AUX_DATA_PTR (pi - 1));
    }

  vlib_buffer_enqueue_to_next (vm, node, from, nexts, frame->n_vectors);
  return frame->n_vectors;
}
#endif

#if NSTAGES == 3
static STAGE_INLINE uword
dispatch_pipeline (vlib_main_t * vm,
		   vlib_node_runtime_t * node, vlib_frame_t * frame)
{
  u32 *from;
  u32 n_left_from;
  int pi;
  vlib_buffer_t *bufs[VLIB_FRAME_SIZE];
  u16 nexts[VLIB_FRAME_SIZE];
  AUX_DATA_DECL;

  n_left_from = frame->n_vectors;
  from = vlib_frame_args (frame);
  vlib_get_buffers (vm, from, bufs, n_left_from);

  for (pi = 0; pi < NSTAGES - 1; pi++)
    {
      if (pi == n_left_from)
	break;
      stage0 (vm, node, bufs[pi] AUX_DATA_PTR (pi));
      if (pi - 1 >= 0)
	stage1 (vm, node, bufs[pi - 1]);
    }

  for (; pi < n_left_from; pi++)
    {
      stage0 (vm, node, bufs[pi] AUX_DATA_PTR (pi));
      stage1 (vm, node, bufs[pi - 1] AUX_DATA_PTR (pi - 1));
      nexts[pi - 2] =
	last_stage (vm, node, bufs[pi - 2] AUX_DATA_PTR (pi - 2));
    }

  for (; pi < (n_left_from + (NSTAGES - 1)); pi++)
    {
      if (((pi - 1) >= 0) && ((pi - 1) < n_left_from))
	stage1 (vm, node, bufs[pi - 1] AUX_DATA_PTR (pi - 1));
      if (((pi - 2) >= 0) && ((pi - 2) < n_left_from))
	nexts[pi - 2] =
	  last_stage (vm, node, bufs[pi - 2] AUX_DATA_PTR (pi - 2));
    }

  vlib_buffer_enqueue_to_next (vm, node, from, nexts, frame->n_vectors);
  return frame->n_vectors;
}
#endif

#if NSTAGES == 4
static STAGE_INLINE uword
dispatch_pipeline (vlib_main_t * vm,
		   vlib_node_runtime_t * node, vlib_frame_t * frame)
{
  u32 *from;
  u32 n_left_from;
  int pi;
  vlib_buffer_t *bufs[VLIB_FRAME_SIZE];
  u16 nexts[VLIB_FRAME_SIZE];
  AUX_DATA_DECL;

  n_left_from = frame->n_vectors;
  from = vlib_frame_args (frame);
  vlib_get_buffers (vm, from, bufs, n_left_from);

  for (pi = 0; pi < NSTAGES - 1; pi++)
    {
      if (pi == n_left_from)
	break;
      stage0 (vm, node, bufs[pi] AUX_DATA_PTR (pi));
      if (pi - 1 >= 0)
	stage1 (vm, node, bufs[pi - 1] AUX_DATA_PTR (pi - 1));
      if (pi - 2 >= 0)
	stage2 (vm, node, bufs[pi - 2] AUX_DATA_PTR (pi - 2));
    }

  for (; pi < n_left_from; pi++)
    {
      stage0 (vm, node, bufs[pi] AUX_DATA_PTR (pi));
      stage1 (vm, node, bufs[pi - 1] AUX_DATA_PTR (pi - 1));
      stage2 (vm, node, bufs[pi - 2] AUX_DATA_PTR (pi - 2));
      nexts[pi - 3] =
	last_stage (vm, node, bufs[pi - 3] AUX_DATA_PTR (pi - 3));
    }

  for (; pi < (n_left_from + (NSTAGES - 1)); pi++)
    {
      if (((pi - 1) >= 0) && ((pi - 1) < n_left_from))
	stage1 (vm, node, bufs[pi - 1] AUX_DATA_PTR (pi - 1));
      if (((pi - 2) >= 0) && ((pi - 2) < n_left_from))
	stage2 (vm, node, bufs[pi - 2] AUX_DATA_PTR (pi - 2));
      if (((pi - 3) >= 0) && ((pi - 3) < n_left_from))
	nexts[pi - 3] =
	  last_stage (vm, node, bufs[pi - 3] AUX_DATA_PTR (pi - 3));
    }

  vlib_buffer_enqueue_to_next (vm, node, from, nexts, frame->n_vectors);
  return frame->n_vectors;
}
#endif

#if NSTAGES == 5
static STAGE_INLINE uword
dispatch_pipeline (vlib_main_t * vm,
		   vlib_node_runtime_t * node, vlib_frame_t * frame)
{
  u32 *from;
  u32 n_left_from;
  int pi;
  vlib_buffer_t *bufs[VLIB_FRAME_SIZE];
  u16 nexts[VLIB_FRAME_SIZE];
  AUX_DATA_DECL;

  n_left_from = frame->n_vectors;
  from = vlib_frame_args (frame);
  vlib_get_buffers (vm, from, bufs, n_left_from);

  for (pi = 0; pi < NSTAGES - 1; pi++)
    {
      if (pi == n_left_from)
	break;
      stage0 (vm, node, bufs[pi] AUX_DATA_PTR (pi));
      if (pi - 1 >= 0)
	stage1 (vm, node, bufs[pi - 1] AUX_DATA_PTR (pi - 1));
      if (pi - 2 >= 0)
	stage2 (vm, node, bufs[pi - 2] AUX_DATA_PTR (pi - 2));
      if (pi - 3 >= 0)
	stage3 (vm, node, bufs[pi - 3] AUX_DATA_PTR (pi - 3));
    }

  for (; pi < n_left_from; pi++)
    {
      stage0 (vm, node, bufs[pi] AUX_DATA_PTR (pi));
      stage1 (vm, node, bufs[pi - 1] AUX_DATA_PTR (pi - 1));
      stage2 (vm, node, bufs[pi - 2] AUX_DATA_PTR (pi - 2));
      stage3 (vm, node, bufs[pi - 3] AUX_DATA_PTR (pi - 3));
      nexts[pi - 4] =
	last_stage (vm, node, bufs[pi - 4] AUX_DATA_PTR (pi - 4));
    }

  for (; pi < (n_left_from + (NSTAGES - 1)); pi++)
    {
      if (((pi - 1) >= 0) && ((pi - 1) < n_left_from))
	stage1 (vm, node, bufs[pi - 1] AUX_DATA_PTR (pi - 1));
      if (((pi - 2) >= 0) && ((pi - 2) < n_left_from))
	stage2 (vm, node, bufs[pi - 2] AUX_DATA_PTR (pi - 2));
      if (((pi - 3) >= 0) && ((pi - 3) < n_left_from))
	stage3 (vm, node, bufs[pi - 3] AUX_DATA_PTR (pi - 3));
      if (((pi - 4) >= 0) && ((pi - 4) < n_left_from))
	nexts[pi - 4] =
	  last_stage (vm, node, bufs[pi - 4] AUX_DATA_PTR (pi - 4));
    }

  vlib_buffer_enqueue_to_next (vm, node, from, nexts, frame->n_vectors);
  return frame->n_vectors;
}
#endif

#if NSTAGES == 6
static STAGE_INLINE uword
dispatch_pipeline (vlib_main_t * vm,
		   vlib_node_runtime_t * node, vlib_frame_t * frame)
{
  u32 *from;
  u32 n_left_from;
  int pi;
  vlib_buffer_t *bufs[VLIB_FRAME_SIZE];
  u16 nexts[VLIB_FRAME_SIZE];
  AUX_DATA_DECL;

  n_left_from = frame->n_vectors;
  from = vlib_frame_args (frame);
  vlib_get_buffers (vm, from, bufs, n_left_from);

  for (pi = 0; pi < NSTAGES - 1; pi++)
    {
      if (pi == n_left_from)
	break;
      stage0 (vm, node, bufs[pi] AUX_DATA_PTR (pi));
      if (pi - 1 >= 0)
	stage1 (vm, node, bufs[pi - 1] AUX_DATA_PTR (pi - 1));
      if (pi - 2 >= 0)
	stage2 (vm, node, bufs[pi - 2] AUX_DATA_PTR (pi - 2));
      if (pi - 3 >= 0)
	stage3 (vm, node, bufs[pi - 3] AUX_DATA_PTR (pi - 3));
      if (pi - 4 >= 0)
	stage4 (vm, node, bufs[pi - 4] AUX_DATA_PTR (pi - 4));
    }

  for (; pi < n_left_from; pi++)
    {
      stage0 (vm, node, bufs[pi] AUX_DATA_PTR (pi));
      stage1 (vm, node, bufs[pi - 1] AUX_DATA_PTR (pi - 1));
      stage2 (vm, node, bufs[pi - 2] AUX_DATA_PTR (pi - 2));
      stage3 (vm, node, bufs[pi - 3] AUX_DATA_PTR (pi - 3));
      stage4 (vm, node, bufs[pi - 4] AUX_DATA_PTR (pi - 4));
      nexts[pi - 5] =
	last_stage (vm, node, bufs[pi - 5] AUX_DATA_PTR (pi - 5));
    }

  for (; pi < (n_left_from + (NSTAGES - 1)); pi++)
    {
      if (((pi - 1) >= 0) && ((pi - 1) < n_left_from))
	stage1 (vm, node, bufs[pi - 1] AUX_DATA_PTR (pi - 1));
      if (((pi - 2) >= 0) && ((pi - 2) < n_left_from))
	stage2 (vm, node, bufs[pi - 2] AUX_DATA_PTR (pi - 2));
      if (((pi - 3) >= 0) && ((pi - 3) < n_left_from))
	stage3 (vm, node, bufs[pi - 3] AUX_DATA_PTR (pi - 3));
      if (((pi - 4) >= 0) && ((pi - 4) < n_left_from))
	stage4 (vm, node, bufs[pi - 4] AUX_DATA_PTR (pi - 4));
      if (((pi - 5) >= 0) && ((pi - 5) < n_left_from))
	nexts[pi - 5] =
	  last_stage (vm, node, bufs[pi - 5] AUX_DATA_PTR (pi - 5));
    }

  vlib_buffer_enqueue_to_next (vm, node, from, nexts, frame->n_vectors);
  return frame->n_vectors;
}
#endif

/*
 * fd.io coding-style-patch-verification: ON
 *
 * Local Variables:
 * eval: (c-set-style "gnu")
 * End:
 */
) p /= UDP(sport=1234, dport=4321) p /= Raw(payload) info.data = p.copy() self.extend_packet(p, pkt_size) self.pkts.append(p) def verify_cflow_data(self, decoder, capture, cflow): octets = 0 packets = 0 for p in capture: octets += p[IP].len packets += 1 if cflow.haslayer(Data): data = decoder.decode_data_set(cflow.getlayer(Set)) for record in data: self.assertEqual(int(binascii.hexlify(record[1]), 16), octets) self.assertEqual(int(binascii.hexlify(record[2]), 16), packets) def send_packets(self, src_if=None, dst_if=None): if src_if is None: src_if = self.pg1 if dst_if is None: dst_if = self.pg2 self.pg_enable_capture([dst_if]) src_if.add_stream(self.pkts) self.pg_start() return dst_if.get_capture(len(self.pkts)) def verify_cflow_data_detail(self, decoder, capture, cflow, data_set={1: 'octets', 2: 'packets'}, ip_ver='v4'): if self.debug_print: print(capture[0].show()) if cflow.haslayer(Data): data = decoder.decode_data_set(cflow.getlayer(Set)) if self.debug_print: print(data) if ip_ver == 'v4': ip_layer = capture[0][IP] else: ip_layer = capture[0][IPv6] if data_set is not None: for record in data: # skip flow if ingress/egress interface is 0 if int(binascii.hexlify(record[10]), 16) == 0: continue if int(binascii.hexlify(record[14]), 16) == 0: continue for field in data_set: if field not in record.keys(): continue value = data_set[field] if value == 'octets': value = ip_layer.len if ip_ver == 'v6': value += 40 # ??? is this correct elif value == 'packets': value = 1 elif value == 'src_ip': if ip_ver == 'v4': ip = socket.inet_pton(socket.AF_INET, ip_layer.src) else: ip = socket.inet_pton(socket.AF_INET6, ip_layer.src) value = int(binascii.hexlify(ip), 16) elif value == 'dst_ip': if ip_ver == 'v4': ip = socket.inet_pton(socket.AF_INET, ip_layer.dst) else: ip = socket.inet_pton(socket.AF_INET6, ip_layer.dst) value = int(binascii.hexlify(ip), 16) elif value == 'sport': value = int(capture[0][UDP].sport) elif value == 'dport': value = int(capture[0][UDP].dport) self.assertEqual(int(binascii.hexlify( record[field]), 16), value) def verify_cflow_data_notimer(self, decoder, capture, cflows): idx = 0 for cflow in cflows: if cflow.haslayer(Data): data = decoder.decode_data_set(cflow.getlayer(Set)) else: raise Exception("No CFLOW data") for rec in data: p = capture[idx] idx += 1 self.assertEqual(p[IP].len, int( binascii.hexlify(rec[1]), 16)) self.assertEqual(1, int( binascii.hexlify(rec[2]), 16)) self.assertEqual(len(capture), idx) def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1, expected=True): """ wait for CFLOW packet and verify its correctness :param timeout: how long to wait :returns: tuple (packet, time spent waiting for packet) """ self.logger.info("IPFIX: Waiting for CFLOW packet") deadline = time.time() + timeout counter = 0 # self.logger.debug(self.vapi.ppcli("show flow table")) while True: counter += 1 # sanity check self.assert_in_range(counter, 0, 100, "number of packets ignored") time_left = deadline - time.time() try: if time_left < 0 and expected: # self.logger.debug(self.vapi.ppcli("show flow table")) raise CaptureTimeoutError( "Packet did not arrive within timeout") p = collector_intf.wait_for_packet(timeout=time_left) except CaptureTimeoutError: if expected: # self.logger.debug(self.vapi.ppcli("show flow table")) raise CaptureTimeoutError( "Packet did not arrive within timeout") else: return if not expected: raise CaptureTimeoutError("Packet arrived even not expected") self.assertEqual(p[Set].setID, set_id) # self.logger.debug(self.vapi.ppcli("show flow table")) self.logger.debug(ppp("IPFIX: Got packet:", p)) break return p class Flowprobe(MethodHolder): """Template verification, timer tests""" @classmethod def setUpClass(cls): super(Flowprobe, cls).setUpClass() @classmethod def tearDownClass(cls): super(Flowprobe, cls).tearDownClass() def test_0001(self): """ timer less than template timeout""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] ipfix = VppCFLOW(test=self, active=2) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately templates = ipfix.verify_templates(ipfix_decoder) self.create_stream(packets=1) self.send_packets() capture = self.pg2.get_capture(1) # make sure the one packet we expect actually showed up cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15) self.verify_cflow_data(ipfix_decoder, capture, cflow) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0001") def test_0002(self): """ timer greater than template timeout""" self.logger.info("FFP_TEST_START_0002") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] ipfix = VppCFLOW(test=self, timeout=3, active=4) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately ipfix.verify_templates() self.create_stream(packets=2) self.send_packets() capture = self.pg2.get_capture(2) # next set of template packet should arrive after 20 seconds # template packet should arrive within 20 s templates = ipfix.verify_templates(ipfix_decoder, timeout=5) # make sure the one packet we expect actually showed up cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15) self.verify_cflow_data(ipfix_decoder, capture, cflow) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0002") def test_cflow_packet(self): """verify cflow packet fields""" self.logger.info("FFP_TEST_START_0000") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] ipfix = VppCFLOW(test=self, intf='pg8', datapath="ip4", layer='l2 l3 l4', active=2) ipfix.add_vpp_config() route_9001 = VppIpRoute(self, "9.0.0.0", 24, [VppRoutePath(self.pg8._remote_hosts[0].ip4, self.pg8.sw_if_index)]) route_9001.add_vpp_config() ipfix_decoder = IPFIXDecoder() templates = ipfix.verify_templates(ipfix_decoder, count=1) self.pkts = [(Ether(dst=self.pg7.local_mac, src=self.pg7.remote_mac) / IP(src=self.pg7.remote_ip4, dst="9.0.0.100") / TCP(sport=1234, dport=4321, flags=80) / Raw(b'\xa5' * 100))] nowUTC = int(time.time()) nowUNIX = nowUTC+2208988800 self.send_packets(src_if=self.pg7, dst_if=self.pg8) cflow = self.wait_for_cflow_packet(self.collector, templates[0], 10) self.collector.get_capture(2) if cflow[0].haslayer(IPFIX): self.assertEqual(cflow[IPFIX].version, 10) self.assertEqual(cflow[IPFIX].observationDomainID, 1) self.assertEqual(cflow[IPFIX].sequenceNumber, 0) self.assertAlmostEqual(cflow[IPFIX].exportTime, nowUTC, delta=5) if cflow.haslayer(Data): record = ipfix_decoder.decode_data_set(cflow[0].getlayer(Set))[0] # ingress interface self.assertEqual(int(binascii.hexlify(record[10]), 16), 8) # egress interface self.assertEqual(int(binascii.hexlify(record[14]), 16), 9) # packets self.assertEqual(int(binascii.hexlify(record[2]), 16), 1) # src mac self.assertEqual(mac_ntop(record[56]), self.pg8.local_mac) # dst mac self.assertEqual(mac_ntop(record[80]), self.pg8.remote_mac) flowTimestamp = int(binascii.hexlify(record[156]), 16) >> 32 # flow start timestamp self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1) flowTimestamp = int(binascii.hexlify(record[157]), 16) >> 32 # flow end timestamp self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1) # ethernet type self.assertEqual(int(binascii.hexlify(record[256]), 16), 8) # src ip self.assertEqual(inet_ntop(socket.AF_INET, record[8]), self.pg7.remote_ip4) # dst ip self.assertEqual(inet_ntop(socket.AF_INET, record[12]), "9.0.0.100") # protocol (TCP) self.assertEqual(int(binascii.hexlify(record[4]), 16), 6) # src port self.assertEqual(int(binascii.hexlify(record[7]), 16), 1234) # dst port self.assertEqual(int(binascii.hexlify(record[11]), 16), 4321) # tcp flags self.assertEqual(int(binascii.hexlify(record[6]), 16), 80) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0000") class Datapath(MethodHolder): """collect information on Ethernet, IP4 and IP6 datapath (no timers)""" @classmethod def setUpClass(cls): super(Datapath, cls).setUpClass() @classmethod def tearDownClass(cls): super(Datapath, cls).tearDownClass() def test_templatesL2(self): """ verify template on L2 datapath""" self.logger.info("FFP_TEST_START_0000") self.pg_enable_capture(self.pg_interfaces) ipfix = VppCFLOW(test=self, layer='l2') ipfix.add_vpp_config() # template packet should arrive immediately self.vapi.ipfix_flush() ipfix.verify_templates(timeout=3, count=1) self.collector.get_capture(1) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0000") def test_L2onL2(self): """ L2 data on L2 datapath""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] ipfix = VppCFLOW(test=self, layer='l2') ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately templates = ipfix.verify_templates(ipfix_decoder, count=1) self.create_stream(packets=1) capture = self.send_packets() # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, {2: 'packets', 256: 8}) self.collector.get_capture(2) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0001") def test_L3onL2(self): """ L3 data on L2 datapath""" self.logger.info("FFP_TEST_START_0002") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] ipfix = VppCFLOW(test=self, layer='l3') ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately templates = ipfix.verify_templates(ipfix_decoder, count=2) self.create_stream(packets=1) capture = self.send_packets() # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, {2: 'packets', 4: 17, 8: 'src_ip', 12: 'dst_ip'}) self.collector.get_capture(3) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0002") def test_L4onL2(self): """ L4 data on L2 datapath""" self.logger.info("FFP_TEST_START_0003") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] ipfix = VppCFLOW(test=self, layer='l4') ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately templates = ipfix.verify_templates(ipfix_decoder, count=2) self.create_stream(packets=1) capture = self.send_packets() # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, {2: 'packets', 7: 'sport', 11: 'dport'}) self.collector.get_capture(3) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0003") def test_templatesIp4(self): """ verify templates on IP4 datapath""" self.logger.info("FFP_TEST_START_0000") self.pg_enable_capture(self.pg_interfaces) ipfix = VppCFLOW(test=self, datapath='ip4') ipfix.add_vpp_config() # template packet should arrive immediately self.vapi.ipfix_flush() ipfix.verify_templates(timeout=3, count=1) self.collector.get_capture(1) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0000") def test_L2onIP4(self): """ L2 data on IP4 datapath""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] ipfix = VppCFLOW(test=self, intf='pg4', layer='l2', datapath='ip4') ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately templates = ipfix.verify_templates(ipfix_decoder, count=1) self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1) capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4) # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, {2: 'packets', 256: 8}) # expected two templates and one cflow packet self.collector.get_capture(2) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0001") def test_L3onIP4(self): """ L3 data on IP4 datapath""" self.logger.info("FFP_TEST_START_0002") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] ipfix = VppCFLOW(test=self, intf='pg4', layer='l3', datapath='ip4') ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately templates = ipfix.verify_templates(ipfix_decoder, count=1) self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1) capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4) # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, {1: 'octets', 2: 'packets', 8: 'src_ip', 12: 'dst_ip'}) # expected two templates and one cflow packet self.collector.get_capture(2) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0002") def test_L4onIP4(self): """ L4 data on IP4 datapath""" self.logger.info("FFP_TEST_START_0003") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] ipfix = VppCFLOW(test=self, intf='pg4', layer='l4', datapath='ip4') ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately templates = ipfix.verify_templates(ipfix_decoder, count=1) self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1) capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4) # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, {2: 'packets', 7: 'sport', 11: 'dport'}) # expected two templates and one cflow packet self.collector.get_capture(2) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0003") def test_templatesIP6(self): """ verify templates on IP6 datapath""" self.logger.info("FFP_TEST_START_0000") self.pg_enable_capture(self.pg_interfaces) ipfix = VppCFLOW(test=self, datapath='ip6') ipfix.add_vpp_config() # template packet should arrive immediately ipfix.verify_templates(count=1) self.collector.get_capture(1) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0000") def test_L2onIP6(self): """ L2 data on IP6 datapath""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] ipfix = VppCFLOW(test=self, intf='pg6', layer='l2', datapath='ip6') ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately templates = ipfix.verify_templates(ipfix_decoder, count=1) self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver='IPv6') capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6) # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, {2: 'packets', 256: 56710}, ip_ver='v6') # expected two templates and one cflow packet self.collector.get_capture(2) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0001") def test_L3onIP6(self): """ L3 data on IP6 datapath""" self.logger.info("FFP_TEST_START_0002") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] ipfix = VppCFLOW(test=self, intf='pg6', layer='l3', datapath='ip6') ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately templates = ipfix.verify_templates(ipfix_decoder, count=1) self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver='IPv6') capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6) # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, {2: 'packets', 27: 'src_ip', 28: 'dst_ip'}, ip_ver='v6') # expected two templates and one cflow packet self.collector.get_capture(2) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0002") def test_L4onIP6(self): """ L4 data on IP6 datapath""" self.logger.info("FFP_TEST_START_0003") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] ipfix = VppCFLOW(test=self, intf='pg6', layer='l4', datapath='ip6') ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately templates = ipfix.verify_templates(ipfix_decoder, count=1) self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver='IPv6') capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6) # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, {2: 'packets', 7: 'sport', 11: 'dport'}, ip_ver='v6') # expected two templates and one cflow packet self.collector.get_capture(2) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0003") def test_0001(self): """ no timers, one CFLOW packet, 9 Flows inside""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] ipfix = VppCFLOW(test=self) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately templates = ipfix.verify_templates(ipfix_decoder) self.create_stream(packets=9) capture = self.send_packets() # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[1]) self.verify_cflow_data_notimer(ipfix_decoder, capture, [cflow]) self.collector.get_capture(4) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0001") def test_0002(self): """ no timers, two CFLOW packets (mtu=256), 3 Flows in each""" self.logger.info("FFP_TEST_START_0002") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] ipfix = VppCFLOW(test=self, mtu=256) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately self.vapi.ipfix_flush() templates = ipfix.verify_templates(ipfix_decoder) self.create_stream(packets=6) capture = self.send_packets() # make sure the one packet we expect actually showed up cflows = [] self.vapi.ipfix_flush() cflows.append(self.wait_for_cflow_packet(self.collector, templates[1])) cflows.append(self.wait_for_cflow_packet(self.collector, templates[1])) self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows) self.collector.get_capture(5) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0002") @unittest.skipUnless(running_extended_tests, "part of extended tests") class DisableIPFIX(MethodHolder): """Disable IPFIX""" @classmethod def setUpClass(cls): super(DisableIPFIX, cls).setUpClass() @classmethod def tearDownClass(cls): super(DisableIPFIX, cls).tearDownClass() def test_0001(self): """ disable IPFIX after first packets""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] ipfix = VppCFLOW(test=self) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately templates = ipfix.verify_templates(ipfix_decoder) self.create_stream() self.send_packets() # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() self.wait_for_cflow_packet(self.collector, templates[1]) self.collector.get_capture(4) # disable IPFIX ipfix.disable_exporter() self.pg_enable_capture([self.collector]) self.send_packets() # make sure no one packet arrived in 1 minute self.vapi.ipfix_flush() self.wait_for_cflow_packet(self.collector, templates[1], expected=False) self.collector.get_capture(0) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0001") @unittest.skipUnless(running_extended_tests, "part of extended tests") class ReenableIPFIX(MethodHolder): """Re-enable IPFIX""" @classmethod def setUpClass(cls): super(ReenableIPFIX, cls).setUpClass() @classmethod def tearDownClass(cls): super(ReenableIPFIX, cls).tearDownClass() def test_0011(self): """ disable IPFIX after first packets and re-enable after few packets """ self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] ipfix = VppCFLOW(test=self) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately templates = ipfix.verify_templates(ipfix_decoder) self.create_stream(packets=5) self.send_packets() # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() self.wait_for_cflow_packet(self.collector, templates[1]) self.collector.get_capture(4) # disable IPFIX ipfix.disable_exporter() self.vapi.ipfix_flush() self.pg_enable_capture([self.collector]) self.send_packets() # make sure no one packet arrived in active timer span self.vapi.ipfix_flush() self.wait_for_cflow_packet(self.collector, templates[1], expected=False) self.collector.get_capture(0) self.pg2.get_capture(5) # enable IPFIX ipfix.enable_exporter() capture = self.collector.get_capture(4) nr_templates = 0 nr_data = 0 for p in capture: self.assertTrue(p.haslayer(IPFIX)) if p.haslayer(Template): nr_templates += 1 self.assertTrue(nr_templates, 3) for p in capture: self.assertTrue(p.haslayer(IPFIX)) if p.haslayer(Data): nr_data += 1 self.assertTrue(nr_templates, 1) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0001") @unittest.skipUnless(running_extended_tests, "part of extended tests") class DisableFP(MethodHolder): """Disable Flowprobe feature""" @classmethod def setUpClass(cls): super(DisableFP, cls).setUpClass() @classmethod def tearDownClass(cls): super(DisableFP, cls).tearDownClass() def test_0001(self): """ disable flowprobe feature after first packets""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] ipfix = VppCFLOW(test=self) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately templates = ipfix.verify_templates(ipfix_decoder) self.create_stream() self.send_packets() # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() self.wait_for_cflow_packet(self.collector, templates[1]) self.collector.get_capture(4) # disable IPFIX ipfix.disable_flowprobe_feature() self.pg_enable_capture([self.collector]) self.send_packets() # make sure no one packet arrived in active timer span self.vapi.ipfix_flush() self.wait_for_cflow_packet(self.collector, templates[1], expected=False) self.collector.get_capture(0) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0001") @unittest.skipUnless(running_extended_tests, "part of extended tests") class ReenableFP(MethodHolder): """Re-enable Flowprobe feature""" @classmethod def setUpClass(cls): super(ReenableFP, cls).setUpClass() @classmethod def tearDownClass(cls): super(ReenableFP, cls).tearDownClass() def test_0001(self): """ disable flowprobe feature after first packets and re-enable after few packets """ self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] ipfix = VppCFLOW(test=self) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately self.vapi.ipfix_flush() templates = ipfix.verify_templates(ipfix_decoder, timeout=3) self.create_stream() self.send_packets() # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() self.wait_for_cflow_packet(self.collector, templates[1], 5) self.collector.get_capture(4) # disable FPP feature ipfix.disable_flowprobe_feature() self.pg_enable_capture([self.collector]) self.send_packets() # make sure no one packet arrived in active timer span self.vapi.ipfix_flush() self.wait_for_cflow_packet(self.collector, templates[1], 5, expected=False) self.collector.get_capture(0) # enable FPP feature ipfix.enable_flowprobe_feature() self.vapi.ipfix_flush() templates = ipfix.verify_templates(ipfix_decoder, timeout=3) self.send_packets() # make sure the next packets (templates and data) we expect actually # showed up self.vapi.ipfix_flush() self.wait_for_cflow_packet(self.collector, templates[1], 5) self.collector.get_capture(4) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0001") if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)