1 | |
---|
2 | #include "config.h" |
---|
3 | #include "common.h" |
---|
4 | #include "libtrace.h" |
---|
5 | #include "libtrace_int.h" |
---|
6 | #include "format_helper.h" |
---|
7 | #include "format_erf.h" |
---|
8 | |
---|
9 | #include <assert.h> |
---|
10 | #include <errno.h> |
---|
11 | #include <fcntl.h> |
---|
12 | #include <stdio.h> |
---|
13 | #include <string.h> |
---|
14 | #include <unistd.h> |
---|
15 | #include <stdlib.h> |
---|
16 | #include <sys/types.h> |
---|
17 | #include <sys/socket.h> |
---|
18 | #include <netdb.h> |
---|
19 | |
---|
20 | #include "format_dpdk.h" |
---|
21 | #include "format_ndag.h" |
---|
22 | |
---|
23 | static struct libtrace_format_t dpdkndag; |
---|
24 | |
---|
/* Sequence-tracking state for a single nDAG stream. Streams are told apart
 * by the destination UDP port of the datagrams that carry them. */
struct capstream {
        uint16_t port;          /* destination port identifying the stream */
        uint32_t expectedseq;   /* next sequence number expected; 0 means
                                 * nothing has been seen on this stream yet */
        uint64_t recordcount;   /* datagrams accepted on this stream */
};
typedef struct capstream capstream_t;
---|
31 | |
---|
32 | typedef struct perthread { |
---|
33 | capstream_t *capstreams; |
---|
34 | uint16_t streamcount; |
---|
35 | uint64_t dropped_upstream; |
---|
36 | uint64_t missing_records; |
---|
37 | uint64_t received_packets; |
---|
38 | |
---|
39 | libtrace_packet_t *dpdkpkt; |
---|
40 | char *ndagheader; |
---|
41 | char *nextrec; |
---|
42 | uint32_t ndagsize; |
---|
43 | |
---|
44 | pthread_mutex_t ndag_lock; |
---|
45 | dpdk_per_stream_t *dpdkstreamdata; |
---|
46 | int burstsize; |
---|
47 | int burstoffset; |
---|
48 | struct rte_mbuf* burstspace[40]; |
---|
49 | |
---|
50 | } perthread_t; |
---|
51 | |
---|
52 | |
---|
53 | typedef struct dpdkndag_format_data { |
---|
54 | libtrace_t *dpdkrecv; |
---|
55 | |
---|
56 | struct addrinfo *multicastgroup; |
---|
57 | char *localiface; |
---|
58 | |
---|
59 | perthread_t *threaddatas; |
---|
60 | |
---|
61 | } dpdkndag_format_data_t; |
---|
62 | |
---|
/* Format-private data of the trace called `libtrace` in the enclosing scope. */
#define FORMAT_DATA \
        ((dpdkndag_format_data_t *)(libtrace->format_data))
---|
64 | |
---|
/*
 * Returns how far seq_a is ahead of seq_b in nDAG sequence space.
 *
 * Sequence numbers never take the value zero: after 0xffffffff the next
 * number is 1. The sequence space therefore contains 0xffffffff distinct
 * values rather than 2^32.
 *
 * seq_a: the sequence number that was observed.
 * seq_b: the sequence number that was expected.
 *
 * Returns 0 when they match, otherwise the number of steps from seq_b
 * forward to seq_a. A seq_a that is numerically smaller than seq_b is
 * treated as having wrapped.
 */
