source: lib/format_dag25.c @ 5778738

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 5778738 was 5778738, checked in by Shane Alcock <salcock@…>, 11 years ago
  • Fixed major bug where we were sleeping whenever a packet did not match a specified BPF filter when reading from a DAG card, rather than immediately checking if there was another suitable packet available. This was causing libtrace to get massively behind in reading packets from the DAG card so the DAG buffer would overflow and packets would be dropped.
  • NOTE: the dag2.4 version of this fix has not been fully tested (or even compiled) because we upgraded the box that used to have the old DAG cards in
  • Property mode set to 100644
File size: 27.6 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *          Shane Alcock
8 *
9 * All rights reserved.
10 *
11 * This code has been developed by the University of Waikato WAND
12 * research group. For further information please see http://www.wand.net.nz/
13 *
14 * libtrace is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * libtrace is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with libtrace; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 *
28 * $Id$
29 *
30 */
31#define _GNU_SOURCE
32
33#include "config.h"
34#include "common.h"
35#include "libtrace.h"
36#include "libtrace_int.h"
37#include "format_helper.h"
38#include "format_erf.h"
39
40#include <assert.h>
41#include <errno.h>
42#include <fcntl.h>
43#include <stdio.h>
44#include <string.h>
45#include <stdlib.h>
46
47#include <sys/mman.h>
48/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
49 * Windows anyway so we'll worry about this more later :] */
50#include <pthread.h>
51
52
53#ifdef WIN32
54#  include <io.h>
55#  include <share.h>
56#  define PATH_MAX _MAX_PATH
57#  define snprintf sprintf_s
58#else
59#  include <netdb.h>
60#  ifndef PATH_MAX
61#       define PATH_MAX 4096
62#  endif
63#  include <sys/ioctl.h>
64#endif
65
66
67#define DATA(x) ((struct dag_format_data_t *)x->format_data)
68#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
69
70#define FORMAT_DATA DATA(libtrace)
71#define FORMAT_DATA_OUT DATA_OUT(libtrace)
72
73#define DUCK FORMAT_DATA->duck
74static struct libtrace_format_t dag;
75
76struct dag_dev_t {
77        //pthread_mutex_t dag_mutex;
78        char * dev_name;
79        int fd;
80        uint16_t ref_count;
81        struct dag_dev_t *prev;
82        struct dag_dev_t *next;
83};
84
85struct dag_format_data_out_t {
86        struct dag_dev_t *device;
87        unsigned int dagstream;
88        int stream_attached;
89        uint8_t *bottom;
90        uint8_t *top;
91        uint32_t processed;
92        uint64_t drops;
93        uint64_t waiting;
94        uint8_t *txbuffer;
95};
96
97struct dag_format_data_t {
98        struct {
99                uint32_t last_duck;
100                uint32_t duck_freq;
101                uint32_t last_pkt;
102                libtrace_t *dummy_duck;
103        } duck;
104
105        struct dag_dev_t *device;
106        unsigned int dagstream;
107        int stream_attached;
108        uint8_t *bottom;
109        uint8_t *top;
110        uint32_t processed;
111        uint64_t drops;
112};
113
114pthread_mutex_t open_dag_mutex;
115struct dag_dev_t *open_dags = NULL;
116
117/* Dag erf ether packets have a 2 byte padding before the packet
118 * so that the ip header is aligned on a 32 bit boundary.
119 */
120static int dag_get_padding(const libtrace_packet_t *packet)
121{
122        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
123                dag_record_t *erfptr = (dag_record_t *)packet->header;
124                switch(erfptr->type) {
125                        case TYPE_ETH:
126                        case TYPE_DSM_COLOR_ETH:
127                                return 2;
128                        default:                return 0;
129                }
130        }
131        else {
132                switch(trace_get_link_type(packet)) {
133                        case TRACE_TYPE_ETH:    return 2;
134                        default:                return 0;
135                }
136        }
137}
138
139static int dag_probe_filename(const char *filename)
140{
141        struct stat statbuf;
142        /* Can we stat the file? */
143        if (stat(filename, &statbuf) != 0) {
144                return 0;
145        }
146        /* Is it a character device? */
147        if (!S_ISCHR(statbuf.st_mode)) {
148                return 0;
149        }
150        /* Yeah, it's probably us. */
151        return 1;
152}
153
154static void dag_init_format_out_data(libtrace_out_t *libtrace) {
155        libtrace->format_data = (struct dag_format_data_out_t *) malloc(sizeof(struct dag_format_data_out_t));
156        // no DUCK on output
157        FORMAT_DATA_OUT->stream_attached = 0;
158        FORMAT_DATA_OUT->drops = 0;
159        FORMAT_DATA_OUT->device = NULL;
160        FORMAT_DATA_OUT->dagstream = 0;
161        FORMAT_DATA_OUT->processed = 0;
162        FORMAT_DATA_OUT->bottom = NULL;
163        FORMAT_DATA_OUT->top = NULL;
164        FORMAT_DATA_OUT->waiting = 0;
165
166}
167
168static void dag_init_format_data(libtrace_t *libtrace) {
169        libtrace->format_data = (struct dag_format_data_t *)
170                malloc(sizeof(struct dag_format_data_t));
171        DUCK.last_duck = 0;
172        DUCK.duck_freq = 0;
173        DUCK.last_pkt = 0;
174        DUCK.dummy_duck = NULL;
175        FORMAT_DATA->stream_attached = 0;
176        FORMAT_DATA->drops = 0;
177        FORMAT_DATA->device = NULL;
178        FORMAT_DATA->dagstream = 0;
179        FORMAT_DATA->processed = 0;
180        FORMAT_DATA->bottom = NULL;
181        FORMAT_DATA->top = NULL;
182}
183
184/* NOTE: This function assumes the open_dag_mutex is held by the caller */
185static struct dag_dev_t *dag_find_open_device(char *dev_name) {
186        struct dag_dev_t *dag_dev;
187
188        dag_dev = open_dags;
189
190        /* XXX: Not exactly zippy, but how often are we going to be dealing
191         * with multiple dag cards? */
192        while (dag_dev != NULL) {
193                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
194                        dag_dev->ref_count ++;
195                        return dag_dev;
196
197                }
198                dag_dev = dag_dev->next;
199        }
200        return NULL;
201
202
203}
204
205/* NOTE: This function assumes the open_dag_mutex is held by the caller */
206static void dag_close_device(struct dag_dev_t *dev) {
207        /* Need to remove from the device list */
208
209        assert(dev->ref_count == 0);
210
211        if (dev->prev == NULL) {
212                open_dags = dev->next;
213                if (dev->next)
214                        dev->next->prev = NULL;
215        } else {
216                dev->prev->next = dev->next;
217                if (dev->next)
218                        dev->next->prev = dev->prev;
219        }
220
221        dag_close(dev->fd);
222        if (dev->dev_name)
223        free(dev->dev_name);
224        free(dev);
225}
226
227static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace, char *dev_name) {
228        struct stat buf;
229        int fd;
230        struct dag_dev_t *new_dev;
231
232        if (stat(dev_name, &buf) == -1) {
233                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
234                return NULL;
235}
236
237        if (S_ISCHR(buf.st_mode)) {
238                if((fd = dag_open(dev_name)) < 0) {
239                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
240                                        dev_name);
241                        return NULL;
242                }
243        } else {
244                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
245                                dev_name);
246                return NULL;
247        }
248
249        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
250        new_dev->fd = fd;
251        new_dev->dev_name = dev_name;
252        new_dev->ref_count = 1;
253
254        new_dev->prev = NULL;
255        new_dev->next = open_dags;
256        if (open_dags)
257                open_dags->prev = new_dev;
258
259        open_dags = new_dev;
260
261        return new_dev;
262}
263
264/* NOTE: This function assumes the open_dag_mutex is held by the caller */
265static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
266        struct stat buf;
267        int fd;
268        struct dag_dev_t *new_dev;
269
270        if (stat(dev_name, &buf) == -1) {
271                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
272                return NULL;
273        }
274
275        if (S_ISCHR(buf.st_mode)) {
276                if((fd = dag_open(dev_name)) < 0) {
277                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
278                                        dev_name);
279                        return NULL;
280                }
281        } else {
282                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
283                                dev_name);
284                return NULL;
285        }
286
287        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
288        new_dev->fd = fd;
289        new_dev->dev_name = dev_name;
290        new_dev->ref_count = 1;
291
292        new_dev->prev = NULL;
293        new_dev->next = open_dags;
294        if (open_dags)
295                open_dags->prev = new_dev;
296
297        open_dags = new_dev;
298
299        return new_dev;
300}
301
302static int dag_init_output(libtrace_out_t *libtrace) {
303        char *dag_dev_name = NULL;
304        char *scan = NULL;
305        struct dag_dev_t *dag_device = NULL;
306        int stream = 1;
307        unsigned long wake_time;
308        dagutil_sleep_get_wake_time(&wake_time,0);
309
310        dag_init_format_out_data(libtrace);
311        pthread_mutex_lock(&open_dag_mutex);
312        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
313                dag_dev_name = strdup(libtrace->uridata);
314        } else {
315                dag_dev_name = (char *)strndup(libtrace->uridata,
316                                (size_t)(scan - libtrace->uridata));
317                stream = atoi(++scan);
318        }
319        FORMAT_DATA_OUT->dagstream = stream;
320
321        dag_device = dag_find_open_device(dag_dev_name);
322
323        if (dag_device == NULL) {
324                /* Device not yet opened - open it ourselves */
325                dag_device = dag_open_output_device(libtrace, dag_dev_name);
326        } else {
327                free(dag_dev_name);
328                dag_dev_name = NULL;
329        }
330
331        if (dag_device == NULL) {
332                if (dag_dev_name) {
333                        free(dag_dev_name);
334                }
335                return -1;
336        }
337
338        FORMAT_DATA_OUT->device = dag_device;
339        pthread_mutex_unlock(&open_dag_mutex);
340        return 0;
341}
342
343static int dag_init_input(libtrace_t *libtrace) {
344        char *dag_dev_name = NULL;
345        char *scan = NULL;
346        int stream = 0;
347        struct dag_dev_t *dag_device = NULL;
348
349        dag_init_format_data(libtrace);
350        pthread_mutex_lock(&open_dag_mutex);
351        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
352                dag_dev_name = strdup(libtrace->uridata);
353        } else {
354                dag_dev_name = (char *)strndup(libtrace->uridata,
355                                (size_t)(scan - libtrace->uridata));
356                stream = atoi(++scan);
357        }
358
359        FORMAT_DATA->dagstream = stream;
360
361        dag_device = dag_find_open_device(dag_dev_name);
362
363        if (dag_device == NULL) {
364                /* Device not yet opened - open it ourselves */
365                dag_device = dag_open_device(libtrace, dag_dev_name);
366        } else {
367                free(dag_dev_name);
368                dag_dev_name = NULL;
369        }
370
371        if (dag_device == NULL) {
372                if (dag_dev_name)
373                        free(dag_dev_name);
374                dag_dev_name = NULL;
375                return -1;
376        }
377
378        FORMAT_DATA->device = dag_device;
379
380        pthread_mutex_unlock(&open_dag_mutex);
381        return 0;
382}
383
384static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
385                                void *data) {
386        char conf_str[4096];
387        switch(option) {
388                case TRACE_OPTION_META_FREQ:
389                        DUCK.duck_freq = *(int *)data;
390                        return 0;
391                case TRACE_OPTION_SNAPLEN:
392                        snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
393                        if (dag_configure(FORMAT_DATA->device->fd,
394                                                conf_str) != 0) {
395                                trace_set_err(libtrace, errno, "Failed to configure snaplen on DAG card: %s", libtrace->uridata);
396                                return -1;
397                        }
398                        return 0;
399                case TRACE_OPTION_PROMISC:
400                        /* DAG already operates in a promisc fashion */
401                        return -1;
402                case TRACE_OPTION_FILTER:
403                        return -1;
404                case TRACE_OPTION_EVENT_REALTIME:
405                        return -1;
406        }
407        return -1;
408}
409static int dag_start_output(libtrace_out_t *libtrace) {
410        struct timeval zero, nopoll;
411        uint8_t *top, *bottom;
412        top = bottom = NULL;
413
414        zero.tv_sec = 0;
415        zero.tv_usec = 0;
416        nopoll = zero;
417
418        if (dag_attach_stream(FORMAT_DATA_OUT->device->fd,
419                        FORMAT_DATA_OUT->dagstream, 0, 1048576) < 0) {
420                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
421                return -1;
422        }
423
424        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
425                        FORMAT_DATA_OUT->dagstream) < 0) {
426                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
427                return -1;
428        }
429        FORMAT_DATA_OUT->stream_attached = 1;
430
431        /* We don't want the dag card to do any sleeping */
432
433        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
434                        FORMAT_DATA_OUT->dagstream, 0, &zero,
435                        &nopoll);
436
437        return 0;
438}
439
440static int dag_start_input(libtrace_t *libtrace) {
441        struct timeval zero, nopoll;
442        uint8_t *top, *bottom;
443        uint8_t diff = 0;
444        top = bottom = NULL;
445
446        zero.tv_sec = 0;
447        zero.tv_usec = 0;
448        nopoll = zero;
449
450
451
452        if (dag_attach_stream(FORMAT_DATA->device->fd,
453                                FORMAT_DATA->dagstream, 0, 0) < 0) {
454                trace_set_err(libtrace, errno, "Cannot attach DAG stream");
455                return -1;
456        }
457
458        if (dag_start_stream(FORMAT_DATA->device->fd,
459                                FORMAT_DATA->dagstream) < 0) {
460                trace_set_err(libtrace, errno, "Cannot start DAG stream");
461                return -1;
462        }
463        FORMAT_DATA->stream_attached = 1;
464        /* We don't want the dag card to do any sleeping */
465        dag_set_stream_poll(FORMAT_DATA->device->fd,
466                                FORMAT_DATA->dagstream, 0, &zero,
467                                &nopoll);
468
469        /* Should probably flush the memory hole now */
470
471        do {
472                top = dag_advance_stream(FORMAT_DATA->device->fd,
473                                        FORMAT_DATA->dagstream,
474                                        &bottom);
475                assert(top && bottom);
476                diff = top - bottom;
477                bottom -= diff;
478        } while (diff != 0);
479        FORMAT_DATA->top = NULL;
480        FORMAT_DATA->bottom = NULL;
481        FORMAT_DATA->processed = 0;
482        FORMAT_DATA->drops = 0;
483
484        return 0;
485}
486
487static int dag_pause_output(libtrace_out_t *libtrace) {
488        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
489                        FORMAT_DATA_OUT->dagstream) < 0) {
490                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
491                return -1;
492        }
493        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
494                        FORMAT_DATA_OUT->dagstream) < 0) {
495                trace_set_err_out(libtrace, errno, "Could not detach DAG stream");
496                return -1;
497        }
498        FORMAT_DATA_OUT->stream_attached = 0;
499        return 0;
500}
501
502static int dag_pause_input(libtrace_t *libtrace) {
503        if (dag_stop_stream(FORMAT_DATA->device->fd,
504                                FORMAT_DATA->dagstream) < 0) {
505                trace_set_err(libtrace, errno, "Could not stop DAG stream");
506                return -1;
507        }
508        if (dag_detach_stream(FORMAT_DATA->device->fd,
509                                FORMAT_DATA->dagstream) < 0) {
510                trace_set_err(libtrace, errno, "Could not detach DAG stream");
511                return -1;
512        }
513        FORMAT_DATA->stream_attached = 0;
514        return 0;
515}
516
517static int dag_fin_input(libtrace_t *libtrace) {
518        pthread_mutex_lock(&open_dag_mutex);
519        if (FORMAT_DATA->stream_attached)
520                dag_pause_input(libtrace);
521        FORMAT_DATA->device->ref_count --;
522
523        if (FORMAT_DATA->device->ref_count == 0)
524                dag_close_device(FORMAT_DATA->device);
525        if (DUCK.dummy_duck)
526                trace_destroy_dead(DUCK.dummy_duck);
527        free(libtrace->format_data);
528        pthread_mutex_unlock(&open_dag_mutex);
529        return 0; /* success */
530}
531
532static int dag_fin_output(libtrace_out_t *libtrace) {
533        // commit any outstanding traffic in the txbuffer
534        if (FORMAT_DATA_OUT->waiting) {
535                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
536                                FORMAT_DATA_OUT->waiting );
537        }
538
539        // wait until the buffer is nearly clear before exiting the program, as we
540        // will lose packets otherwise
541        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
542                        FORMAT_DATA_OUT->dagstream,
543                        dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
544                                        FORMAT_DATA_OUT->dagstream) - 8
545                        );
546
547        pthread_mutex_lock(&open_dag_mutex);
548        if (FORMAT_DATA_OUT->stream_attached)
549                dag_pause_output(libtrace);
550        FORMAT_DATA_OUT->device->ref_count --;
551
552        if (FORMAT_DATA_OUT->device->ref_count == 0)
553                dag_close_device(FORMAT_DATA_OUT->device);
554        free(libtrace->format_data);
555        pthread_mutex_unlock(&open_dag_mutex);
556        return 0; /* success */
557}
558
559static int dag_get_duckinfo(libtrace_t *libtrace,
560                                libtrace_packet_t *packet) {
561        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
562                        !packet->buffer) {
563                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
564                packet->buf_control = TRACE_CTRL_PACKET;
565                if (!packet->buffer) {
566                        trace_set_err(libtrace, errno,
567                                        "Cannot allocate packet buffer");
568                        return -1;
569                }
570        }
571
572        packet->header = 0;
573        packet->payload = packet->buffer;
574
575        /* No need to check if we can get DUCK or not - we're modern
576         * enough */
577        if ((ioctl(FORMAT_DATA->device->fd, DAGIOCDUCK,
578                                        (duckinf_t *)packet->payload) < 0)) {
579                trace_set_err(libtrace, errno, "Error using DAGIOCDUCK");
580                return -1;
581        }
582
583        packet->type = TRACE_RT_DUCK_2_5;
584        if (!DUCK.dummy_duck)
585                DUCK.dummy_duck = trace_create_dead("rt:localhost:3434");
586        packet->trace = DUCK.dummy_duck;
587        return sizeof(duckinf_t);
588}
589
590static int dag_available(libtrace_t *libtrace) {
591        uint32_t diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
592        /* If we've processed more than 4MB of data since we last called
593         * dag_advance_stream, then we should call it again to allow the
594         * space occupied by that 4MB to be released */
595        if (diff >= dag_record_size && FORMAT_DATA->processed < 4 * 1024 * 1024)
596                return diff;
597        FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd,
598                        FORMAT_DATA->dagstream,
599                        &(FORMAT_DATA->bottom));
600        if (FORMAT_DATA->top == NULL) {
601                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
602                return -1;
603        }
604        FORMAT_DATA->processed = 0;
605        diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
606        return diff;
607}
608
609static dag_record_t *dag_get_record(libtrace_t *libtrace) {
610        dag_record_t *erfptr = NULL;
611        uint16_t size;
612        erfptr = (dag_record_t *)FORMAT_DATA->bottom;
613        if (!erfptr)
614                return NULL;
615        size = ntohs(erfptr->rlen);
616        assert( size >= dag_record_size );
617        /* Make certain we have the full packet available */
618        if (size > (FORMAT_DATA->top - FORMAT_DATA->bottom))
619                return NULL;
620        FORMAT_DATA->bottom += size;
621        FORMAT_DATA->processed += size;
622        return erfptr;
623}
624
625static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
626                void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
627
628        dag_record_t *erfptr;
629
630        if (packet->buffer != buffer &&
631                        packet->buf_control == TRACE_CTRL_PACKET) {
632                free(packet->buffer);
633        }
634
635        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
636                packet->buf_control = TRACE_CTRL_PACKET;
637        } else
638                packet->buf_control = TRACE_CTRL_EXTERNAL;
639
640        erfptr = (dag_record_t *)buffer;
641        packet->buffer = erfptr;
642        packet->header = erfptr;
643        packet->type = rt_type;
644
645        if (erfptr->flags.rxerror == 1) {
646                /* rxerror means the payload is corrupt - drop it
647                 * by tweaking rlen */
648                packet->payload = NULL;
649                erfptr->rlen = htons(erf_get_framing_length(packet));
650        } else {
651                packet->payload = (char*)packet->buffer
652                        + erf_get_framing_length(packet);
653        }
654
655        if (libtrace->format_data == NULL) {
656                dag_init_format_data(libtrace);
657        }
658
659        /* No loss counter for DSM coloured records - have to use
660         * some other API */
661        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
662
663        } else {
664                DATA(libtrace)->drops += ntohs(erfptr->lctr);
665        }
666
667        return 0;
668}
669
670/*
671 * dag_write_packet() at this stage attempts to improve tx performance
672 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
673 * has been met. I observed approximately 270% performance increase
674 * through this relatively naive tweak. No optimisation of buffer sizes
675 * was attempted.
676 */
677
678static int dag_dump_packet(libtrace_out_t *libtrace,
679                dag_record_t *erfptr, unsigned int pad, void *buffer) {
680        //int numbytes = 0;
681        int size;
682
683        /*
684         * If we've got 0 bytes waiting in the txqueue, assume that we haven't
685         * requested any space yet, and request some, storing the pointer at
686         * FORMAT_DATA_OUT->txbuffer.
687         *
688         * The amount to request is slightly magical at the moment - it's
689         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
690         * the buffer and handle overruns.
691         */
692        if (FORMAT_DATA_OUT->waiting == 0) {
693                FORMAT_DATA_OUT->txbuffer = dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
694                                FORMAT_DATA_OUT->dagstream, 16908288);
695        }
696
697        /*
698         * Copy the header separately to the body, as we can't guarantee they are
699         * in contiguous memory
700         */
701        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,erfptr,(dag_record_size + pad));
702        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
703
704
705
706        /*
707         * Copy our incoming packet into the outgoing buffer, and increment our waiting count
708         */
709        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
710        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,buffer,size);
711        FORMAT_DATA_OUT->waiting += size;
712
713        /*
714         * if our output buffer has more than 16 Mebibytes in it, commit those bytes and
715         * reset the waiting count to 0.
716         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in case
717         * there is still data in the buffer at program exit.
718         */
719
720        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
721                FORMAT_DATA_OUT->txbuffer = dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
722                        FORMAT_DATA_OUT->waiting );
723                FORMAT_DATA_OUT->waiting = 0;
724        }
725
726        return size + pad + dag_record_size;
727
728}
729
730static bool find_compatible_linktype(libtrace_out_t *libtrace,
731                                libtrace_packet_t *packet)
732{
733         // Keep trying to simplify the packet until we can find
734         //something we can do with it
735
736        do {
737                char type=libtrace_to_erf_type(trace_get_link_type(packet));
738
739                // Success
740                if (type != (char)-1)
741                        return true;
742
743                if (!demote_packet(packet)) {
744                        trace_set_err_out(libtrace,
745                                        TRACE_ERR_NO_CONVERSION,
746                                        "No erf type for packet (%i)",
747                                        trace_get_link_type(packet));
748                        return false;
749                }
750
751        } while(1);
752
753        return true;
754}
755
756static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet) {
757        /*
758         * This is heavily borrowed from erf_write_packet(). Yes, CnP coding sucks,
759         * sorry about that.
760         */
761        unsigned int pad = 0;
762        int numbytes;
763        void *payload = packet->payload;
764        dag_record_t *header = (dag_record_t *)packet->header;
765
766        FORMAT_DATA_OUT->processed ++;
767        if(!packet->header) {
768                // No header, probably an RT packet. Lifted from erf_write_packet().
769                return -1;
770        }
771
772        pad = dag_get_padding(packet);
773
774        /*
775         * if the payload is null, adjust the rlen. Discussion of this is
776         * attached to erf_write_packet()
777         */
778        if (payload == NULL) {
779                header->rlen = htons(dag_record_size + pad);
780        }
781
782        if (packet->type == TRACE_RT_DATA_ERF) {
783                numbytes = dag_dump_packet(libtrace,
784                                header,
785                                pad,
786                                payload
787                                );
788
789        } else {
790                /* Build up a new packet header from the existing header */
791
792                // Simplify the packet first - if we can't do this, break early
793                if (!find_compatible_linktype(libtrace,packet))
794                        return -1;
795
796                dag_record_t erfhdr;
797
798                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
799                payload=packet->payload;
800                pad = dag_get_padding(packet);
801
802                /* Flags. Can't do this */
803                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
804                if (trace_get_direction(packet)!=~0U)
805                        erfhdr.flags.iface = trace_get_direction(packet);
806
807                erfhdr.type = libtrace_to_erf_type(trace_get_link_type(packet));
808
809                /* Packet length (rlen includes format overhead) */
810                assert(trace_get_capture_length(packet)>0
811                                && trace_get_capture_length(packet)<=65536);
812                assert(erf_get_framing_length(packet)>0
813                                && trace_get_framing_length(packet)<=65536);
814                assert(trace_get_capture_length(packet)+erf_get_framing_length(packet)>0
815                      &&trace_get_capture_length(packet)+erf_get_framing_length(packet)<=65536);
816
817                erfhdr.rlen = htons(trace_get_capture_length(packet)
818                        + erf_get_framing_length(packet));
819
820
821                /* loss counter. Can't do this */
822                erfhdr.lctr = 0;
823                /* Wire length, does not include padding! */
824                erfhdr.wlen = htons(trace_get_wire_length(packet));
825
826                /* Write it out */
827                numbytes = dag_dump_packet(libtrace,
828                                &erfhdr,
829                                pad,
830                                payload);
831
832        }
833
834        return numbytes;
835}
836
837static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
838        int size = 0;
839        struct timeval tv;
840        dag_record_t *erfptr = NULL;
841        int numbytes = 0;
842        uint32_t flags = 0;
843
844        if (DUCK.last_pkt - DUCK.last_duck > DUCK.duck_freq &&
845                        DUCK.duck_freq != 0) {
846                size = dag_get_duckinfo(libtrace, packet);
847                DUCK.last_duck = DUCK.last_pkt;
848                if (size != 0) {
849                        return size;
850                }
851                /* No DUCK support, so don't waste our time anymore */
852                DUCK.duck_freq = 0;
853        }
854
855        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
856
857        if (packet->buf_control == TRACE_CTRL_PACKET) {
858                free(packet->buffer);
859                packet->buffer = 0;
860        }
861
862        do {
863                numbytes = dag_available(libtrace);
864                if (numbytes < 0)
865                        return numbytes;
866                if (numbytes < dag_record_size)
867                        /* Block until we see a packet */
868                        continue;
869                erfptr = dag_get_record(libtrace);
870        } while (erfptr == NULL);
871
872        //dag_form_packet(erfptr, packet);
873        if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
874                                flags))
875                return -1;
876        tv = trace_get_timeval(packet);
877        DUCK.last_pkt = tv.tv_sec;
878
879        return packet->payload ? htons(erfptr->rlen) :
880                                erf_get_framing_length(packet);
881}
882
883static libtrace_eventobj_t trace_event_dag(libtrace_t *trace,
884                                        libtrace_packet_t *packet) {
885        libtrace_eventobj_t event = {0,0,0.0,0};
886        dag_record_t *erfptr = NULL;
887        int numbytes;
888        uint32_t flags = 0;
889
890        do {
891                erfptr = NULL;
892                numbytes = 0;
893       
894                /* Need to call dag_available so that the top pointer will get
895                 * updated, otherwise we'll never see any data! */
896                numbytes = dag_available(trace);
897
898                /* May as well not bother calling dag_get_record if
899                 * dag_available suggests that there's no data */
900                if (numbytes != 0)
901                        erfptr = dag_get_record(trace);
902                if (erfptr == NULL) {
903                        /* No packet available */
904                        event.type = TRACE_EVENT_SLEEP;
905                        event.seconds = 0.0001;
906                        break;
907                }
908                //dag_form_packet(erfptr, packet);
909                if (dag_prepare_packet(trace, packet, erfptr, 
910                                        TRACE_RT_DATA_ERF, flags)) {
911                        event.type = TRACE_EVENT_TERMINATE;
912                        break;
913                }
914
915
916                event.size = trace_get_capture_length(packet) + 
917                                trace_get_framing_length(packet);
918                if (trace->filter) {
919                        if (trace_apply_filter(trace->filter, packet)) {
920                                event.type = TRACE_EVENT_PACKET;
921                        } else {
922                                /* This packet isn't useful so we want to
923                                 * immediately see if there is another suitable
924                                 * one - we definitely DO NOT want to return
925                                 * a sleep event in this case, like we used to
926                                 * do! */
927                                continue;
928                        }
929                } else {
930                        event.type = TRACE_EVENT_PACKET;
931                }
932
933                if (trace->snaplen > 0) {
934                        trace_set_capture_length(packet, trace->snaplen);
935                }
936                break;
937        } while (1);
938
939        return event;
940}
941
942static uint64_t dag_get_dropped_packets(libtrace_t *trace) {
943        if (trace->format_data == NULL)
944                return (uint64_t)-1;
945        return DATA(trace)->drops;
946}
947
948static void dag_help(void) {
949        printf("dag format module: $Revision$\n");
950        printf("Supported input URIs:\n");
951        printf("\tdag:/dev/dagn\n");
952        printf("\n");
953        printf("\te.g.: dag:/dev/dag0\n");
954        printf("\n");
955        printf("Supported output URIs:\n");
956        printf("\tnone\n");
957        printf("\n");
958}
959
960static struct libtrace_format_t dag = {
961        "dag",
962        "$Id$",
963        TRACE_FORMAT_ERF,
964        dag_probe_filename,             /* probe filename */
965        NULL,                           /* probe magic */
966        dag_init_input,                 /* init_input */
967        dag_config_input,               /* config_input */
968        dag_start_input,                /* start_input */
969        dag_pause_input,                /* pause_input */
970        dag_init_output,                /* init_output */
971        NULL,                           /* config_output */
972        dag_start_output,               /* start_output */
973        dag_fin_input,                  /* fin_input */
974        dag_fin_output,                 /* fin_output */
975        dag_read_packet,                /* read_packet */
976        dag_prepare_packet,             /* prepare_packet */
977        NULL,                           /* fin_packet */
978        dag_write_packet,               /* write_packet */
979        erf_get_link_type,              /* get_link_type */
980        erf_get_direction,              /* get_direction */
981        erf_set_direction,              /* set_direction */
982        erf_get_erf_timestamp,          /* get_erf_timestamp */
983        NULL,                           /* get_timeval */
984        NULL,                           /* get_seconds */
985        NULL,                           /* get_timespec */
986        NULL,                           /* seek_erf */
987        NULL,                           /* seek_timeval */
988        NULL,                           /* seek_seconds */
989        erf_get_capture_length,         /* get_capture_length */
990        erf_get_wire_length,            /* get_wire_length */
991        erf_get_framing_length,         /* get_framing_length */
992        erf_set_capture_length,         /* set_capture_length */
993        NULL,                           /* get_received_packets */
994        NULL,                           /* get_filtered_packets */
995        dag_get_dropped_packets,        /* get_dropped_packets */
996        NULL,                           /* get_captured_packets */
997        NULL,                           /* get_fd */
998        trace_event_dag,                /* trace_event */
999        dag_help,                       /* help */
1000        NULL                            /* next pointer */
1001};
1002
1003void dag_constructor(void) {
1004        register_format(&dag);
1005}
Note: See TracBrowser for help on using the repository browser.