source: lib/format_dag25.c @ 70bf39a

4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Last change on this file since 70bf39a was 70bf39a, checked in by Shane Alcock <salcock@…>, 7 years ago

Fix dodgy stream buffer flushing in DAG

Not sure what was going on with that flushing code, but I'm pretty sure
that wasn't going to work.

Ignore loss counters on the first packet observed on each interface.
Modern DAG interfaces treat any packets that appear while you're not
capturing as lost, so stopping and restarting a DAG capture will always
result in a non-zero loss counter. We tend to use the loss counter to
only indicate packets lost during active capture, so we need to compensate.

  • Property mode set to 100644
File size: 38.3 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id$
31 *
32 */
33
34#define _GNU_SOURCE
35
36#include "config.h"
37#include "common.h"
38#include "libtrace.h"
39#include "libtrace_int.h"
40#include "format_helper.h"
41#include "format_erf.h"
42
43#include <assert.h>
44#include <errno.h>
45#include <fcntl.h>
46#include <stdio.h>
47#include <string.h>
48#include <stdlib.h>
49#include <sys/stat.h>
50
51#include <sys/mman.h>
52/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
53 * Windows anyway so we'll worry about this more later :] */
54#include <pthread.h>
55
56
57#ifdef WIN32
58#  include <io.h>
59#  include <share.h>
60#  define PATH_MAX _MAX_PATH
61#  define snprintf sprintf_s
62#else
63#  include <netdb.h>
64#  ifndef PATH_MAX
65#       define PATH_MAX 4096
66#  endif
67#  include <sys/ioctl.h>
68#endif
69
70/* This format deals with DAG cards that are using drivers from the 2.5 version
71 * onwards, including 3.X.
72 *
73 * DAG is a LIVE capture format.
74 *
75 * This format does support writing, provided the DAG card that you are using
76 * has transmit (Tx) support. Additionally, packets read using this format
77 * are in the ERF format, so can easily be written as ERF traces without
78 * losing any data.
79 */
80
81
82#define DATA(x) ((struct dag_format_data_t *)x->format_data)
83#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
84
85#define FORMAT_DATA DATA(libtrace)
86#define FORMAT_DATA_OUT DATA_OUT(libtrace)
87
88#define DUCK FORMAT_DATA->duck
89static struct libtrace_format_t dag;
90
/* Describes one opened DAG device. A single device can carry several streams
 * (and therefore several input traces), so each trace refers to a shared
 * device entry instead of owning the descriptor itself. */
struct dag_dev_t {
        /* Name of the device */
        char *dev_name;
        /* File descriptor for the device */
        int fd;
        /* Count of input / output traces currently using this device */
        uint16_t ref_count;
        /* Previous entry in the device list */
        struct dag_dev_t *prev;
        /* Next entry in the device list */
        struct dag_dev_t *next;
};
103
/* Per-trace state that is kept for every DAG output trace */
struct dag_format_data_out_t {
        /* DAG device that packets are written to */
        struct dag_dev_t *device;
        /* DAG stream on that device which is used for writing */
        unsigned int dagstream;
        /* Non-zero while the stream is attached, zero otherwise */
        int stream_attached;
        /* Number of bytes that are waiting to be transmitted */
        uint64_t waiting;
        /* Buffer holding the data that is to be transmitted */
        uint8_t *txbuffer;
};
117
118/* "Global" data that is stored for each DAG input trace */
119struct dag_format_data_t {
120
121        /* Data required for regular DUCK reporting */
122        struct {
123                /* Timestamp of the last DUCK report */
124                uint32_t last_duck;
125                /* The number of seconds between each DUCK report */
126                uint32_t duck_freq;
127                /* Timestamp of the last packet read from the DAG card */
128                uint32_t last_pkt;
129                /* Dummy trace to ensure DUCK packets are dealt with using the
130                 * DUCK format functions */
131                libtrace_t *dummy_duck;
132        } duck;
133
134        /* The DAG device that we are reading from */
135        struct dag_dev_t *device;
136        /* The DAG stream that we are reading from */
137        unsigned int dagstream;
138        /* Boolean flag indicating whether the stream is currently attached */
139        int stream_attached;
140        /* Pointer to the first unread byte in the DAG memory hole */
141        uint8_t *bottom;
142        /* Pointer to the last unread byte in the DAG memory hole */
143        uint8_t *top;
144        /* The amount of data processed thus far from the bottom pointer */
145        uint32_t processed;
146        /* The number of packets that have been dropped */
147        uint64_t drops;
148
149        uint8_t seeninterface[4];
150};
151
/* Mutex protecting the list of open DAG devices, so that the list can be
 * manipulated safely from multiple threads.
 *
 * It is statically initialised: POSIX requires a mutex to be initialised
 * before it is first locked, and no pthread_mutex_init() call is visible in
 * this part of the file.
 * NOTE(review): if the mutex is initialised elsewhere in the file, that call
 * is now redundant - confirm. */
pthread_mutex_t open_dag_mutex = PTHREAD_MUTEX_INITIALIZER;

/* The list of DAG devices that have been opened by libtrace.
 *
 * Each DAG device can only be opened once, but several streams may be read
 * from it. The list (together with the per-device reference counts) ensures
 * that a device is neither opened twice nor closed while still in use. */