static inline int seq_cmp(uint32_t seq_a, uint32_t seq_b) {

        if (seq_a == seq_b) {
                return 0;
        }

        if (seq_a > seq_b) {
                return (int) (seq_a - seq_b);
        }

        /* Wrapped. A plain modulo-2^32 difference would count the unused
         * value zero as a step, so the distance is taken modulo 0xffffffff
         * instead. The previous expression added 2 to the modular
         * difference where it meant to subtract, over-counting by 2. */
        return (int) (0xffffffffU - (seq_b - seq_a));
}
---|
77 | |
---|
78 | |
---|
79 | static int dpdkndag_init_input(libtrace_t *libtrace) { |
---|
80 | |
---|
81 | char *scan = NULL; |
---|
82 | char *next = NULL; |
---|
83 | char dpdkuri[1280]; |
---|
84 | struct addrinfo hints, *result; |
---|
85 | |
---|
86 | libtrace->format_data = (dpdkndag_format_data_t *)malloc( |
---|
87 | sizeof(dpdkndag_format_data_t)); |
---|
88 | |
---|
89 | FORMAT_DATA->localiface = NULL; |
---|
90 | FORMAT_DATA->threaddatas = NULL; |
---|
91 | FORMAT_DATA->dpdkrecv = NULL; |
---|
92 | |
---|
93 | scan = strchr(libtrace->uridata, ','); |
---|
94 | if (scan == NULL) { |
---|
95 | trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, |
---|
96 | "Bad dpdkndag URI. Should be dpdkndag:<interface>,<multicast group>"); |
---|
97 | return -1; |
---|
98 | } |
---|
99 | FORMAT_DATA->localiface = strndup(libtrace->uridata, |
---|
100 | (size_t)(scan - libtrace->uridata)); |
---|
101 | next = scan + 1; |
---|
102 | |
---|
103 | memset(&hints, 0, sizeof(struct addrinfo)); |
---|
104 | hints.ai_family = AF_UNSPEC; |
---|
105 | hints.ai_socktype = SOCK_DGRAM; |
---|
106 | hints.ai_flags = AI_PASSIVE; |
---|
107 | hints.ai_protocol = 0; |
---|
108 | |
---|
109 | if (getaddrinfo(next, NULL, &hints, &result) != 0) { |
---|
110 | perror("getaddrinfo"); |
---|
111 | trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, |
---|
112 | "Invalid multicast address: %s", next); |
---|
113 | return -1; |
---|
114 | } |
---|
115 | |
---|
116 | FORMAT_DATA->multicastgroup = result; |
---|
117 | |
---|
118 | snprintf(dpdkuri, 1279, "dpdk:%s", FORMAT_DATA->localiface); |
---|
119 | FORMAT_DATA->dpdkrecv = trace_create(dpdkuri); |
---|
120 | |
---|
121 | if (trace_is_err(FORMAT_DATA->dpdkrecv)) { |
---|
122 | libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv); |
---|
123 | trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem); |
---|
124 | free(libtrace->format_data); |
---|
125 | libtrace->format_data = NULL; |
---|
126 | return -1; |
---|
127 | } |
---|
128 | |
---|
129 | return 0; |
---|
130 | } |
---|
131 | |
---|
132 | static int dpdkndag_config_input (libtrace_t *libtrace, trace_option_t option, |
---|
133 | void *data) { |
---|
134 | |
---|
135 | return dpdk_config_input(FORMAT_DATA->dpdkrecv, option, data); |
---|
136 | } |
---|
137 | |
---|
138 | static int dpdkndag_init_threads(libtrace_t *libtrace, uint32_t maxthreads) { |
---|
139 | |
---|
140 | uint32_t i; |
---|
141 | if (FORMAT_DATA->threaddatas == NULL) { |
---|
142 | FORMAT_DATA->threaddatas = (perthread_t *)malloc( |
---|
143 | sizeof(perthread_t) * maxthreads); |
---|
144 | } |
---|
145 | |
---|
146 | for (i = 0; i < maxthreads; i++) { |
---|
147 | FORMAT_DATA->threaddatas[i].capstreams = NULL; |
---|
148 | FORMAT_DATA->threaddatas[i].streamcount = 0; |
---|
149 | FORMAT_DATA->threaddatas[i].dropped_upstream = 0; |
---|
150 | FORMAT_DATA->threaddatas[i].received_packets = 0; |
---|
151 | FORMAT_DATA->threaddatas[i].missing_records = 0; |
---|
152 | FORMAT_DATA->threaddatas[i].dpdkstreamdata = NULL; |
---|
153 | FORMAT_DATA->threaddatas[i].dpdkpkt = trace_create_packet(); |
---|
154 | FORMAT_DATA->threaddatas[i].ndagheader = NULL; |
---|
155 | FORMAT_DATA->threaddatas[i].nextrec = NULL; |
---|
156 | FORMAT_DATA->threaddatas[i].burstsize = 0; |
---|
157 | FORMAT_DATA->threaddatas[i].burstoffset = 0; |
---|
158 | memset(FORMAT_DATA->threaddatas[i].burstspace, 0, |
---|
159 | sizeof(struct rte_mbuf *) * 40); |
---|
160 | pthread_mutex_init(&(FORMAT_DATA->threaddatas[i].ndag_lock), |
---|
161 | NULL); |
---|
162 | } |
---|
163 | return maxthreads; |
---|
164 | } |
---|
165 | |
---|
166 | static int dpdkndag_start_input(libtrace_t *libtrace) { |
---|
167 | enum hasher_types hash = HASHER_UNIDIRECTIONAL; |
---|
168 | int snaplen = 9000; |
---|
169 | |
---|
170 | if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_HASHER, |
---|
171 | &hash) == -1) { |
---|
172 | libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv); |
---|
173 | trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem); |
---|
174 | return -1; |
---|
175 | } |
---|
176 | |
---|
177 | if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_SNAPLEN, |
---|
178 | &snaplen) == -1) { |
---|
179 | libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv); |
---|
180 | trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem); |
---|
181 | return -1; |
---|
182 | } |
---|
183 | |
---|
184 | if (dpdk_start_input(FORMAT_DATA->dpdkrecv) == -1) { |
---|
185 | libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv); |
---|
186 | trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem); |
---|
187 | return -1; |
---|
188 | } |
---|
189 | |
---|
190 | dpdkndag_init_threads(libtrace, 1); |
---|
191 | |
---|
192 | return 0; |
---|
193 | } |
---|
194 | |
---|
195 | static int dpdkndag_pstart_input(libtrace_t *libtrace) { |
---|
196 | |
---|
197 | enum hasher_types hash = HASHER_UNIDIRECTIONAL; |
---|
198 | int snaplen = 9000; |
---|
199 | FORMAT_DATA->dpdkrecv->perpkt_thread_count = libtrace->perpkt_thread_count; |
---|
200 | if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_HASHER, |
---|
201 | &hash) == -1) { |
---|
202 | libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv); |
---|
203 | trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem); |
---|
204 | return -1; |
---|
205 | } |
---|
206 | |
---|
207 | if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_SNAPLEN, |
---|
208 | &snaplen) == -1) { |
---|
209 | libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv); |
---|
210 | trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem); |
---|
211 | return -1; |
---|
212 | } |
---|
213 | if (dpdk_pstart_input(FORMAT_DATA->dpdkrecv) == -1) { |
---|
214 | libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv); |
---|
215 | trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem); |
---|
216 | return -1; |
---|
217 | } |
---|
218 | dpdkndag_init_threads(libtrace, libtrace->perpkt_thread_count); |
---|
219 | return 0; |
---|
220 | } |
---|
221 | |
---|
222 | static void clear_threaddata(perthread_t *pt) { |
---|
223 | |
---|
224 | int i; |
---|
225 | |
---|
226 | if (pt->dpdkpkt) { |
---|
227 | trace_destroy_packet(pt->dpdkpkt); |
---|
228 | } |
---|
229 | pt->dpdkpkt = NULL; |
---|
230 | |
---|
231 | if (pt->capstreams) { |
---|
232 | free(pt->capstreams); |
---|
233 | } |
---|
234 | |
---|
235 | for (i = 0; i < 40; i++) { |
---|
236 | if (pt->burstspace[i]) { |
---|
237 | rte_pktmbuf_free(pt->burstspace[i]); |
---|
238 | } |
---|
239 | } |
---|
240 | pthread_mutex_destroy(&(pt->ndag_lock)); |
---|
241 | } |
---|
242 | |
---|
243 | static int dpdkndag_pause_input(libtrace_t *libtrace) { |
---|
244 | |
---|
245 | int i; |
---|
246 | /* Pause DPDK receive */ |
---|
247 | dpdk_pause_input(FORMAT_DATA->dpdkrecv); |
---|
248 | |
---|
249 | /* Clear the threaddatas */ |
---|
250 | for (i = 0; i < libtrace->perpkt_thread_count; i++) { |
---|
251 | clear_threaddata(&(FORMAT_DATA->threaddatas[i])); |
---|
252 | } |
---|
253 | return 0; |
---|
254 | } |
---|
255 | |
---|
256 | static int dpdkndag_fin_input(libtrace_t *libtrace) { |
---|
257 | |
---|
258 | if (FORMAT_DATA->dpdkrecv) { |
---|
259 | trace_destroy(FORMAT_DATA->dpdkrecv); |
---|
260 | } |
---|
261 | |
---|
262 | if (FORMAT_DATA->threaddatas) { |
---|
263 | free(FORMAT_DATA->threaddatas); |
---|
264 | } |
---|
265 | |
---|
266 | if (FORMAT_DATA->localiface) { |
---|
267 | free(FORMAT_DATA->localiface); |
---|
268 | } |
---|
269 | |
---|
270 | if (FORMAT_DATA->multicastgroup) { |
---|
271 | freeaddrinfo(FORMAT_DATA->multicastgroup); |
---|
272 | } |
---|
273 | |
---|
274 | free(FORMAT_DATA); |
---|
275 | return 0; |
---|
276 | } |
---|
277 | |
---|
278 | static int dpdkndag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, |
---|
279 | bool reader) { |
---|
280 | |
---|
281 | perthread_t *pt; |
---|
282 | |
---|
283 | if (!reader || t->type != THREAD_PERPKT) { |
---|
284 | return 0; |
---|
285 | } |
---|
286 | |
---|
287 | if (dpdk_pregister_thread(FORMAT_DATA->dpdkrecv, t, reader) == -1) { |
---|
288 | libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv); |
---|
289 | trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem); |
---|
290 | return -1; |
---|
291 | } |
---|
292 | |
---|
293 | /* t->format_data now contains our dpdk stream data */ |
---|
294 | pt = &(FORMAT_DATA->threaddatas[t->perpkt_num]); |
---|
295 | pt->dpdkstreamdata = t->format_data; |
---|
296 | t->format_data = pt; |
---|
297 | |
---|
298 | return 0; |
---|
299 | } |
---|
300 | |
---|
301 | static void dpdkndag_punregister_thread(libtrace_t *libtrace, libtrace_thread_t *t) { |
---|
302 | |
---|
303 | dpdk_punregister_thread(libtrace, t); |
---|
304 | } |
---|
305 | |
---|
306 | static void dpdkndag_get_thread_stats(libtrace_t *libtrace, libtrace_thread_t *t, |
---|
307 | libtrace_stat_t *stat) { |
---|
308 | |
---|
309 | perthread_t *pt = (perthread_t *)t->format_data; |
---|
310 | |
---|
311 | if (libtrace == NULL) { |
---|
312 | return; |
---|
313 | } |
---|
314 | |
---|
315 | /* TODO Is this thread safe */ |
---|
316 | stat->dropped_valid = 1; |
---|
317 | stat->dropped = pt->dropped_upstream; |
---|
318 | |
---|
319 | stat->received_valid = 1; |
---|
320 | stat->received = pt->received_packets; |
---|
321 | |
---|
322 | stat->missing_valid = 1; |
---|
323 | stat->missing = pt->missing_records; |
---|
324 | } |
---|
325 | |
---|
326 | static void dpdkndag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) { |
---|
327 | int i; |
---|
328 | |
---|
329 | libtrace_stat_t *dpdkstat; |
---|
330 | |
---|
331 | stat->dropped_valid = 1; |
---|
332 | stat->dropped = 0; |
---|
333 | stat->received_valid = 1; |
---|
334 | stat->received = 0; |
---|
335 | stat->missing_valid = 1; |
---|
336 | stat->missing = 0; |
---|
337 | |
---|
338 | dpdkstat = trace_create_statistics(); |
---|
339 | dpdk_get_stats(FORMAT_DATA->dpdkrecv, dpdkstat); |
---|
340 | |
---|
341 | if (dpdkstat->dropped_valid) { |
---|
342 | stat->errors_valid = 1; |
---|
343 | stat->errors = dpdkstat->dropped; |
---|
344 | } |
---|
345 | |
---|
346 | /* TODO Is this thread safe? */ |
---|
347 | for (i = 0; i < libtrace->perpkt_thread_count; i++) { |
---|
348 | pthread_mutex_lock(&(FORMAT_DATA->threaddatas[i].ndag_lock)); |
---|
349 | stat->dropped += FORMAT_DATA->threaddatas[i].dropped_upstream; |
---|
350 | stat->received += FORMAT_DATA->threaddatas[i].received_packets; |
---|
351 | stat->missing += FORMAT_DATA->threaddatas[i].missing_records; |
---|
352 | pthread_mutex_unlock(&(FORMAT_DATA->threaddatas[i].ndag_lock)); |
---|
353 | } |
---|
354 | free(dpdkstat); |
---|
355 | } |
---|
356 | |
---|
357 | static int is_ndag_packet(libtrace_packet_t *packet, perthread_t *pt) { |
---|
358 | |
---|
359 | void *trans = NULL; |
---|
360 | uint32_t rem = 0; |
---|
361 | uint8_t proto; |
---|
362 | char *payload; |
---|
363 | |
---|
364 | trans = trace_get_transport(packet, &proto, &rem); |
---|
365 | if (trans == NULL) { |
---|
366 | return 0; |
---|
367 | } |
---|
368 | |
---|
369 | if (proto != TRACE_IPPROTO_UDP) { |
---|
370 | return 0; |
---|
371 | } |
---|
372 | |
---|
373 | payload = (char *)trace_get_payload_from_udp((libtrace_udp_t *)trans, |
---|
374 | &rem); |
---|
375 | |
---|
376 | if (payload == NULL) { |
---|
377 | return 0; |
---|
378 | } |
---|
379 | |
---|
380 | if (rem < 4) { |
---|
381 | return 0; |
---|
382 | } |
---|
383 | |
---|
384 | if (payload[0] == 'N' && payload[1] == 'D' && payload[2] == 'A' |
---|
385 | && payload[3] == 'G') { |
---|
386 | pt->ndagsize = rem; |
---|
387 | pt->ndagheader = payload; |
---|
388 | return 1; |
---|
389 | } |
---|
390 | |
---|
391 | return 0; |
---|
392 | |
---|
393 | } |
---|
394 | |
---|
/*
 * Compares the address part of two socket addresses.
 *
 * Returns 1 when both are AF_INET or both are AF_INET6 and carry the same
 * address; ports are not compared. Returns 0 on any mismatch or for any
 * other address family.
 */
