1 | |
---|
2 | #include "config.h" |
---|
3 | #include "common.h" |
---|
4 | #include "libtrace.h" |
---|
5 | #include "libtrace_int.h" |
---|
6 | #include "format_helper.h" |
---|
7 | #include "format_erf.h" |
---|
8 | |
---|
9 | #include <errno.h> |
---|
10 | #include <fcntl.h> |
---|
11 | #include <stdio.h> |
---|
12 | #include <string.h> |
---|
13 | #include <unistd.h> |
---|
14 | #include <stdlib.h> |
---|
15 | #include <sys/types.h> |
---|
16 | #include <sys/socket.h> |
---|
17 | #include <netdb.h> |
---|
18 | |
---|
19 | #include "format_dpdk.h" |
---|
20 | #include "format_ndag.h" |
---|
21 | |
---|
22 | static struct libtrace_format_t dpdkndag; |
---|
23 | |
---|
/* Sequence-tracking state for one nDAG stream, identified by its port. */
typedef struct capstream {
    /* Destination port that identifies the stream. */
    uint16_t port;
    /* Sequence number expected on the next datagram; 0 means that no
     * datagram has been seen on this stream yet. */
    uint32_t expectedseq;
    /* Incremented once for every datagram accepted on this stream. */
    uint64_t recordcount;
} capstream_t;
---|
30 | |
---|
31 | typedef struct perthread { |
---|
32 | capstream_t *capstreams; |
---|
33 | uint16_t streamcount; |
---|
34 | uint64_t dropped_upstream; |
---|
35 | uint64_t missing_records; |
---|
36 | uint64_t received_packets; |
---|
37 | |
---|
38 | libtrace_packet_t *dpdkpkt; |
---|
39 | char *ndagheader; |
---|
40 | char *nextrec; |
---|
41 | uint32_t ndagsize; |
---|
42 | |
---|
43 | pthread_mutex_t ndag_lock; |
---|
44 | dpdk_per_stream_t *dpdkstreamdata; |
---|
45 | int burstsize; |
---|
46 | int burstoffset; |
---|
47 | struct rte_mbuf* burstspace[40]; |
---|
48 | |
---|
49 | } perthread_t; |
---|
50 | |
---|
51 | |
---|
52 | typedef struct dpdkndag_format_data { |
---|
53 | libtrace_t *dpdkrecv; |
---|
54 | |
---|
55 | struct addrinfo *multicastgroup; |
---|
56 | char *localiface; |
---|
57 | |
---|
58 | perthread_t *threaddatas; |
---|
59 | |
---|
60 | } dpdkndag_format_data_t; |
---|
61 | |
---|
/* Shorthand for this format's state. Expands against whatever variable named
 * `libtrace` is in scope at the point of use. */
#define FORMAT_DATA ((dpdkndag_format_data_t *)libtrace->format_data)
---|
63 | |
---|
/* Difference between two 32-bit sequence numbers, allowing for wraparound.
 *
 * Returns seq_a - seq_b when seq_a >= seq_b (so 0 when they are equal).
 * When seq_a is the smaller value the result is
 * 0xffffffff - ((seq_b - seq_a) - 2), evaluated in unsigned 32-bit
 * arithmetic and then converted to int.
 */