struct dag_dev_t *open_dags = NULL;
163
164/* Returns the amount of padding between the ERF header and the start of the
165 * captured packet data */
166static int dag_get_padding(const libtrace_packet_t *packet)
167{
168        /* ERF Ethernet records have a 2 byte padding before the packet itself
169         * so that the IP header is aligned on a 32 bit boundary.
170         */
171        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
172                dag_record_t *erfptr = (dag_record_t *)packet->header;
173                switch(erfptr->type) {
174                        case TYPE_ETH:
175                        case TYPE_DSM_COLOR_ETH:
176                                return 2;
177                        default:                return 0;
178                }
179        }
180        else {
181                switch(trace_get_link_type(packet)) {
182                        case TRACE_TYPE_ETH:    return 2;
183                        default:                return 0;
184                }
185        }
186}
187
/* Guesses whether the given filename refers to a DAG device.
 *
 * The name is accepted if it can be stat()ed and is a character device;
 * nothing DAG-specific is checked.
 *
 * Returns 1 if the file looks like a DAG device, 0 otherwise. */
static int dag_probe_filename(const char *filename)
{
        struct stat info;

        if (stat(filename, &info) == 0 && S_ISCHR(info.st_mode))
                return 1;

        /* Either the file could not be examined or it is not a character
         * device */
        return 0;
}
203
204/* Initialises the DAG output data structure */
205static void dag_init_format_out_data(libtrace_out_t *libtrace) {
206        libtrace->format_data = (struct dag_format_data_out_t *) malloc(sizeof(struct dag_format_data_out_t));
207        // no DUCK on output
208        FORMAT_DATA_OUT->stream_attached = 0;
209        FORMAT_DATA_OUT->device = NULL;
210        FORMAT_DATA_OUT->dagstream = 0;
211        FORMAT_DATA_OUT->waiting = 0;
212
213}
214
215/* Initialises the DAG input data structure */
216static void dag_init_format_data(libtrace_t *libtrace) {
217        libtrace->format_data = (struct dag_format_data_t *)
218                malloc(sizeof(struct dag_format_data_t));
219        DUCK.last_duck = 0;
220        DUCK.duck_freq = 0;
221        DUCK.last_pkt = 0;
222        DUCK.dummy_duck = NULL;
223        FORMAT_DATA->stream_attached = 0;
224        FORMAT_DATA->drops = 0;
225        FORMAT_DATA->device = NULL;
226        FORMAT_DATA->dagstream = 0;
227        FORMAT_DATA->processed = 0;
228        FORMAT_DATA->bottom = NULL;
229        FORMAT_DATA->top = NULL;
230        memset(FORMAT_DATA->seeninterface, 0, sizeof(FORMAT_DATA->seeninterface));
231}
232
233/* Determines if there is already an entry for the given DAG device in the
234 * device list and increments the reference count for that device, if found.
235 *
236 * NOTE: This function assumes the open_dag_mutex is held by the caller */
237static struct dag_dev_t *dag_find_open_device(char *dev_name) {
238        struct dag_dev_t *dag_dev;
239
240        dag_dev = open_dags;
241
242        /* XXX: Not exactly zippy, but how often are we going to be dealing
243         * with multiple dag cards? */
244        while (dag_dev != NULL) {
245                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
246                        dag_dev->ref_count ++;
247                        return dag_dev;
248
249                }
250                dag_dev = dag_dev->next;
251        }
252        return NULL;
253
254
255}
256
257/* Closes a DAG device and removes it from the device list.
258 *
259 * Attempting to close a DAG device that has a non-zero reference count will
260 * cause an assertion failure!
261 *
262 * NOTE: This function assumes the open_dag_mutex is held by the caller */
263static void dag_close_device(struct dag_dev_t *dev) {
264        /* Need to remove from the device list */
265
266        assert(dev->ref_count == 0);
267
268        if (dev->prev == NULL) {
269                open_dags = dev->next;
270                if (dev->next)
271                        dev->next->prev = NULL;
272        } else {
273                dev->prev->next = dev->next;
274                if (dev->next)
275                        dev->next->prev = dev->prev;
276        }
277
278        dag_close(dev->fd);
279        if (dev->dev_name)
280                free(dev->dev_name);
281        free(dev);
282}
283
284
285/* Opens a new DAG device for writing and adds it to the DAG device list
286 *
287 * NOTE: this function should only be called when opening a DAG device for
288 * writing - there is little practical difference between this and the
289 * function below that covers the reading case, but we need the output trace
290 * object to report errors properly so the two functions take slightly
291 * different arguments. This is really lame and there should be a much better
292 * way of doing this.
293 *
294 * NOTE: This function assumes the open_dag_mutex is held by the caller
295 */
296static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace, char *dev_name) {
297        struct stat buf;
298        int fd;
299        struct dag_dev_t *new_dev;
300
301        /* Make sure the device exists */
302        if (stat(dev_name, &buf) == -1) {
303                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
304                return NULL;
305}
306
307        /* Make sure it is the appropriate type of device */
308        if (S_ISCHR(buf.st_mode)) {
309                /* Try opening the DAG device */
310                if((fd = dag_open(dev_name)) < 0) {
311                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
312                                        dev_name);
313                        return NULL;
314                }
315        } else {
316                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
317                                dev_name);
318                return NULL;
319        }
320
321        /* Add the device to our device list - it is just a doubly linked
322         * list with no inherent ordering; just tack the new one on the front
323         */
324        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
325        new_dev->fd = fd;
326        new_dev->dev_name = dev_name;
327        new_dev->ref_count = 1;
328
329        new_dev->prev = NULL;
330        new_dev->next = open_dags;
331        if (open_dags)
332                open_dags->prev = new_dev;
333
334        open_dags = new_dev;
335
336        return new_dev;
337}
338
339/* Opens a new DAG device for reading and adds it to the DAG device list
340 *
341 * NOTE: this function should only be called when opening a DAG device for
342 * reading - there is little practical difference between this and the
343 * function above that covers the writing case, but we need the input trace
344 * object to report errors properly so the two functions take slightly
345 * different arguments. This is really lame and there should be a much better
346 * way of doing this.
347 *
348 * NOTE: This function assumes the open_dag_mutex is held by the caller */
349static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
350        struct stat buf;
351        int fd;
352        struct dag_dev_t *new_dev;
353
354        /* Make sure the device exists */
355        if (stat(dev_name, &buf) == -1) {
356                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
357                return NULL;
358        }
359
360        /* Make sure it is the appropriate type of device */
361        if (S_ISCHR(buf.st_mode)) {
362                /* Try opening the DAG device */
363                if((fd = dag_open(dev_name)) < 0) {
364                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
365                                        dev_name);
366                        return NULL;
367                }
368        } else {
369                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
370                                dev_name);
371                return NULL;
372        }
373
374        /* Add the device to our device list - it is just a doubly linked
375         * list with no inherent ordering; just tack the new one on the front
376         */
377        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
378        new_dev->fd = fd;
379        new_dev->dev_name = dev_name;
380        new_dev->ref_count = 1;
381
382        new_dev->prev = NULL;
383        new_dev->next = open_dags;
384        if (open_dags)
385                open_dags->prev = new_dev;
386
387        open_dags = new_dev;
388
389        return new_dev;
390}
391
392/* Creates and initialises a DAG output trace */
393static int dag_init_output(libtrace_out_t *libtrace) {
394        char *dag_dev_name = NULL;
395        char *scan = NULL;
396        struct dag_dev_t *dag_device = NULL;
397        int stream = 1;
398       
399        /* XXX I don't know if this is important or not, but this function
400         * isn't present in all of the driver releases that this code is
401         * supposed to support! */
402        /*
403        unsigned long wake_time;
404        dagutil_sleep_get_wake_time(&wake_time,0);
405        */
406
407        dag_init_format_out_data(libtrace);
408        /* Grab the mutex while we're likely to be messing with the device
409         * list */
410        pthread_mutex_lock(&open_dag_mutex);
411       
412        /* Specific streams are signified using a comma in the libtrace URI,
413         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
414         *
415         * If no stream is specified, we will write using stream 1 */
416        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
417                dag_dev_name = strdup(libtrace->uridata);
418        } else {
419                dag_dev_name = (char *)strndup(libtrace->uridata,
420                                (size_t)(scan - libtrace->uridata));
421                stream = atoi(++scan);
422        }
423        FORMAT_DATA_OUT->dagstream = stream;
424
425        /* See if our DAG device is already open */
426        dag_device = dag_find_open_device(dag_dev_name);
427
428        if (dag_device == NULL) {
429                /* Device not yet opened - open it ourselves */
430                dag_device = dag_open_output_device(libtrace, dag_dev_name);
431        } else {
432                /* Otherwise, just use the existing one */
433                free(dag_dev_name);
434                dag_dev_name = NULL;
435        }
436
437        /* Make sure we have successfully opened a DAG device */
438        if (dag_device == NULL) {
439                if (dag_dev_name) {
440                        free(dag_dev_name);
441                }
442                pthread_mutex_unlock(&open_dag_mutex);
443                return -1;
444        }
445
446        FORMAT_DATA_OUT->device = dag_device;
447        pthread_mutex_unlock(&open_dag_mutex);
448        return 0;
449}
450
451/* Creates and initialises a DAG input trace */
452static int dag_init_input(libtrace_t *libtrace) {
453        char *dag_dev_name = NULL;
454        char *scan = NULL;
455        int stream = 0;
456        struct dag_dev_t *dag_device = NULL;
457
458        dag_init_format_data(libtrace);
459        /* Grab the mutex while we're likely to be messing with the device
460         * list */
461        pthread_mutex_lock(&open_dag_mutex);
462       
463       
464        /* Specific streams are signified using a comma in the libtrace URI,
465         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
466         *
467         * If no stream is specified, we will read from stream 0 */
468        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
469                dag_dev_name = strdup(libtrace->uridata);
470        } else {
471                dag_dev_name = (char *)strndup(libtrace->uridata,
472                                (size_t)(scan - libtrace->uridata));
473                stream = atoi(++scan);
474        }
475
476        FORMAT_DATA->dagstream = stream;
477
478        /* See if our DAG device is already open */
479        dag_device = dag_find_open_device(dag_dev_name);
480
481        if (dag_device == NULL) {
482                /* Device not yet opened - open it ourselves */
483                dag_device = dag_open_device(libtrace, dag_dev_name);
484        } else {
485                /* Otherwise, just use the existing one */
486                free(dag_dev_name);
487                dag_dev_name = NULL;
488        }
489
490        /* Make sure we have successfully opened a DAG device */
491        if (dag_device == NULL) {
492                if (dag_dev_name)
493                        free(dag_dev_name);
494                dag_dev_name = NULL;
495                pthread_mutex_unlock(&open_dag_mutex);
496                return -1;
497        }
498
499        FORMAT_DATA->device = dag_device;
500
501        /* See Config_Status_API_Programming_Guide.pdf from the Endace Dag Documentation */
502        /* Check kBooleanAttributeActive is true -- no point capturing on an interface that's disabled
503 
504        *  The symptom of the port being disabled is that libtrace will appear to hang.
505        */
506        /* Check kBooleanAttributeFault is false */
507        /* Check kBooleanAttributeLocalFault is false */
508        /* Check kBooleanAttributeLock is true ? */
509        /* Check kBooleanAttributePeerLink ? */
510
511        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based on libtrace promisc attribute?*/
512        /* Set kUint32AttributeSnapLength to the snaplength */
513
514        pthread_mutex_unlock(&open_dag_mutex);
515        return 0;
516}
517
518/* Configures a DAG input trace */
519static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
520                                void *data) {
521        char conf_str[4096];
522        switch(option) {
523                case TRACE_OPTION_META_FREQ:
524                        /* This option is used to specify the frequency of DUCK
525                         * updates */
526                        DUCK.duck_freq = *(int *)data;
527                        return 0;
528                case TRACE_OPTION_SNAPLEN:
529                        /* Tell the card our new snap length */
530                        snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
531                        if (dag_configure(FORMAT_DATA->device->fd,
532                                                conf_str) != 0) {
533                                trace_set_err(libtrace, errno, "Failed to configure snaplen on DAG card: %s", libtrace->uridata);
534                                return -1;
535                        }
536                        return 0;
537                case TRACE_OPTION_PROMISC:
538                        /* DAG already operates in a promisc fashion */
539                        return -1;
540                case TRACE_OPTION_FILTER:
541                        /* We don't yet support pushing filters into DAG
542                         * cards */
543                        return -1;
544                case TRACE_OPTION_EVENT_REALTIME:
545                        /* Live capture is always going to be realtime */
546                        return -1;
547        }
548        return -1;
549}
550
551/* Starts a DAG output trace */
552static int dag_start_output(libtrace_out_t *libtrace) {
553        struct timeval zero, nopoll;
554
555        zero.tv_sec = 0;
556        zero.tv_usec = 0;
557        nopoll = zero;
558
559        /* Attach and start the DAG stream */
560
561        if (dag_attach_stream(FORMAT_DATA_OUT->device->fd,
562                        FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) {
563                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
564                return -1;
565        }
566
567        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
568                        FORMAT_DATA_OUT->dagstream) < 0) {
569                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
570                return -1;
571        }
572        FORMAT_DATA_OUT->stream_attached = 1;
573
574        /* We don't want the dag card to do any sleeping */
575
576        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
577                        FORMAT_DATA_OUT->dagstream, 0, &zero,
578                        &nopoll);
579
580        return 0;
581}
582
583/* Starts a DAG input trace */
584static int dag_start_input(libtrace_t *libtrace) {
585        struct timeval zero, nopoll;
586        uint8_t *top, *bottom, *starttop;
587        uint64_t diff = 0;
588        top = bottom = NULL;
589
590        zero.tv_sec = 0;
591        zero.tv_usec = 10000;
592        nopoll = zero;
593
594        /* Attach and start the DAG stream */
595        if (dag_attach_stream(FORMAT_DATA->device->fd,
596                                FORMAT_DATA->dagstream, 0, 0) < 0) {
597                trace_set_err(libtrace, errno, "Cannot attach DAG stream");
598                return -1;
599        }
600
601        if (dag_start_stream(FORMAT_DATA->device->fd,
602                                FORMAT_DATA->dagstream) < 0) {
603                trace_set_err(libtrace, errno, "Cannot start DAG stream");
604                return -1;
605        }
606        FORMAT_DATA->stream_attached = 1;
607       
608        /* We don't want the dag card to do any sleeping */
609        dag_set_stream_poll(FORMAT_DATA->device->fd,
610                                FORMAT_DATA->dagstream, 0, &zero,
611                                &nopoll);
612
613        starttop = dag_advance_stream(FORMAT_DATA->device->fd,
614                                        FORMAT_DATA->dagstream,
615                                        &bottom);
616
617        /* Should probably flush the memory hole now */
618        while (starttop - bottom > 0) {
619                bottom += (starttop - bottom);
620                top = dag_advance_stream(FORMAT_DATA->device->fd,
621                                        FORMAT_DATA->dagstream,
622                                        &bottom);
623        }
624        FORMAT_DATA->top = top;
625        FORMAT_DATA->bottom = bottom;
626        FORMAT_DATA->processed = 0;
627        FORMAT_DATA->drops = 0;
628
629        return 0;
630}
631
632/* Pauses a DAG output trace */
633static int dag_pause_output(libtrace_out_t *libtrace) {
634
635        /* Stop and detach the stream */
636        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
637                        FORMAT_DATA_OUT->dagstream) < 0) {
638                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
639                return -1;
640        }
641        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
642                        FORMAT_DATA_OUT->dagstream) < 0) {
643                trace_set_err_out(libtrace, errno, "Could not detach DAG stream");
644                return -1;
645        }
646        FORMAT_DATA_OUT->stream_attached = 0;
647        return 0;
648}
649
650/* Pauses a DAG input trace */
651static int dag_pause_input(libtrace_t *libtrace) {
652
653        /* Stop and detach the stream */
654        if (dag_stop_stream(FORMAT_DATA->device->fd,
655                                FORMAT_DATA->dagstream) < 0) {
656                trace_set_err(libtrace, errno, "Could not stop DAG stream");
657                return -1;
658        }
659        if (dag_detach_stream(FORMAT_DATA->device->fd,
660                                FORMAT_DATA->dagstream) < 0) {
661                trace_set_err(libtrace, errno, "Could not detach DAG stream");
662                return -1;
663        }
664        FORMAT_DATA->stream_attached = 0;
665        return 0;
666}
667
668/* Closes a DAG input trace */
669static int dag_fin_input(libtrace_t *libtrace) {
670        /* Need the lock, since we're going to be handling the device list */
671        pthread_mutex_lock(&open_dag_mutex);
672       
673        /* Detach the stream if we are not paused */
674        if (FORMAT_DATA->stream_attached)
675                dag_pause_input(libtrace);
676        FORMAT_DATA->device->ref_count --;
677
678        /* Close the DAG device if there are no more references to it */
679        if (FORMAT_DATA->device->ref_count == 0)
680                dag_close_device(FORMAT_DATA->device);
681        if (DUCK.dummy_duck)
682                trace_destroy_dead(DUCK.dummy_duck);
683        free(libtrace->format_data);
684        pthread_mutex_unlock(&open_dag_mutex);
685        return 0; /* success */
686}
687
688/* Closes a DAG output trace */
689static int dag_fin_output(libtrace_out_t *libtrace) {
690       
691        /* Commit any outstanding traffic in the txbuffer */
692        if (FORMAT_DATA_OUT->waiting) {
693                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
694                                FORMAT_DATA_OUT->waiting );
695        }
696
697        /* Wait until the buffer is nearly clear before exiting the program,
698         * as we will lose packets otherwise */
699        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
700                        FORMAT_DATA_OUT->dagstream,
701                        dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
702                                        FORMAT_DATA_OUT->dagstream) - 8
703                        );
704
705        /* Need the lock, since we're going to be handling the device list */
706        pthread_mutex_lock(&open_dag_mutex);
707
708        /* Detach the stream if we are not paused */
709        if (FORMAT_DATA_OUT->stream_attached)
710                dag_pause_output(libtrace);
711        FORMAT_DATA_OUT->device->ref_count --;
712
713        /* Close the DAG device if there are no more references to it */
714        if (FORMAT_DATA_OUT->device->ref_count == 0)
715                dag_close_device(FORMAT_DATA_OUT->device);
716        free(libtrace->format_data);
717        pthread_mutex_unlock(&open_dag_mutex);
718        return 0; /* success */
719}
720
721/* Extracts DUCK information from the DAG card and produces a DUCK packet */
722static int dag_get_duckinfo(libtrace_t *libtrace,
723                                libtrace_packet_t *packet) {
724
725        /* Allocate memory for the DUCK data */
726        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
727                        !packet->buffer) {
728                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
729                packet->buf_control = TRACE_CTRL_PACKET;
730                if (!packet->buffer) {
731                        trace_set_err(libtrace, errno,
732                                        "Cannot allocate packet buffer");
733                        return -1;
734                }
735        }
736
737        /* DUCK doesn't have a format header */
738        packet->header = 0;
739        packet->payload = packet->buffer;
740
741        /* No need to check if we can get DUCK or not - we're modern
742         * enough so just grab the DUCK info */
743        if ((ioctl(FORMAT_DATA->device->fd, DAGIOCDUCK,
744                                        (duckinf_t *)packet->payload) < 0)) {
745                trace_set_err(libtrace, errno, "Error using DAGIOCDUCK");
746                return -1;
747        }
748
749        packet->type = TRACE_RT_DUCK_2_5;
750
751        /* Set the packet's tracce to point at a DUCK trace, so that the
752         * DUCK format functions will be called on the packet rather than the
753         * DAG ones */
754        if (!DUCK.dummy_duck)
755                DUCK.dummy_duck = trace_create_dead("rt:localhost:3434");
756        packet->trace = DUCK.dummy_duck;
757        return sizeof(duckinf_t);
758}
759
760/* Determines the amount of data available to read from the DAG card */
761static int dag_available(libtrace_t *libtrace) {
762        uint32_t diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
763
764        /* If we've processed more than 4MB of data since we last called
765         * dag_advance_stream, then we should call it again to allow the
766         * space occupied by that 4MB to be released */
767        if (diff >= dag_record_size && FORMAT_DATA->processed < 4 * 1024 * 1024)
768                return diff;
769       
770        /* Update the top and bottom pointers */
771        FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd,
772                        FORMAT_DATA->dagstream,
773                        &(FORMAT_DATA->bottom));
774       
775        if (FORMAT_DATA->top == NULL) {
776                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
777                return -1;
778        }
779        FORMAT_DATA->processed = 0;
780        diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
781        return diff;
782}
783
784/* Returns a pointer to the start of the next complete ERF record */
785static dag_record_t *dag_get_record(libtrace_t *libtrace) {
786        dag_record_t *erfptr = NULL;
787        uint16_t size;
788        erfptr = (dag_record_t *)FORMAT_DATA->bottom;
789        if (!erfptr)
790                return NULL;
791        size = ntohs(erfptr->rlen);
792        assert( size >= dag_record_size );
793        /* Make certain we have the full packet available */
794        if (size > (FORMAT_DATA->top - FORMAT_DATA->bottom))
795                return NULL;
796        FORMAT_DATA->bottom += size;
797        FORMAT_DATA->processed += size;
798        return erfptr;
799}
800
801/* Converts a buffer containing a recently read DAG packet record into a
802 * libtrace packet */
803static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
804                void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
805
806        dag_record_t *erfptr;
807       
808        /* If the packet previously owned a buffer that is not the buffer
809         * that contains the new packet data, we're going to need to free the
810         * old one to avoid memory leaks */
811        if (packet->buffer != buffer &&
812                        packet->buf_control == TRACE_CTRL_PACKET) {
813                free(packet->buffer);
814        }
815
816        /* Set the buffer owner appropriately */
817        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
818                packet->buf_control = TRACE_CTRL_PACKET;
819        } else
820                packet->buf_control = TRACE_CTRL_EXTERNAL;
821
822        /* Update the packet pointers and type appropriately */
823        erfptr = (dag_record_t *)buffer;
824        packet->buffer = erfptr;
825        packet->header = erfptr;
826        packet->type = rt_type;
827
828        if (erfptr->flags.rxerror == 1) {
829                /* rxerror means the payload is corrupt - drop the payload
830                 * by tweaking rlen */
831                packet->payload = NULL;
832                erfptr->rlen = htons(erf_get_framing_length(packet));
833        } else {
834                packet->payload = (char*)packet->buffer
835                        + erf_get_framing_length(packet);
836        }
837
838        if (libtrace->format_data == NULL) {
839                dag_init_format_data(libtrace);
840        }
841
842        /* Update the dropped packets counter */
843
844        /* No loss counter for DSM coloured records - have to use
845         * some other API */
846        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
847                /* TODO */
848        } else {
849                /* Use the ERF loss counter */
850                if (FORMAT_DATA->seeninterface[erfptr->flags.iface] == 0) {
851                        FORMAT_DATA->seeninterface[erfptr->flags.iface] = 1;
852                } else {
853                        FORMAT_DATA->drops += ntohs(erfptr->lctr);
854                }
855        }
856
857        return 0;
858}
859
860/*
861 * dag_write_packet() at this stage attempts to improve tx performance
862 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
863 * has been met. I observed approximately 270% performance increase
864 * through this relatively naive tweak. No optimisation of buffer sizes
865 * was attempted.
866 */
867
868/* Pushes an ERF record onto the transmit stream */
869static int dag_dump_packet(libtrace_out_t *libtrace,
870                dag_record_t *erfptr, unsigned int pad, void *buffer) {
871        int size;
872
873        /*
874         * If we've got 0 bytes waiting in the txqueue, assume that we haven't
875         * requested any space yet, and request some, storing the pointer at
876         * FORMAT_DATA_OUT->txbuffer.
877         *
878         * The amount to request is slightly magical at the moment - it's
879         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
880         * the buffer and handle overruns.
881         */
882        if (FORMAT_DATA_OUT->waiting == 0) {
883                FORMAT_DATA_OUT->txbuffer = dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
884                                FORMAT_DATA_OUT->dagstream, 16908288);
885        }
886
887        /*
888         * Copy the header separately to the body, as we can't guarantee they
889         * are in contiguous memory
890         */
891        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,erfptr,(dag_record_size + pad));
892        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
893
894
895
896        /*
897         * Copy our incoming packet into the outgoing buffer, and increment
898         * our waiting count
899         */
900        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
901        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,buffer,size);
902        FORMAT_DATA_OUT->waiting += size;
903
904        /*
905         * If our output buffer has more than 16 Mebibytes in it, commit those
906         * bytes and reset the waiting count to 0.
907         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in
908         * case there is still data in the buffer at program exit.
909         */
910
911        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
912                FORMAT_DATA_OUT->txbuffer = dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
913                        FORMAT_DATA_OUT->waiting );
914                FORMAT_DATA_OUT->waiting = 0;
915        }
916
917        return size + pad + dag_record_size;
918
919}
920
921/* Attempts to determine a suitable ERF type for a given packet. Returns true
922 * if one is found, false otherwise */
923static bool find_compatible_linktype(libtrace_out_t *libtrace,
924                                libtrace_packet_t *packet, char *type)
925{
926         // Keep trying to simplify the packet until we can find
927         //something we can do with it
928
929        do {
930                *type=libtrace_to_erf_type(trace_get_link_type(packet));
931
932                // Success
933                if (*type != (char)-1)
934                        return true;
935
936                if (!demote_packet(packet)) {
937                        trace_set_err_out(libtrace,
938                                        TRACE_ERR_NO_CONVERSION,
939                                        "No erf type for packet (%i)",
940                                        trace_get_link_type(packet));
941                        return false;
942                }
943
944        } while(1);
945
946        return true;
947}
948
949/* Writes a packet to the provided DAG output trace */
950static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet) {
951        /*
952         * This is heavily borrowed from erf_write_packet(). Yes, CnP coding
953         * sucks, sorry about that.
954         */
955        unsigned int pad = 0;
956        int numbytes;
957        void *payload = packet->payload;
958        dag_record_t *header = (dag_record_t *)packet->header;
959        char erf_type = 0;
960
961        if(!packet->header) {
962                /* No header, probably an RT packet. Lifted from
963                 * erf_write_packet(). */
964                return -1;
965        }
966
967        if (trace_get_link_type(packet) == TRACE_TYPE_NONDATA)
968                return 0;
969
970        pad = dag_get_padding(packet);
971
972        /*
973         * If the payload is null, adjust the rlen. Discussion of this is
974         * attached to erf_write_packet()
975         */
976        if (payload == NULL) {
977                header->rlen = htons(dag_record_size + pad);
978        }
979
980        if (packet->type == TRACE_RT_DATA_ERF) {
981                numbytes = dag_dump_packet(libtrace,
982                                header,
983                                pad,
984                                payload
985                                );
986
987        } else {
988                /* Build up a new packet header from the existing header */
989
990                /* Simplify the packet first - if we can't do this, break
991                 * early */
992                if (!find_compatible_linktype(libtrace,packet,&erf_type))
993                        return -1;
994
995                dag_record_t erfhdr;
996
997                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
998                payload=packet->payload;
999                pad = dag_get_padding(packet);
1000
1001                /* Flags. Can't do this */
1002                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
1003                if (trace_get_direction(packet)!=(int)~0U)
1004                        erfhdr.flags.iface = trace_get_direction(packet);
1005
1006                erfhdr.type = erf_type;
1007
1008                /* Packet length (rlen includes format overhead) */
1009                assert(trace_get_capture_length(packet)>0
1010                                && trace_get_capture_length(packet)<=65536);
1011                assert(erf_get_framing_length(packet)>0
1012                                && trace_get_framing_length(packet)<=65536);
1013                assert(trace_get_capture_length(packet)+erf_get_framing_length(packet)>0
1014                      &&trace_get_capture_length(packet)+erf_get_framing_length(packet)<=65536);
1015
1016                erfhdr.rlen = htons(trace_get_capture_length(packet)
1017                        + erf_get_framing_length(packet));
1018
1019
1020                /* Loss counter. Can't do this */
1021                erfhdr.lctr = 0;
1022                /* Wire length, does not include padding! */
1023                erfhdr.wlen = htons(trace_get_wire_length(packet));
1024
1025                /* Write it out */
1026                numbytes = dag_dump_packet(libtrace,
1027                                &erfhdr,
1028                                pad,
1029                                payload);
1030
1031        }
1032
1033        return numbytes;
1034}
1035
1036/* Reads the next available packet from a DAG card, in a BLOCKING fashion
1037 *
1038 * If DUCK reporting is enabled, the packet returned may be a DUCK update
1039 */
1040static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1041        int size = 0;
1042        struct timeval tv;
1043        dag_record_t *erfptr = NULL;
1044        int numbytes = 0;
1045        uint32_t flags = 0;
1046        struct timeval maxwait;
1047        struct timeval pollwait;
1048
1049        pollwait.tv_sec = 0;
1050        pollwait.tv_usec = 10000;
1051        maxwait.tv_sec = 0;
1052        maxwait.tv_usec = 250000;
1053
1054        /* Check if we're due for a DUCK report */
1055        if (DUCK.last_pkt - DUCK.last_duck > DUCK.duck_freq &&
1056                        DUCK.duck_freq != 0) {
1057                size = dag_get_duckinfo(libtrace, packet);
1058                DUCK.last_duck = DUCK.last_pkt;
1059                if (size != 0) {
1060                        return size;
1061                }
1062                /* No DUCK support, so don't waste our time anymore */
1063                DUCK.duck_freq = 0;
1064        }
1065
1066        /* Don't let anyone try to free our DAG memory hole! */
1067        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1068
1069        /* If the packet buffer is currently owned by libtrace, free it so
1070         * that we can set the packet to point into the DAG memory hole */
1071        if (packet->buf_control == TRACE_CTRL_PACKET) {
1072                free(packet->buffer);
1073                packet->buffer = 0;
1074        }
1075       
1076        if (dag_set_stream_poll(FORMAT_DATA->device->fd, 
1077                        FORMAT_DATA->dagstream, sizeof(dag_record_t), &maxwait, 
1078                        &pollwait) == -1)
1079        {
1080                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1081                return -1;
1082        }
1083
1084
1085        /* Grab a full ERF record */
1086        do {
1087                numbytes = dag_available(libtrace);
1088                if (numbytes < 0)
1089                        return numbytes;
1090                if (numbytes < dag_record_size) {
1091                        if (libtrace_halt)
1092                                return 0;
1093                        /* Block until we see a packet */
1094                        continue;
1095                }
1096                erfptr = dag_get_record(libtrace);
1097        } while (erfptr == NULL);
1098
1099        /* Prepare the libtrace packet */
1100        if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
1101                                flags))
1102                return -1;
1103
1104        /* Update the DUCK timer */
1105        tv = trace_get_timeval(packet);
1106        DUCK.last_pkt = tv.tv_sec;
1107
1108        return packet->payload ? htons(erfptr->rlen) :
1109                                erf_get_framing_length(packet);
1110}
1111
1112/* Attempts to read a packet from a DAG card in a NON-BLOCKING fashion. If a
1113 * packet is available, we will return a packet event. Otherwise we will
1114 * return a SLEEP event (as we cannot select on the DAG file descriptor).
1115 */
1116static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
1117                                        libtrace_packet_t *packet) {
1118        libtrace_eventobj_t event = {0,0,0.0,0};
1119        dag_record_t *erfptr = NULL;
1120        int numbytes;
1121        uint32_t flags = 0;
1122        struct timeval minwait;
1123       
1124        minwait.tv_sec = 0;
1125        minwait.tv_usec = 10000;
1126       
1127        if (dag_set_stream_poll(FORMAT_DATA->device->fd, 
1128                        FORMAT_DATA->dagstream, 0, &minwait, 
1129                        &minwait) == -1)
1130        {
1131                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1132                event.type = TRACE_EVENT_TERMINATE;
1133                return event;
1134        }
1135
1136        do {
1137                erfptr = NULL;
1138                numbytes = 0;
1139       
1140                /* Need to call dag_available so that the top pointer will get
1141                 * updated, otherwise we'll never see any data! */
1142                numbytes = dag_available(libtrace);
1143
1144                /* May as well not bother calling dag_get_record if
1145                 * dag_available suggests that there's no data */
1146                if (numbytes != 0)
1147                        erfptr = dag_get_record(libtrace);
1148                if (erfptr == NULL) {
1149                        /* No packet available - sleep for a very short time */
1150                        if (libtrace_halt) {
1151                                event.type = TRACE_EVENT_TERMINATE;
1152                        } else {                       
1153                                event.type = TRACE_EVENT_SLEEP;
1154                                event.seconds = 0.0001;
1155                        }
1156                        break;
1157                }
1158                if (dag_prepare_packet(libtrace, packet, erfptr, 
1159                                        TRACE_RT_DATA_ERF, flags)) {
1160                        event.type = TRACE_EVENT_TERMINATE;
1161                        break;
1162                }
1163
1164
1165                event.size = trace_get_capture_length(packet) + 
1166                                trace_get_framing_length(packet);
1167               
1168                /* XXX trace_read_packet() normally applies the following
1169                 * config options for us, but this function is called via
1170                 * trace_event() so we have to do it ourselves */
1171
1172                if (libtrace->filter) {
1173                        int filtret = trace_apply_filter(libtrace->filter, 
1174                                        packet);
1175                        if (filtret == -1) {
1176                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
1177                                                "Bad BPF Filter");
1178                                event.type = TRACE_EVENT_TERMINATE;
1179                                break;
1180                        }
1181
1182                        if (filtret == 0) {
1183                                /* This packet isn't useful so we want to
1184                                 * immediately see if there is another suitable
1185                                 * one - we definitely DO NOT want to return
1186                                 * a sleep event in this case, like we used to
1187                                 * do! */
1188                                libtrace->filtered_packets ++;
1189                                trace_clear_cache(packet);
1190                                continue;
1191                        }
1192                               
1193                        event.type = TRACE_EVENT_PACKET;
1194                } else {
1195                        event.type = TRACE_EVENT_PACKET;
1196                }
1197
1198                if (libtrace->snaplen > 0) {
1199                        trace_set_capture_length(packet, libtrace->snaplen);
1200                }
1201                libtrace->accepted_packets ++;
1202                break;
1203        } while (1);
1204
1205        return event;
1206}
1207
1208/* Gets the number of dropped packets */
1209static uint64_t dag_get_dropped_packets(libtrace_t *trace) {
1210        if (trace->format_data == NULL)
1211                return (uint64_t)-1;
1212        return DATA(trace)->drops;
1213}
1214
/* Writes a short usage summary for the DAG format module to stdout: the
 * module revision followed by the supported input and output URIs */