static int sockaddr_same(struct sockaddr *a, struct sockaddr *b) {

        if (a->sa_family != b->sa_family) {
                return 0;
        }

        switch (a->sa_family) {
        case AF_INET: {
                const struct sockaddr_in *lhs = (const struct sockaddr_in *)a;
                const struct sockaddr_in *rhs = (const struct sockaddr_in *)b;

                return lhs->sin_addr.s_addr == rhs->sin_addr.s_addr;
        }
        case AF_INET6: {
                const struct sockaddr_in6 *lhs = (const struct sockaddr_in6 *)a;
                const struct sockaddr_in6 *rhs = (const struct sockaddr_in6 *)b;

                return memcmp(lhs->sin6_addr.s6_addr, rhs->sin6_addr.s6_addr,
                                sizeof(lhs->sin6_addr.s6_addr)) == 0;
        }
        default:
                return 0;
        }
}
---|
421 | |
---|
422 | static int process_fresh_packet(perthread_t *pt, struct addrinfo *expectedaddr) { |
---|
423 | |
---|
424 | ndag_common_t *header = (ndag_common_t *)pt->ndagheader; |
---|
425 | ndag_encap_t *encaphdr = (ndag_encap_t *)(pt->ndagheader + |
---|
426 | sizeof(ndag_common_t)); |
---|
427 | uint16_t targetport; |
---|
428 | struct sockaddr_storage targetaddr; |
---|
429 | struct sockaddr *p; |
---|
430 | capstream_t *cap = NULL; |
---|
431 | int i; |
---|
432 | |
---|
433 | memset((&targetaddr), 0, sizeof(targetaddr)); |
---|
434 | if (header->type != NDAG_PKT_ENCAPERF) { |
---|
435 | pt->nextrec = NULL; |
---|
436 | pt->ndagsize = 0; |
---|
437 | pt->ndagheader = NULL; |
---|
438 | return 1; |
---|
439 | } |
---|
440 | |
---|
441 | if ((p = trace_get_destination_address(pt->dpdkpkt, |
---|
442 | (struct sockaddr *)(&targetaddr))) == NULL) { |
---|
443 | pt->nextrec = NULL; |
---|
444 | pt->ndagsize = 0; |
---|
445 | pt->ndagheader = NULL; |
---|
446 | return 1; |
---|
447 | } |
---|
448 | |
---|
449 | if (!(sockaddr_same(p, expectedaddr->ai_addr))) { |
---|
450 | pt->nextrec = NULL; |
---|
451 | pt->ndagsize = 0; |
---|
452 | pt->ndagheader = NULL; |
---|
453 | return 1; |
---|
454 | } |
---|
455 | |
---|
456 | targetport = trace_get_destination_port(pt->dpdkpkt); |
---|
457 | if (pt->streamcount == 0) { |
---|
458 | pt->capstreams = (capstream_t *)malloc(sizeof(capstream_t)); |
---|
459 | pt->streamcount = 1; |
---|
460 | pt->capstreams[0].port = targetport; |
---|
461 | pt->capstreams[0].expectedseq = 0; |
---|
462 | pt->capstreams[0].recordcount = 0; |
---|
463 | cap = pt->capstreams; |
---|
464 | |
---|
465 | } else { |
---|
466 | for (i = 0; i < pt->streamcount; i++) { |
---|
467 | if (pt->capstreams[i].port == targetport) { |
---|
468 | cap = (&pt->capstreams[i]); |
---|
469 | break; |
---|
470 | } |
---|
471 | } |
---|
472 | |
---|
473 | if (cap == NULL) { |
---|
474 | uint16_t next = pt->streamcount; |
---|
475 | pt->capstreams = (capstream_t *)realloc(pt->capstreams, |
---|
476 | sizeof(capstream_t) * (pt->streamcount + 1)); |
---|
477 | pt->streamcount += 1; |
---|
478 | pt->capstreams[next].port = targetport; |
---|
479 | pt->capstreams[next].expectedseq = 0; |
---|
480 | pt->capstreams[next].recordcount = 0; |
---|
481 | cap = &(pt->capstreams[next]); |
---|
482 | } |
---|
483 | } |
---|
484 | if (cap->expectedseq != 0) { |
---|
485 | pthread_mutex_lock(&pt->ndag_lock); |
---|
486 | pt->missing_records += seq_cmp( |
---|
487 | ntohl(encaphdr->seqno), cap->expectedseq); |
---|
488 | pthread_mutex_unlock(&pt->ndag_lock); |
---|
489 | } |
---|
490 | cap->expectedseq = ntohl(encaphdr->seqno) + 1; |
---|
491 | if (cap->expectedseq == 0) { |
---|
492 | cap->expectedseq ++; |
---|
493 | } |
---|
494 | cap->recordcount ++; |
---|
495 | |
---|
496 | pt->nextrec = ((char *)header) + sizeof(ndag_common_t) + |
---|
497 | sizeof(ndag_encap_t); |
---|
498 | |
---|
499 | return 1; |
---|
500 | } |
---|
501 | |
---|
502 | static int ndagrec_to_libtrace_packet(libtrace_t *libtrace, perthread_t *pt, |
---|
503 | libtrace_packet_t *packet) { |
---|
504 | |
---|
505 | /* This is mostly borrowed from ndag_prepare_packet_stream, minus |
---|
506 | * the ndag socket-specific stuff */ |
---|
507 | |
---|
508 | dag_record_t *erfptr; |
---|
509 | ndag_encap_t *encaphdr; |
---|
510 | |
---|
511 | if (pt->nextrec == NULL) { |
---|
512 | return -1; |
---|
513 | } |
---|
514 | |
---|
515 | if (pt->nextrec - pt->ndagheader >= pt->ndagsize) { |
---|
516 | return -1; |
---|
517 | } |
---|
518 | |
---|
519 | packet->buf_control = TRACE_CTRL_EXTERNAL; |
---|
520 | |
---|
521 | packet->trace = libtrace; |
---|
522 | packet->buffer = pt->nextrec; |
---|
523 | packet->header = pt->nextrec; |
---|
524 | packet->type = TRACE_RT_DATA_ERF; |
---|
525 | |
---|
526 | erfptr = (dag_record_t *)packet->header; |
---|
527 | |
---|
528 | if (erfptr->flags.rxerror == 1) { |
---|
529 | packet->payload = NULL; |
---|
530 | erfptr->rlen = htons(erf_get_framing_length(packet)); |
---|
531 | } else { |
---|
532 | packet->payload = (char *)packet->buffer + |
---|
533 | erf_get_framing_length(packet); |
---|
534 | } |
---|
535 | |
---|
536 | /* Update upstream drops using lctr */ |
---|
537 | |
---|
538 | if (erfptr->type == TYPE_DSM_COLOR_ETH) { |
---|
539 | /* TODO */ |
---|
540 | } else { |
---|
541 | pthread_mutex_lock(&(pt->ndag_lock)); |
---|
542 | if (pt->received_packets > 0) { |
---|
543 | pt->dropped_upstream += ntohs(erfptr->lctr); |
---|
544 | } |
---|
545 | pthread_mutex_unlock(&(pt->ndag_lock)); |
---|
546 | } |
---|
547 | |
---|
548 | pthread_mutex_lock(&(pt->ndag_lock)); |
---|
549 | pt->received_packets ++; |
---|
550 | pthread_mutex_unlock(&(pt->ndag_lock)); |
---|
551 | encaphdr = (ndag_encap_t *)(pt->ndagheader + sizeof(ndag_common_t)); |
---|
552 | |
---|
553 | if ((ntohs(encaphdr->recordcount) & 0x8000) != 0) { |
---|
554 | /* Record was truncated */ |
---|
555 | erfptr->rlen = htons(pt->ndagsize - (pt->nextrec - |
---|
556 | pt->ndagheader)); |
---|
557 | } |
---|
558 | |
---|
559 | pt->nextrec += ntohs(erfptr->rlen); |
---|
560 | |
---|
561 | if (pt->nextrec - pt->ndagheader >= pt->ndagsize) { |
---|
562 | pt->ndagheader = NULL; |
---|
563 | pt->nextrec = NULL; |
---|
564 | pt->ndagsize = 0; |
---|
565 | } |
---|
566 | |
---|
567 | packet->order = erf_get_erf_timestamp(packet); |
---|
568 | packet->error = packet->payload ? ntohs(erfptr->rlen) : |
---|
569 | erf_get_framing_length(packet); |
---|
570 | return ntohs(erfptr->rlen); |
---|
571 | } |
---|
572 | |
---|
573 | static int dpdkndag_pread_packets(libtrace_t *libtrace, |
---|
574 | libtrace_thread_t *t, |
---|
575 | libtrace_packet_t **packets, |
---|
576 | size_t nb_packets) { |
---|
577 | |
---|
578 | perthread_t *pt = (perthread_t *)t->format_data; |
---|
579 | size_t read_packets = 0; |
---|
580 | int ret; |
---|
581 | |
---|
582 | while (pt->nextrec == NULL) { |
---|
583 | trace_fin_packet(pt->dpdkpkt); |
---|
584 | |
---|
585 | if (pt->burstsize > 0 && pt->burstsize != pt->burstoffset) { |
---|
586 | pt->dpdkpkt->buffer = pt->burstspace[pt->burstoffset]; |
---|
587 | pt->dpdkpkt->trace = FORMAT_DATA->dpdkrecv; |
---|
588 | dpdk_prepare_packet(FORMAT_DATA->dpdkrecv, pt->dpdkpkt, |
---|
589 | pt->dpdkpkt->buffer, |
---|
590 | TRACE_RT_DATA_DPDK, 0); |
---|
591 | pt->burstoffset ++; |
---|
592 | } else { |
---|
593 | ret = dpdk_read_packet_stream(FORMAT_DATA->dpdkrecv, |
---|
594 | pt->dpdkstreamdata, |
---|
595 | &t->messages, |
---|
596 | pt->burstspace, |
---|
597 | 40); |
---|
598 | if (ret <= 0) { |
---|
599 | return ret; |
---|
600 | } |
---|
601 | |
---|
602 | pt->dpdkpkt->buffer = pt->burstspace[0]; |
---|
603 | pt->dpdkpkt->trace = FORMAT_DATA->dpdkrecv; |
---|
604 | dpdk_prepare_packet(FORMAT_DATA->dpdkrecv, pt->dpdkpkt, |
---|
605 | pt->dpdkpkt->buffer, |
---|
606 | TRACE_RT_DATA_DPDK, 0); |
---|
607 | pt->burstsize = ret; |
---|
608 | pt->burstoffset = 1; |
---|
609 | } |
---|
610 | |
---|
611 | if (!is_ndag_packet(pt->dpdkpkt, pt)) { |
---|
612 | continue; |
---|
613 | } |
---|
614 | |
---|
615 | ret = process_fresh_packet(pt, FORMAT_DATA->multicastgroup); |
---|
616 | if (ret <= 0) { |
---|
617 | return ret; |
---|
618 | } |
---|
619 | } |
---|
620 | |
---|
621 | while (pt->nextrec != NULL) { |
---|
622 | if (read_packets == nb_packets) { |
---|
623 | break; |
---|
624 | } |
---|
625 | |
---|
626 | if (packets[read_packets]->buf_control == TRACE_CTRL_PACKET) { |
---|
627 | free(packets[read_packets]->buffer); |
---|
628 | packets[read_packets]->buffer = NULL; |
---|
629 | } |
---|
630 | ret = ndagrec_to_libtrace_packet(libtrace, pt, |
---|
631 | packets[read_packets]); |
---|
632 | if (ret < 0) { |
---|
633 | return ret; |
---|
634 | } |
---|
635 | read_packets ++; |
---|
636 | |
---|
637 | } |
---|
638 | |
---|
639 | return read_packets; |
---|
640 | } |
---|
641 | |
---|
642 | static int dpdkndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) { |
---|
643 | |
---|
644 | perthread_t *pt = &(FORMAT_DATA->threaddatas[0]); |
---|
645 | int ret; |
---|
646 | |
---|
647 | if (packet->buf_control == TRACE_CTRL_PACKET) { |
---|
648 | free(packet->buffer); |
---|
649 | packet->buffer = NULL; |
---|
650 | } |
---|
651 | |
---|
652 | while (pt->nextrec == NULL) { |
---|
653 | trace_fin_packet(pt->dpdkpkt); |
---|
654 | |
---|
655 | ret = dpdk_read_packet(FORMAT_DATA->dpdkrecv, pt->dpdkpkt); |
---|
656 | if (ret <= 0) { |
---|
657 | return ret; |
---|
658 | } |
---|
659 | |
---|
660 | if (!is_ndag_packet(pt->dpdkpkt, pt)) { |
---|
661 | continue; |
---|
662 | } |
---|
663 | |
---|
664 | ret = process_fresh_packet(pt, FORMAT_DATA->multicastgroup); |
---|
665 | if (ret <= 0) { |
---|
666 | return ret; |
---|
667 | } |
---|
668 | } |
---|
669 | |
---|
670 | return ndagrec_to_libtrace_packet(libtrace, pt, packet); |
---|
671 | } |
---|
672 | |
---|
673 | static libtrace_eventobj_t trace_event_dpdkndag(libtrace_t *libtrace, |
---|
674 | libtrace_packet_t *packet) { |
---|
675 | |
---|
676 | |
---|
677 | libtrace_eventobj_t event; |
---|
678 | int ret; |
---|
679 | perthread_t *pt = &(FORMAT_DATA->threaddatas[0]); |
---|
680 | |
---|
681 | if (packet->buf_control == TRACE_CTRL_PACKET) { |
---|
682 | free(packet->buffer); |
---|
683 | packet->buffer = NULL; |
---|
684 | } |
---|
685 | |
---|
686 | while (pt->nextrec == NULL) { |
---|
687 | |
---|
688 | event = dpdk_trace_event(libtrace, pt->dpdkpkt); |
---|
689 | |
---|
690 | if (event.type != TRACE_EVENT_PACKET) { |
---|
691 | return event; |
---|
692 | } |
---|
693 | |
---|
694 | if (!is_ndag_packet(pt->dpdkpkt, pt)) { |
---|
695 | continue; |
---|
696 | } |
---|
697 | |
---|
698 | ret = process_fresh_packet(pt, FORMAT_DATA->multicastgroup); |
---|
699 | if (ret <= 0) { |
---|
700 | event.type = TRACE_EVENT_TERMINATE; |
---|
701 | return event; |
---|
702 | } |
---|
703 | } |
---|
704 | |
---|
705 | ret = ndagrec_to_libtrace_packet(libtrace, pt, packet); |
---|
706 | if (ret < 0) { |
---|
707 | event.type = TRACE_EVENT_TERMINATE; |
---|
708 | } else { |
---|
709 | event.type = TRACE_EVENT_PACKET; |
---|
710 | event.size = 1; |
---|
711 | } |
---|
712 | return event; |
---|
713 | } |
---|
714 | |
---|
715 | static struct libtrace_format_t dpdkndag = { |
---|
716 | |
---|
717 | "dpdkndag", |
---|
718 | "", |
---|
719 | TRACE_FORMAT_DPDK_NDAG, |
---|
720 | NULL, /* probe filename */ |
---|
721 | NULL, /* probe magic */ |
---|
722 | dpdkndag_init_input, /* init_input */ |
---|
723 | dpdkndag_config_input, /* config_input */ |
---|
724 | dpdkndag_start_input, /* start_input */ |
---|
725 | dpdkndag_pause_input, /* pause_input */ |
---|
726 | NULL, /* init_output */ |
---|
727 | NULL, /* config_output */ |
---|
728 | NULL, /* start_output */ |
---|
729 | dpdkndag_fin_input, /* fin_input */ |
---|
730 | NULL, /* fin_output */ |
---|
731 | dpdkndag_read_packet, /* read_packet */ |
---|
732 | NULL, /* prepare_packet */ |
---|
733 | NULL, /* fin_packet */ |
---|
734 | NULL, /* write_packet */ |
---|
735 | erf_get_link_type, /* get_link_type */ |
---|
736 | erf_get_direction, /* get_direction */ |
---|
737 | erf_set_direction, /* set_direction */ |
---|
738 | erf_get_erf_timestamp, /* get_erf_timestamp */ |
---|
739 | NULL, /* get_timeval */ |
---|
740 | NULL, /* get_seconds */ |
---|
741 | NULL, /* get_timespec */ |
---|
742 | NULL, /* seek_erf */ |
---|
743 | NULL, /* seek_timeval */ |
---|
744 | NULL, /* seek_seconds */ |
---|
745 | erf_get_capture_length, /* get_capture_length */ |
---|
746 | erf_get_wire_length, /* get_wire_length */ |
---|
747 | erf_get_framing_length, /* get_framing_length */ |
---|
748 | erf_set_capture_length, /* set_capture_length */ |
---|
749 | NULL, /* get_received_packets */ |
---|
750 | NULL, /* get_filtered_packets */ |
---|
751 | NULL, /* get_dropped_packets */ |
---|
752 | dpdkndag_get_statistics, /* get_statistics */ |
---|
753 | NULL, /* get_fd */ |
---|
754 | trace_event_dpdkndag, /* trace_event */ |
---|
755 | NULL, /* help */ |
---|
756 | NULL, /* next pointer */ |
---|
757 | {true, 0}, /* live packet capture */ |
---|
758 | dpdkndag_pstart_input, /* parallel start */ |
---|
759 | dpdkndag_pread_packets, /* parallel read */ |
---|
760 | dpdkndag_pause_input, /* parallel pause */ |
---|
761 | NULL, |
---|
762 | dpdkndag_pregister_thread, /* register thread */ |
---|
763 | dpdkndag_punregister_thread, |
---|
764 | dpdkndag_get_thread_stats /* per-thread stats */ |
---|
765 | }; |
---|
766 | |
---|
767 | void dpdkndag_constructor(void) { |
---|
768 | register_format(&dpdkndag); |
---|
769 | } |
---|