static inline int seq_cmp(uint32_t seq_a, uint32_t seq_b) {
    uint32_t behind;

    if (seq_a >= seq_b) {
        return (int)(seq_a - seq_b);
    }

    /* -1 for the wrap and another -1 because we don't use zero */
    behind = seq_b - seq_a;
    return (int)(0xffffffffu - (behind - 2u));
}
---|
76 | |
---|
77 | |
---|
78 | static int dpdkndag_init_input(libtrace_t *libtrace) { |
---|
79 | |
---|
80 | char *scan = NULL; |
---|
81 | char *next = NULL; |
---|
82 | char dpdkuri[1280]; |
---|
83 | struct addrinfo hints, *result; |
---|
84 | |
---|
85 | libtrace->format_data = (dpdkndag_format_data_t *)malloc( |
---|
86 | sizeof(dpdkndag_format_data_t)); |
---|
87 | |
---|
88 | if (!libtrace->format_data) { |
---|
89 | trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Unable to allocate memory for " |
---|
90 | "format data inside dpdkndag_init_input()"); |
---|
91 | return -1; |
---|
92 | } |
---|
93 | |
---|
94 | FORMAT_DATA->localiface = NULL; |
---|
95 | FORMAT_DATA->threaddatas = NULL; |
---|
96 | FORMAT_DATA->dpdkrecv = NULL; |
---|
97 | |
---|
98 | scan = strchr(libtrace->uridata, ','); |
---|
99 | if (scan == NULL) { |
---|
100 | trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, |
---|
101 | "Bad dpdkndag URI. Should be dpdkndag:<interface>,<multicast group>"); |
---|
102 | return -1; |
---|
103 | } |
---|
104 | FORMAT_DATA->localiface = strndup(libtrace->uridata, |
---|
105 | (size_t)(scan - libtrace->uridata)); |
---|
106 | next = scan + 1; |
---|
107 | |
---|
108 | memset(&hints, 0, sizeof(struct addrinfo)); |
---|
109 | hints.ai_family = AF_UNSPEC; |
---|
110 | hints.ai_socktype = SOCK_DGRAM; |
---|
111 | hints.ai_flags = AI_PASSIVE; |
---|
112 | hints.ai_protocol = 0; |
---|
113 | |
---|
114 | if (getaddrinfo(next, NULL, &hints, &result) != 0) { |
---|
115 | perror("getaddrinfo"); |
---|
116 | trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, |
---|
117 | "Invalid multicast address: %s", next); |
---|
118 | return -1; |
---|
119 | } |
---|
120 | |
---|
121 | FORMAT_DATA->multicastgroup = result; |
---|
122 | |
---|
123 | snprintf(dpdkuri, 1279, "dpdk:%s", FORMAT_DATA->localiface); |
---|
124 | FORMAT_DATA->dpdkrecv = trace_create(dpdkuri); |
---|
125 | |
---|
126 | if (trace_is_err(FORMAT_DATA->dpdkrecv)) { |
---|
127 | libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv); |
---|
128 | trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem); |
---|
129 | free(libtrace->format_data); |
---|
130 | libtrace->format_data = NULL; |
---|
131 | return -1; |
---|
132 | } |
---|
133 | |
---|
134 | return 0; |
---|
135 | } |
---|
136 | |
---|
137 | static int dpdkndag_config_input (libtrace_t *libtrace, trace_option_t option, |
---|
138 | void *data) { |
---|
139 | |
---|
140 | return dpdk_config_input(FORMAT_DATA->dpdkrecv, option, data); |
---|
141 | } |
---|
142 | |
---|
143 | static int dpdkndag_init_threads(libtrace_t *libtrace, uint32_t maxthreads) { |
---|
144 | |
---|
145 | uint32_t i; |
---|
146 | if (FORMAT_DATA->threaddatas == NULL) { |
---|
147 | FORMAT_DATA->threaddatas = (perthread_t *)malloc( |
---|
148 | sizeof(perthread_t) * maxthreads); |
---|
149 | } |
---|
150 | |
---|
151 | for (i = 0; i < maxthreads; i++) { |
---|
152 | FORMAT_DATA->threaddatas[i].capstreams = NULL; |
---|
153 | FORMAT_DATA->threaddatas[i].streamcount = 0; |
---|
154 | FORMAT_DATA->threaddatas[i].dropped_upstream = 0; |
---|
155 | FORMAT_DATA->threaddatas[i].received_packets = 0; |
---|
156 | FORMAT_DATA->threaddatas[i].missing_records = 0; |
---|
157 | FORMAT_DATA->threaddatas[i].dpdkstreamdata = NULL; |
---|
158 | FORMAT_DATA->threaddatas[i].dpdkpkt = trace_create_packet(); |
---|
159 | FORMAT_DATA->threaddatas[i].ndagheader = NULL; |
---|
160 | FORMAT_DATA->threaddatas[i].nextrec = NULL; |
---|
161 | FORMAT_DATA->threaddatas[i].burstsize = 0; |
---|
162 | FORMAT_DATA->threaddatas[i].burstoffset = 0; |
---|
163 | memset(FORMAT_DATA->threaddatas[i].burstspace, 0, |
---|
164 | sizeof(struct rte_mbuf *) * 40); |
---|
165 | pthread_mutex_init(&(FORMAT_DATA->threaddatas[i].ndag_lock), |
---|
166 | NULL); |
---|
167 | } |
---|
168 | return maxthreads; |
---|
169 | } |
---|
170 | |
---|
171 | static int dpdkndag_start_input(libtrace_t *libtrace) { |
---|
172 | enum hasher_types hash = HASHER_UNIDIRECTIONAL; |
---|
173 | int snaplen = 9000; |
---|
174 | |
---|
175 | if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_HASHER, |
---|
176 | &hash) == -1) { |
---|
177 | libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv); |
---|
178 | trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem); |
---|
179 | return -1; |
---|
180 | } |
---|
181 | |
---|
182 | if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_SNAPLEN, |
---|
183 | &snaplen) == -1) { |
---|
184 | libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv); |
---|
185 | trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem); |
---|
186 | return -1; |
---|
187 | } |
---|
188 | |
---|
189 | if (dpdk_start_input(FORMAT_DATA->dpdkrecv) == -1) { |
---|
190 | libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv); |
---|
191 | trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem); |
---|
192 | return -1; |
---|
193 | } |
---|
194 | |
---|
195 | dpdkndag_init_threads(libtrace, 1); |
---|
196 | |
---|
197 | return 0; |
---|
198 | } |
---|
199 | |
---|
200 | static int dpdkndag_pstart_input(libtrace_t *libtrace) { |
---|
201 | |
---|
202 | enum hasher_types hash = HASHER_UNIDIRECTIONAL; |
---|
203 | int snaplen = 9000; |
---|
204 | FORMAT_DATA->dpdkrecv->perpkt_thread_count = libtrace->perpkt_thread_count; |
---|
205 | if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_HASHER, |
---|
206 | &hash) == -1) { |
---|
207 | libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv); |
---|
208 | trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem); |
---|
209 | return -1; |
---|
210 | } |
---|
211 | |
---|
212 | if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_SNAPLEN, |
---|
213 | &snaplen) == -1) { |
---|
214 | libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv); |
---|
215 | trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem); |
---|
216 | return -1; |
---|
217 | } |
---|
218 | if (dpdk_pstart_input(FORMAT_DATA->dpdkrecv) == -1) { |
---|
219 | libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv); |
---|
220 | trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem); |
---|
221 | return -1; |
---|
222 | } |
---|
223 | dpdkndag_init_threads(libtrace, libtrace->perpkt_thread_count); |
---|
224 | return 0; |
---|
225 | } |
---|
226 | |
---|
227 | static void clear_threaddata(perthread_t *pt) { |
---|
228 | |
---|
229 | int i; |
---|
230 | |
---|
231 | if (pt->dpdkpkt) { |
---|
232 | trace_destroy_packet(pt->dpdkpkt); |
---|
233 | } |
---|
234 | pt->dpdkpkt = NULL; |
---|
235 | |
---|
236 | if (pt->capstreams) { |
---|
237 | free(pt->capstreams); |
---|
238 | } |
---|
239 | |
---|
240 | for (i = 0; i < 40; i++) { |
---|
241 | if (pt->burstspace[i]) { |
---|
242 | rte_pktmbuf_free(pt->burstspace[i]); |
---|
243 | } |
---|
244 | } |
---|
245 | pthread_mutex_destroy(&(pt->ndag_lock)); |
---|
246 | } |
---|
247 | |
---|
248 | static int dpdkndag_pause_input(libtrace_t *libtrace) { |
---|
249 | |
---|
250 | int i; |
---|
251 | /* Pause DPDK receive */ |
---|
252 | dpdk_pause_input(FORMAT_DATA->dpdkrecv); |
---|
253 | |
---|
254 | /* Clear the threaddatas */ |
---|
255 | for (i = 0; i < libtrace->perpkt_thread_count; i++) { |
---|
256 | clear_threaddata(&(FORMAT_DATA->threaddatas[i])); |
---|
257 | } |
---|
258 | return 0; |
---|
259 | } |
---|
260 | |
---|
261 | static int dpdkndag_fin_input(libtrace_t *libtrace) { |
---|
262 | |
---|
263 | if (FORMAT_DATA->dpdkrecv) { |
---|
264 | trace_destroy(FORMAT_DATA->dpdkrecv); |
---|
265 | } |
---|
266 | |
---|
267 | if (FORMAT_DATA->threaddatas) { |
---|
268 | free(FORMAT_DATA->threaddatas); |
---|
269 | } |
---|
270 | |
---|
271 | if (FORMAT_DATA->localiface) { |
---|
272 | free(FORMAT_DATA->localiface); |
---|
273 | } |
---|
274 | |
---|
275 | if (FORMAT_DATA->multicastgroup) { |
---|
276 | freeaddrinfo(FORMAT_DATA->multicastgroup); |
---|
277 | } |
---|
278 | |
---|
279 | free(FORMAT_DATA); |
---|
280 | return 0; |
---|
281 | } |
---|
282 | |
---|
283 | static int dpdkndag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, |
---|
284 | bool reader) { |
---|
285 | |
---|
286 | perthread_t *pt; |
---|
287 | |
---|
288 | if (!reader || t->type != THREAD_PERPKT) { |
---|
289 | return 0; |
---|
290 | } |
---|
291 | |
---|
292 | if (dpdk_pregister_thread(FORMAT_DATA->dpdkrecv, t, reader) == -1) { |
---|
293 | libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv); |
---|
294 | trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem); |
---|
295 | return -1; |
---|
296 | } |
---|
297 | |
---|
298 | /* t->format_data now contains our dpdk stream data */ |
---|
299 | pt = &(FORMAT_DATA->threaddatas[t->perpkt_num]); |
---|
300 | pt->dpdkstreamdata = t->format_data; |
---|
301 | t->format_data = pt; |
---|
302 | |
---|
303 | return 0; |
---|
304 | } |
---|
305 | |
---|
306 | static void dpdkndag_punregister_thread(libtrace_t *libtrace, libtrace_thread_t *t) { |
---|
307 | |
---|
308 | dpdk_punregister_thread(libtrace, t); |
---|
309 | } |
---|
310 | |
---|
311 | static void dpdkndag_get_thread_stats(libtrace_t *libtrace, libtrace_thread_t *t, |
---|
312 | libtrace_stat_t *stat) { |
---|
313 | |
---|
314 | perthread_t *pt = (perthread_t *)t->format_data; |
---|
315 | |
---|
316 | if (libtrace == NULL) { |
---|
317 | return; |
---|
318 | } |
---|
319 | |
---|
320 | /* TODO Is this thread safe */ |
---|
321 | stat->dropped_valid = 1; |
---|
322 | stat->dropped = pt->dropped_upstream; |
---|
323 | |
---|
324 | stat->received_valid = 1; |
---|
325 | stat->received = pt->received_packets; |
---|
326 | |
---|
327 | stat->missing_valid = 1; |
---|
328 | stat->missing = pt->missing_records; |
---|
329 | } |
---|
330 | |
---|
331 | static void dpdkndag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) { |
---|
332 | int i; |
---|
333 | |
---|
334 | libtrace_stat_t *dpdkstat; |
---|
335 | |
---|
336 | stat->dropped_valid = 1; |
---|
337 | stat->dropped = 0; |
---|
338 | stat->received_valid = 1; |
---|
339 | stat->received = 0; |
---|
340 | stat->missing_valid = 1; |
---|
341 | stat->missing = 0; |
---|
342 | |
---|
343 | dpdkstat = trace_create_statistics(); |
---|
344 | dpdk_get_stats(FORMAT_DATA->dpdkrecv, dpdkstat); |
---|
345 | |
---|
346 | if (dpdkstat->dropped_valid) { |
---|
347 | stat->errors_valid = 1; |
---|
348 | stat->errors = dpdkstat->dropped; |
---|
349 | } |
---|
350 | |
---|
351 | /* TODO Is this thread safe? */ |
---|
352 | for (i = 0; i < libtrace->perpkt_thread_count; i++) { |
---|
353 | pthread_mutex_lock(&(FORMAT_DATA->threaddatas[i].ndag_lock)); |
---|
354 | stat->dropped += FORMAT_DATA->threaddatas[i].dropped_upstream; |
---|
355 | stat->received += FORMAT_DATA->threaddatas[i].received_packets; |
---|
356 | stat->missing += FORMAT_DATA->threaddatas[i].missing_records; |
---|
357 | pthread_mutex_unlock(&(FORMAT_DATA->threaddatas[i].ndag_lock)); |
---|
358 | } |
---|
359 | free(dpdkstat); |
---|
360 | } |
---|
361 | |
---|
362 | static int is_ndag_packet(libtrace_packet_t *packet, perthread_t *pt) { |
---|
363 | |
---|
364 | void *trans = NULL; |
---|
365 | uint32_t rem = 0; |
---|
366 | uint8_t proto; |
---|
367 | char *payload; |
---|
368 | |
---|
369 | trans = trace_get_transport(packet, &proto, &rem); |
---|
370 | if (trans == NULL) { |
---|
371 | return 0; |
---|
372 | } |
---|
373 | |
---|
374 | if (proto != TRACE_IPPROTO_UDP) { |
---|
375 | return 0; |
---|
376 | } |
---|
377 | |
---|
378 | payload = (char *)trace_get_payload_from_udp((libtrace_udp_t *)trans, |
---|
379 | &rem); |
---|
380 | |
---|
381 | if (payload == NULL) { |
---|
382 | return 0; |
---|
383 | } |
---|
384 | |
---|
385 | if (rem < 4) { |
---|
386 | return 0; |
---|
387 | } |
---|
388 | |
---|
389 | if (payload[0] == 'N' && payload[1] == 'D' && payload[2] == 'A' |
---|
390 | && payload[3] == 'G') { |
---|
391 | pt->ndagsize = rem; |
---|
392 | pt->ndagheader = payload; |
---|
393 | return 1; |
---|
394 | } |
---|
395 | |
---|
396 | return 0; |
---|
397 | |
---|
398 | } |
---|
399 | |
---|
/* Compare the address part of two socket addresses.
 *
 * Returns 1 when both have the same family and that family is AF_INET or
 * AF_INET6 with identical addresses; returns 0 otherwise, including for any
 * other family. Ports are not compared.
 */