static void dag_help(void) {
        fputs("dag format module: $Revision: 1755 $\n"
              "Supported input URIs:\n"
              "\tdag:/dev/dagn\n"
              "\n"
              "\te.g.: dag:/dev/dag0\n"
              "\n"
              "Supported output URIs:\n"
              "\tnone\n"
              "\n", stdout);
}
1227
/* Format descriptor for the "dag" module; dag_constructor() passes it to
 * register_format().
 *
 * This is a positional initializer, so the entries have to stay in the same
 * order as the members of struct libtrace_format_t (declared elsewhere);
 * the trailing comment on each line names the member being filled in.
 * Capture, transmit and event handling use the dag_* functions from this
 * file, while the per-packet accessors are the erf_* functions.
 * NOTE(review): NULL entries presumably mean the operation is not provided
 * by this module - confirm against the libtrace_format_t declaration. */
1228static struct libtrace_format_t dag = {
1229        "dag",
1230        "$Id$",
1231        TRACE_FORMAT_ERF,
1232        dag_probe_filename,             /* probe filename */
1233        NULL,                           /* probe magic */
1234        dag_init_input,                 /* init_input */
1235        dag_config_input,               /* config_input */
1236        dag_start_input,                /* start_input */
1237        dag_pause_input,                /* pause_input */
1238        dag_init_output,                /* init_output */
1239        NULL,                           /* config_output */
1240        dag_start_output,               /* start_output */
1241        dag_fin_input,                  /* fin_input */
1242        dag_fin_output,                 /* fin_output */
1243        dag_read_packet,                /* read_packet */
1244        dag_prepare_packet,             /* prepare_packet */
1245        NULL,                           /* fin_packet */
1246        dag_write_packet,               /* write_packet */
1247        erf_get_link_type,              /* get_link_type */
1248        erf_get_direction,              /* get_direction */
1249        erf_set_direction,              /* set_direction */
1250        erf_get_erf_timestamp,          /* get_erf_timestamp */
1251        NULL,                           /* get_timeval */
1252        NULL,                           /* get_seconds */
1253        NULL,                           /* get_timespec */
1254        NULL,                           /* seek_erf */
1255        NULL,                           /* seek_timeval */
1256        NULL,                           /* seek_seconds */
1257        erf_get_capture_length,         /* get_capture_length */
1258        erf_get_wire_length,            /* get_wire_length */
1259        erf_get_framing_length,         /* get_framing_length */
1260        erf_set_capture_length,         /* set_capture_length */
1261        NULL,                           /* get_received_packets */
1262        NULL,                           /* get_filtered_packets */
1263        dag_get_dropped_packets,        /* get_dropped_packets */
1264        NULL,                           /* get_captured_packets */
1265        NULL,                           /* get_fd */
1266        trace_event_dag,                /* trace_event */
1267        dag_help,                       /* help */
1268        NULL                            /* next pointer */
1269};
1270
1271void dag_constructor(void) {
1272        register_format(&dag);
1273}
Note: See TracBrowser for help on using the repository browser.