static int sockaddr_same(struct sockaddr *a, struct sockaddr *b) {

    if (a->sa_family != b->sa_family) {
        return 0;
    }

    switch (a->sa_family) {
    case AF_INET: {
        const struct sockaddr_in *lhs = (const struct sockaddr_in *)a;
        const struct sockaddr_in *rhs = (const struct sockaddr_in *)b;

        return lhs->sin_addr.s_addr == rhs->sin_addr.s_addr ? 1 : 0;
    }
    case AF_INET6: {
        const struct sockaddr_in6 *lhs = (const struct sockaddr_in6 *)a;
        const struct sockaddr_in6 *rhs = (const struct sockaddr_in6 *)b;

        return memcmp(lhs->sin6_addr.s6_addr, rhs->sin6_addr.s6_addr,
                sizeof(lhs->sin6_addr.s6_addr)) == 0 ? 1 : 0;
    }
    default:
        return 0;
    }
}
---|
426 | |
---|
427 | static int process_fresh_packet(perthread_t *pt, struct addrinfo *expectedaddr) { |
---|
428 | |
---|
429 | ndag_common_t *header = (ndag_common_t *)pt->ndagheader; |
---|
430 | ndag_encap_t *encaphdr = (ndag_encap_t *)(pt->ndagheader + |
---|
431 | sizeof(ndag_common_t)); |
---|
432 | uint16_t targetport; |
---|
433 | struct sockaddr_storage targetaddr; |
---|
434 | struct sockaddr *p; |
---|
435 | capstream_t *cap = NULL; |
---|
436 | int i; |
---|
437 | |
---|
438 | memset((&targetaddr), 0, sizeof(targetaddr)); |
---|
439 | if (header->type != NDAG_PKT_ENCAPERF) { |
---|
440 | pt->nextrec = NULL; |
---|
441 | pt->ndagsize = 0; |
---|
442 | pt->ndagheader = NULL; |
---|
443 | return 1; |
---|
444 | } |
---|
445 | |
---|
446 | if ((p = trace_get_destination_address(pt->dpdkpkt, |
---|
447 | (struct sockaddr *)(&targetaddr))) == NULL) { |
---|
448 | pt->nextrec = NULL; |
---|
449 | pt->ndagsize = 0; |
---|
450 | pt->ndagheader = NULL; |
---|
451 | return 1; |
---|
452 | } |
---|
453 | |
---|
454 | if (!(sockaddr_same(p, expectedaddr->ai_addr))) { |
---|
455 | pt->nextrec = NULL; |
---|
456 | pt->ndagsize = 0; |
---|
457 | pt->ndagheader = NULL; |
---|
458 | return 1; |
---|
459 | } |
---|
460 | |
---|
461 | targetport = trace_get_destination_port(pt->dpdkpkt); |
---|
462 | if (pt->streamcount == 0) { |
---|
463 | pt->capstreams = (capstream_t *)malloc(sizeof(capstream_t)); |
---|
464 | pt->streamcount = 1; |
---|
465 | pt->capstreams[0].port = targetport; |
---|
466 | pt->capstreams[0].expectedseq = 0; |
---|
467 | pt->capstreams[0].recordcount = 0; |
---|
468 | cap = pt->capstreams; |
---|
469 | |
---|
470 | } else { |
---|
471 | for (i = 0; i < pt->streamcount; i++) { |
---|
472 | if (pt->capstreams[i].port == targetport) { |
---|
473 | cap = (&pt->capstreams[i]); |
---|
474 | break; |
---|
475 | } |
---|
476 | } |
---|
477 | |
---|
478 | if (cap == NULL) { |
---|
479 | uint16_t next = pt->streamcount; |
---|
480 | pt->capstreams = (capstream_t *)realloc(pt->capstreams, |
---|
481 | sizeof(capstream_t) * (pt->streamcount + 1)); |
---|
482 | pt->streamcount += 1; |
---|
483 | pt->capstreams[next].port = targetport; |
---|
484 | pt->capstreams[next].expectedseq = 0; |
---|
485 | pt->capstreams[next].recordcount = 0; |
---|
486 | cap = &(pt->capstreams[next]); |
---|
487 | } |
---|
488 | } |
---|
489 | if (cap->expectedseq != 0) { |
---|
490 | pthread_mutex_lock(&pt->ndag_lock); |
---|
491 | pt->missing_records += seq_cmp( |
---|
492 | ntohl(encaphdr->seqno), cap->expectedseq); |
---|
493 | pthread_mutex_unlock(&pt->ndag_lock); |
---|
494 | } |
---|
495 | cap->expectedseq = ntohl(encaphdr->seqno) + 1; |
---|
496 | if (cap->expectedseq == 0) { |
---|
497 | cap->expectedseq ++; |
---|
498 | } |
---|
499 | cap->recordcount ++; |
---|
500 | |
---|
501 | pt->nextrec = ((char *)header) + sizeof(ndag_common_t) + |
---|
502 | sizeof(ndag_encap_t); |
---|
503 | |
---|
504 | return 1; |
---|
505 | } |
---|
506 | |
---|
507 | static int ndagrec_to_libtrace_packet(libtrace_t *libtrace, perthread_t *pt, |
---|
508 | libtrace_packet_t *packet) { |
---|
509 | |
---|
510 | /* This is mostly borrowed from ndag_prepare_packet_stream, minus |
---|
511 | * the ndag socket-specific stuff */ |
---|
512 | |
---|
513 | dag_record_t *erfptr; |
---|
514 | ndag_encap_t *encaphdr; |
---|
515 | |
---|
516 | if (pt->nextrec == NULL) { |
---|
517 | return -1; |
---|
518 | } |
---|
519 | |
---|
520 | if (pt->nextrec - pt->ndagheader >= pt->ndagsize) { |
---|
521 | return -1; |
---|
522 | } |
---|
523 | |
---|
524 | packet->buf_control = TRACE_CTRL_EXTERNAL; |
---|
525 | |
---|
526 | packet->trace = libtrace; |
---|
527 | packet->buffer = pt->nextrec; |
---|
528 | packet->header = pt->nextrec; |
---|
529 | packet->type = TRACE_RT_DATA_ERF; |
---|
530 | |
---|
531 | erfptr = (dag_record_t *)packet->header; |
---|
532 | |
---|
533 | if (erfptr->flags.rxerror == 1) { |
---|
534 | packet->payload = NULL; |
---|
535 | erfptr->rlen = htons(erf_get_framing_length(packet)); |
---|
536 | } else { |
---|
537 | packet->payload = (char *)packet->buffer + |
---|
538 | erf_get_framing_length(packet); |
---|
539 | } |
---|
540 | |
---|
541 | /* Update upstream drops using lctr */ |
---|
542 | |
---|
543 | if (erfptr->type == TYPE_DSM_COLOR_ETH) { |
---|
544 | /* TODO */ |
---|
545 | } else { |
---|
546 | pthread_mutex_lock(&(pt->ndag_lock)); |
---|
547 | if (pt->received_packets > 0) { |
---|
548 | pt->dropped_upstream += ntohs(erfptr->lctr); |
---|
549 | } |
---|
550 | pthread_mutex_unlock(&(pt->ndag_lock)); |
---|
551 | } |
---|
552 | |
---|
553 | pthread_mutex_lock(&(pt->ndag_lock)); |
---|
554 | pt->received_packets ++; |
---|
555 | pthread_mutex_unlock(&(pt->ndag_lock)); |
---|
556 | encaphdr = (ndag_encap_t *)(pt->ndagheader + sizeof(ndag_common_t)); |
---|
557 | |
---|
558 | if ((ntohs(encaphdr->recordcount) & 0x8000) != 0) { |
---|
559 | /* Record was truncated */ |
---|
560 | erfptr->rlen = htons(pt->ndagsize - (pt->nextrec - |
---|
561 | pt->ndagheader)); |
---|
562 | } |
---|
563 | |
---|
564 | pt->nextrec += ntohs(erfptr->rlen); |
---|
565 | |
---|
566 | if (pt->nextrec - pt->ndagheader >= pt->ndagsize) { |
---|
567 | pt->ndagheader = NULL; |
---|
568 | pt->nextrec = NULL; |
---|
569 | pt->ndagsize = 0; |
---|
570 | } |
---|
571 | |
---|
572 | packet->order = erf_get_erf_timestamp(packet); |
---|
573 | packet->error = packet->payload ? ntohs(erfptr->rlen) : |
---|
574 | erf_get_framing_length(packet); |
---|
575 | return ntohs(erfptr->rlen); |
---|
576 | } |
---|
577 | |
---|
578 | static int dpdkndag_pread_packets(libtrace_t *libtrace, |
---|
579 | libtrace_thread_t *t, |
---|
580 | libtrace_packet_t **packets, |
---|
581 | size_t nb_packets) { |
---|
582 | |
---|
583 | perthread_t *pt = (perthread_t *)t->format_data; |
---|
584 | size_t read_packets = 0; |
---|
585 | int ret; |
---|
586 | |
---|
587 | while (pt->nextrec == NULL) { |
---|
588 | trace_fin_packet(pt->dpdkpkt); |
---|
589 | |
---|
590 | if (pt->burstsize > 0 && pt->burstsize != pt->burstoffset) { |
---|
591 | pt->dpdkpkt->buffer = pt->burstspace[pt->burstoffset]; |
---|
592 | pt->dpdkpkt->trace = FORMAT_DATA->dpdkrecv; |
---|
593 | dpdk_prepare_packet(FORMAT_DATA->dpdkrecv, pt->dpdkpkt, |
---|
594 | pt->dpdkpkt->buffer, |
---|
595 | TRACE_RT_DATA_DPDK, 0); |
---|
596 | pt->burstoffset ++; |
---|
597 | } else { |
---|
598 | ret = dpdk_read_packet_stream(FORMAT_DATA->dpdkrecv, |
---|
599 | pt->dpdkstreamdata, |
---|
600 | &t->messages, |
---|
601 | pt->burstspace, |
---|
602 | 40); |
---|
603 | if (ret <= 0) { |
---|
604 | return ret; |
---|
605 | } |
---|
606 | |
---|
607 | pt->dpdkpkt->buffer = pt->burstspace[0]; |
---|
608 | pt->dpdkpkt->trace = FORMAT_DATA->dpdkrecv; |
---|
609 | dpdk_prepare_packet(FORMAT_DATA->dpdkrecv, pt->dpdkpkt, |
---|
610 | pt->dpdkpkt->buffer, |
---|
611 | TRACE_RT_DATA_DPDK, 0); |
---|
612 | pt->burstsize = ret; |
---|
613 | pt->burstoffset = 1; |
---|
614 | } |
---|
615 | |
---|
616 | if (!is_ndag_packet(pt->dpdkpkt, pt)) { |
---|
617 | continue; |
---|
618 | } |
---|
619 | |
---|
620 | ret = process_fresh_packet(pt, FORMAT_DATA->multicastgroup); |
---|
621 | if (ret <= 0) { |
---|
622 | return ret; |
---|
623 | } |
---|
624 | } |
---|
625 | |
---|
626 | while (pt->nextrec != NULL) { |
---|
627 | if (read_packets == nb_packets) { |
---|
628 | break; |
---|
629 | } |
---|
630 | |
---|
631 | if (packets[read_packets]->buf_control == TRACE_CTRL_PACKET) { |
---|
632 | free(packets[read_packets]->buffer); |
---|
633 | packets[read_packets]->buffer = NULL; |
---|
634 | } |
---|
635 | ret = ndagrec_to_libtrace_packet(libtrace, pt, |
---|
636 | packets[read_packets]); |
---|
637 | if (ret < 0) { |
---|
638 | return ret; |
---|
639 | } |
---|
640 | read_packets ++; |
---|
641 | |
---|
642 | } |
---|
643 | |
---|
644 | return read_packets; |
---|
645 | } |
---|
646 | |
---|
647 | static int dpdkndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) { |
---|
648 | |
---|
649 | perthread_t *pt = &(FORMAT_DATA->threaddatas[0]); |
---|
650 | int ret; |
---|
651 | |
---|
652 | if (packet->buf_control == TRACE_CTRL_PACKET) { |
---|
653 | free(packet->buffer); |
---|
654 | packet->buffer = NULL; |
---|
655 | } |
---|
656 | |
---|
657 | while (pt->nextrec == NULL) { |
---|
658 | trace_fin_packet(pt->dpdkpkt); |
---|
659 | |
---|
660 | ret = dpdk_read_packet(FORMAT_DATA->dpdkrecv, pt->dpdkpkt); |
---|
661 | if (ret <= 0) { |
---|
662 | return ret; |
---|
663 | } |
---|
664 | |
---|
665 | if (!is_ndag_packet(pt->dpdkpkt, pt)) { |
---|
666 | continue; |
---|
667 | } |
---|
668 | |
---|
669 | ret = process_fresh_packet(pt, FORMAT_DATA->multicastgroup); |
---|
670 | if (ret <= 0) { |
---|
671 | return ret; |
---|
672 | } |
---|
673 | } |
---|
674 | |
---|
675 | return ndagrec_to_libtrace_packet(libtrace, pt, packet); |
---|
676 | } |
---|
677 | |
---|
678 | static libtrace_eventobj_t trace_event_dpdkndag(libtrace_t *libtrace, |
---|
679 | libtrace_packet_t *packet) { |
---|
680 | |
---|
681 | |
---|
682 | libtrace_eventobj_t event; |
---|
683 | int ret; |
---|
684 | perthread_t *pt = &(FORMAT_DATA->threaddatas[0]); |
---|
685 | |
---|
686 | if (packet->buf_control == TRACE_CTRL_PACKET) { |
---|
687 | free(packet->buffer); |
---|
688 | packet->buffer = NULL; |
---|
689 | } |
---|
690 | |
---|
691 | while (pt->nextrec == NULL) { |
---|
692 | |
---|
693 | event = dpdk_trace_event(libtrace, pt->dpdkpkt); |
---|
694 | |
---|
695 | if (event.type != TRACE_EVENT_PACKET) { |
---|
696 | return event; |
---|
697 | } |
---|
698 | |
---|
699 | if (!is_ndag_packet(pt->dpdkpkt, pt)) { |
---|
700 | continue; |
---|
701 | } |
---|
702 | |
---|
703 | ret = process_fresh_packet(pt, FORMAT_DATA->multicastgroup); |
---|
704 | if (ret <= 0) { |
---|
705 | event.type = TRACE_EVENT_TERMINATE; |
---|
706 | return event; |
---|
707 | } |
---|
708 | } |
---|
709 | |
---|
710 | ret = ndagrec_to_libtrace_packet(libtrace, pt, packet); |
---|
711 | if (ret < 0) { |
---|
712 | event.type = TRACE_EVENT_TERMINATE; |
---|
713 | } else { |
---|
714 | event.type = TRACE_EVENT_PACKET; |
---|
715 | event.size = 1; |
---|
716 | } |
---|
717 | return event; |
---|
718 | } |
---|
719 | |
---|
720 | static struct libtrace_format_t dpdkndag = { |
---|
721 | |
---|
722 | "dpdkndag", |
---|
723 | "", |
---|
724 | TRACE_FORMAT_DPDK_NDAG, |
---|
725 | NULL, /* probe filename */ |
---|
726 | NULL, /* probe magic */ |
---|
727 | dpdkndag_init_input, /* init_input */ |
---|
728 | dpdkndag_config_input, /* config_input */ |
---|
729 | dpdkndag_start_input, /* start_input */ |
---|
730 | dpdkndag_pause_input, /* pause_input */ |
---|
731 | NULL, /* init_output */ |
---|
732 | NULL, /* config_output */ |
---|
733 | NULL, /* start_output */ |
---|
734 | dpdkndag_fin_input, /* fin_input */ |
---|
735 | NULL, /* fin_output */ |
---|
736 | dpdkndag_read_packet, /* read_packet */ |
---|
737 | NULL, /* prepare_packet */ |
---|
738 | NULL, /* fin_packet */ |
---|
739 | NULL, /* write_packet */ |
---|
740 | NULL, /* flush_output */ |
---|
741 | erf_get_link_type, /* get_link_type */ |
---|
742 | erf_get_direction, /* get_direction */ |
---|
743 | erf_set_direction, /* set_direction */ |
---|
744 | erf_get_erf_timestamp, /* get_erf_timestamp */ |
---|
745 | NULL, /* get_timeval */ |
---|
746 | NULL, /* get_seconds */ |
---|
747 | NULL, /* get_timespec */ |
---|
748 | NULL, /* seek_erf */ |
---|
749 | NULL, /* seek_timeval */ |
---|
750 | NULL, /* seek_seconds */ |
---|
751 | erf_get_capture_length, /* get_capture_length */ |
---|
752 | erf_get_wire_length, /* get_wire_length */ |
---|
753 | erf_get_framing_length, /* get_framing_length */ |
---|
754 | erf_set_capture_length, /* set_capture_length */ |
---|
755 | NULL, /* get_received_packets */ |
---|
756 | NULL, /* get_filtered_packets */ |
---|
757 | NULL, /* get_dropped_packets */ |
---|
758 | dpdkndag_get_statistics, /* get_statistics */ |
---|
759 | NULL, /* get_fd */ |
---|
760 | trace_event_dpdkndag, /* trace_event */ |
---|
761 | NULL, /* help */ |
---|
762 | NULL, /* next pointer */ |
---|
763 | {true, 0}, /* live packet capture */ |
---|
764 | dpdkndag_pstart_input, /* parallel start */ |
---|
765 | dpdkndag_pread_packets, /* parallel read */ |
---|
766 | dpdkndag_pause_input, /* parallel pause */ |
---|
767 | NULL, |
---|
768 | dpdkndag_pregister_thread, /* register thread */ |
---|
769 | dpdkndag_punregister_thread, |
---|
770 | dpdkndag_get_thread_stats /* per-thread stats */ |
---|
771 | }; |
---|
772 | |
---|
773 | void dpdkndag_constructor(void) { |
---|
774 | register_format(&dpdkndag); |
---|
775 | } |
---|