source: lib/format_dag25.c @ 4bd6393

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 4bd6393 was 4bd6393, checked in by Shane Alcock <salcock@…>, 7 years ago

Attempt at fixing DAG device name memory leak

I've actually made the format data store the device name, since it
sounds like Dan will need this for his additional configuration.

Haven't tested properly yet, as I'll need to build on a machine with
DAG libraries.

  • Property mode set to 100644
File size: 38.7 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id$
31 *
32 */
33
34#define _GNU_SOURCE
35
36#include "config.h"
37#include "common.h"
38#include "libtrace.h"
39#include "libtrace_int.h"
40#include "format_helper.h"
41#include "format_erf.h"
42
43#include <assert.h>
44#include <errno.h>
45#include <fcntl.h>
46#include <stdio.h>
47#include <string.h>
48#include <stdlib.h>
49#include <sys/stat.h>
50
51#include <sys/mman.h>
52/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
53 * Windows anyway so we'll worry about this more later :] */
54#include <pthread.h>
55
56
57#ifdef WIN32
58#  include <io.h>
59#  include <share.h>
60#  define PATH_MAX _MAX_PATH
61#  define snprintf sprintf_s
62#else
63#  include <netdb.h>
64#  ifndef PATH_MAX
65#       define PATH_MAX 4096
66#  endif
67#  include <sys/ioctl.h>
68#endif
69
70/* This format deals with DAG cards that are using drivers from the 2.5 version
71 * onwards, including 3.X.
72 *
73 * DAG is a LIVE capture format.
74 *
75 * This format does support writing, provided the DAG card that you are using
76 * has transmit (Tx) support. Additionally, packets read using this format
77 * are in the ERF format, so can easily be written as ERF traces without
78 * losing any data.
79 */
80
81
82#define DATA(x) ((struct dag_format_data_t *)x->format_data)
83#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
84
85#define FORMAT_DATA DATA(libtrace)
86#define FORMAT_DATA_OUT DATA_OUT(libtrace)
87
88#define DUCK FORMAT_DATA->duck
89static struct libtrace_format_t dag;
90
91/* A DAG device - a DAG device can support multiple streams (and therefore
92 * multiple input traces) so each trace needs to refer to a device */
93struct dag_dev_t {
94        char * dev_name;                /* Device name */
95        int fd;                         /* File descriptor */
96        uint16_t ref_count;             /* Number of input / output traces
97                                           that are using this device */
98        struct dag_dev_t *prev;         /* Pointer to the previous device in
99                                           the device list */
100        struct dag_dev_t *next;         /* Pointer to the next device in the
101                                           device list */
102};
103
104/* "Global" data that is stored for each DAG output trace */
105struct dag_format_data_out_t {
106        /* String containing the DAG device name */
107        char *device_name;
108        /* The DAG device being used for writing */
109        struct dag_dev_t *device;
110        /* The DAG stream that is being written on */
111        unsigned int dagstream;
112        /* Boolean flag indicating whether the stream is currently attached */
113        int stream_attached;
114        /* The amount of data waiting to be transmitted, in bytes */
115        uint64_t waiting;
116        /* A buffer to hold the data to be transmittted */
117        uint8_t *txbuffer;
118};
119
120/* "Global" data that is stored for each DAG input trace */
121struct dag_format_data_t {
122
123        /* Data required for regular DUCK reporting */
124        struct {
125                /* Timestamp of the last DUCK report */
126                uint32_t last_duck;
127                /* The number of seconds between each DUCK report */
128                uint32_t duck_freq;
129                /* Timestamp of the last packet read from the DAG card */
130                uint32_t last_pkt;
131                /* Dummy trace to ensure DUCK packets are dealt with using the
132                 * DUCK format functions */
133                libtrace_t *dummy_duck;
134        } duck;
135
136        /* String containing the DAG device name */
137        char *device_name;
138        /* The DAG device that we are reading from */
139        struct dag_dev_t *device;
140        /* The DAG stream that we are reading from */
141        unsigned int dagstream;
142        /* Boolean flag indicating whether the stream is currently attached */
143        int stream_attached;
144        /* Pointer to the first unread byte in the DAG memory hole */
145        uint8_t *bottom;
146        /* Pointer to the last unread byte in the DAG memory hole */
147        uint8_t *top;
148        /* The amount of data processed thus far from the bottom pointer */
149        uint32_t processed;
150        /* The number of packets that have been dropped */
151        uint64_t drops;
152
153        uint8_t seeninterface[4];
154};
155
156/* To be thread-safe, we're going to need a mutex for operating on the list
157 * of DAG devices */
158pthread_mutex_t open_dag_mutex;
159
160/* The list of DAG devices that have been opened by libtrace.
161 *
162 * We can only open each DAG device once, but we might want to read from
163 * multiple streams. Therefore, we need to maintain a list of devices that we
164 * have opened (with ref counts!) so that we don't try to open a device too
165 * many times or close a device that we're still using */
166struct dag_dev_t *open_dags = NULL;
167
168/* Returns the amount of padding between the ERF header and the start of the
169 * captured packet data */
170static int dag_get_padding(const libtrace_packet_t *packet)
171{
172        /* ERF Ethernet records have a 2 byte padding before the packet itself
173         * so that the IP header is aligned on a 32 bit boundary.
174         */
175        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
176                dag_record_t *erfptr = (dag_record_t *)packet->header;
177                switch(erfptr->type) {
178                        case TYPE_ETH:
179                        case TYPE_DSM_COLOR_ETH:
180                                return 2;
181                        default:                return 0;
182                }
183        }
184        else {
185                switch(trace_get_link_type(packet)) {
186                        case TRACE_TYPE_ETH:    return 2;
187                        default:                return 0;
188                }
189        }
190}
191
192/* Attempts to determine if the given filename refers to a DAG device */
193static int dag_probe_filename(const char *filename)
194{
195        struct stat statbuf;
196        /* Can we stat the file? */
197        if (stat(filename, &statbuf) != 0) {
198                return 0;
199        }
200        /* Is it a character device? */
201        if (!S_ISCHR(statbuf.st_mode)) {
202                return 0;
203        }
204        /* Yeah, it's probably us. */
205        return 1;
206}
207
208/* Initialises the DAG output data structure */
209static void dag_init_format_out_data(libtrace_out_t *libtrace) {
210        libtrace->format_data = (struct dag_format_data_out_t *) malloc(sizeof(struct dag_format_data_out_t));
211        // no DUCK on output
212        FORMAT_DATA_OUT->stream_attached = 0;
213        FORMAT_DATA_OUT->device = NULL;
214        FORMAT_DATA_OUT->device_name = NULL;
215        FORMAT_DATA_OUT->dagstream = 0;
216        FORMAT_DATA_OUT->waiting = 0;
217
218}
219
220/* Initialises the DAG input data structure */
221static void dag_init_format_data(libtrace_t *libtrace) {
222        libtrace->format_data = (struct dag_format_data_t *)
223                malloc(sizeof(struct dag_format_data_t));
224        DUCK.last_duck = 0;
225        DUCK.duck_freq = 0;
226        DUCK.last_pkt = 0;
227        DUCK.dummy_duck = NULL;
228        FORMAT_DATA->stream_attached = 0;
229        FORMAT_DATA->drops = 0;
230        FORMAT_DATA->device_name = NULL;
231        FORMAT_DATA->device = NULL;
232        FORMAT_DATA->dagstream = 0;
233        FORMAT_DATA->processed = 0;
234        FORMAT_DATA->bottom = NULL;
235        FORMAT_DATA->top = NULL;
236        memset(FORMAT_DATA->seeninterface, 0, sizeof(FORMAT_DATA->seeninterface));
237}
238
239/* Determines if there is already an entry for the given DAG device in the
240 * device list and increments the reference count for that device, if found.
241 *
242 * NOTE: This function assumes the open_dag_mutex is held by the caller */
243static struct dag_dev_t *dag_find_open_device(char *dev_name) {
244        struct dag_dev_t *dag_dev;
245
246        dag_dev = open_dags;
247
248        /* XXX: Not exactly zippy, but how often are we going to be dealing
249         * with multiple dag cards? */
250        while (dag_dev != NULL) {
251                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
252                        dag_dev->ref_count ++;
253                        return dag_dev;
254
255                }
256                dag_dev = dag_dev->next;
257        }
258        return NULL;
259
260
261}
262
263/* Closes a DAG device and removes it from the device list.
264 *
265 * Attempting to close a DAG device that has a non-zero reference count will
266 * cause an assertion failure!
267 *
268 * NOTE: This function assumes the open_dag_mutex is held by the caller */
269static void dag_close_device(struct dag_dev_t *dev) {
270        /* Need to remove from the device list */
271
272        assert(dev->ref_count == 0);
273
274        if (dev->prev == NULL) {
275                open_dags = dev->next;
276                if (dev->next)
277                        dev->next->prev = NULL;
278        } else {
279                dev->prev->next = dev->next;
280                if (dev->next)
281                        dev->next->prev = dev->prev;
282        }
283
284        dag_close(dev->fd);
285        if (dev->dev_name)
286                free(dev->dev_name);
287        free(dev);
288}
289
290
291/* Opens a new DAG device for writing and adds it to the DAG device list
292 *
293 * NOTE: this function should only be called when opening a DAG device for
294 * writing - there is little practical difference between this and the
295 * function below that covers the reading case, but we need the output trace
296 * object to report errors properly so the two functions take slightly
297 * different arguments. This is really lame and there should be a much better
298 * way of doing this.
299 *
300 * NOTE: This function assumes the open_dag_mutex is held by the caller
301 */
302static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace, char *dev_name) {
303        struct stat buf;
304        int fd;
305        struct dag_dev_t *new_dev;
306
307        /* Make sure the device exists */
308        if (stat(dev_name, &buf) == -1) {
309                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
310                return NULL;
311}
312
313        /* Make sure it is the appropriate type of device */
314        if (S_ISCHR(buf.st_mode)) {
315                /* Try opening the DAG device */
316                if((fd = dag_open(dev_name)) < 0) {
317                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
318                                        dev_name);
319                        return NULL;
320                }
321        } else {
322                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
323                                dev_name);
324                return NULL;
325        }
326
327        /* Add the device to our device list - it is just a doubly linked
328         * list with no inherent ordering; just tack the new one on the front
329         */
330        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
331        new_dev->fd = fd;
332        new_dev->dev_name = dev_name;
333        new_dev->ref_count = 1;
334
335        new_dev->prev = NULL;
336        new_dev->next = open_dags;
337        if (open_dags)
338                open_dags->prev = new_dev;
339
340        open_dags = new_dev;
341
342        return new_dev;
343}
344
345/* Opens a new DAG device for reading and adds it to the DAG device list
346 *
347 * NOTE: this function should only be called when opening a DAG device for
348 * reading - there is little practical difference between this and the
349 * function above that covers the writing case, but we need the input trace
350 * object to report errors properly so the two functions take slightly
351 * different arguments. This is really lame and there should be a much better
352 * way of doing this.
353 *
354 * NOTE: This function assumes the open_dag_mutex is held by the caller */
355static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
356        struct stat buf;
357        int fd;
358        struct dag_dev_t *new_dev;
359
360        /* Make sure the device exists */
361        if (stat(dev_name, &buf) == -1) {
362                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
363                return NULL;
364        }
365
366        /* Make sure it is the appropriate type of device */
367        if (S_ISCHR(buf.st_mode)) {
368                /* Try opening the DAG device */
369                if((fd = dag_open(dev_name)) < 0) {
370                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
371                                        dev_name);
372                        return NULL;
373                }
374        } else {
375                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
376                                dev_name);
377                return NULL;
378        }
379
380        /* Add the device to our device list - it is just a doubly linked
381         * list with no inherent ordering; just tack the new one on the front
382         */
383        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
384        new_dev->fd = fd;
385        new_dev->dev_name = dev_name;
386        new_dev->ref_count = 1;
387
388        new_dev->prev = NULL;
389        new_dev->next = open_dags;
390        if (open_dags)
391                open_dags->prev = new_dev;
392
393        open_dags = new_dev;
394
395        return new_dev;
396}
397
398/* Creates and initialises a DAG output trace */
399static int dag_init_output(libtrace_out_t *libtrace) {
400        char *scan = NULL;
401        struct dag_dev_t *dag_device = NULL;
402        int stream = 1;
403       
404        /* XXX I don't know if this is important or not, but this function
405         * isn't present in all of the driver releases that this code is
406         * supposed to support! */
407        /*
408        unsigned long wake_time;
409        dagutil_sleep_get_wake_time(&wake_time,0);
410        */
411
412        dag_init_format_out_data(libtrace);
413        /* Grab the mutex while we're likely to be messing with the device
414         * list */
415        pthread_mutex_lock(&open_dag_mutex);
416       
417        /* Specific streams are signified using a comma in the libtrace URI,
418         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
419         *
420         * If no stream is specified, we will write using stream 1 */
421        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
422                FORMAT_DATA_OUT->device_name = strdup(libtrace->uridata);
423        } else {
424                FORMAT_DATA_OUT->device_name = 
425                                (char *)strndup(libtrace->uridata,
426                                (size_t)(scan - libtrace->uridata));
427                stream = atoi(++scan);
428        }
429        FORMAT_DATA_OUT->dagstream = stream;
430
431        /* See if our DAG device is already open */
432        dag_device = dag_find_open_device(FORMAT_DATA_OUT->device_name);
433
434        if (dag_device == NULL) {
435                /* Device not yet opened - open it ourselves */
436                dag_device = dag_open_output_device(libtrace, 
437                                FORMAT_DATA_OUT->device_name);
438        }
439
440        /* Make sure we have successfully opened a DAG device */
441        if (dag_device == NULL) {
442                if (FORMAT_DATA_OUT->device_name) {
443                        free(FORMAT_DATA_OUT->device_name);
444                        FORMAT_DATA_OUT->device_name = NULL;
445                }
446                pthread_mutex_unlock(&open_dag_mutex);
447                return -1;
448        }
449
450        FORMAT_DATA_OUT->device = dag_device;
451        pthread_mutex_unlock(&open_dag_mutex);
452        return 0;
453}
454
455/* Creates and initialises a DAG input trace */
456static int dag_init_input(libtrace_t *libtrace) {
457        char *scan = NULL;
458        int stream = 0;
459        struct dag_dev_t *dag_device = NULL;
460
461        dag_init_format_data(libtrace);
462        /* Grab the mutex while we're likely to be messing with the device
463         * list */
464        pthread_mutex_lock(&open_dag_mutex);
465       
466       
467        /* Specific streams are signified using a comma in the libtrace URI,
468         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
469         *
470         * If no stream is specified, we will read from stream 0 */
471        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
472                FORMAT_DATA->device_name = strdup(libtrace->uridata);
473        } else {
474                FORMAT_DATA->device_name = (char *)strndup(libtrace->uridata,
475                                (size_t)(scan - libtrace->uridata));
476                stream = atoi(++scan);
477        }
478
479        FORMAT_DATA->dagstream = stream;
480
481        /* See if our DAG device is already open */
482        dag_device = dag_find_open_device(FORMAT_DATA->device_name);
483
484        if (dag_device == NULL) {
485                /* Device not yet opened - open it ourselves */
486                dag_device=dag_open_device(libtrace, FORMAT_DATA->device_name);
487        }
488
489        /* Make sure we have successfully opened a DAG device */
490        if (dag_device == NULL) {
491                if (FORMAT_DATA->device_name)
492                        free(FORMAT_DATA->device_name);
493                FORMAT_DATA->device_name = NULL;
494                pthread_mutex_unlock(&open_dag_mutex);
495                return -1;
496        }
497
498        FORMAT_DATA->device = dag_device;
499
500        /* See Config_Status_API_Programming_Guide.pdf from the Endace Dag Documentation */
501        /* Check kBooleanAttributeActive is true -- no point capturing on an interface that's disabled
502 
503        *  The symptom of the port being disabled is that libtrace will appear to hang.
504        */
505        /* Check kBooleanAttributeFault is false */
506        /* Check kBooleanAttributeLocalFault is false */
507        /* Check kBooleanAttributeLock is true ? */
508        /* Check kBooleanAttributePeerLink ? */
509
510        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based on libtrace promisc attribute?*/
511        /* Set kUint32AttributeSnapLength to the snaplength */
512
513        pthread_mutex_unlock(&open_dag_mutex);
514        return 0;
515}
516
517/* Configures a DAG input trace */
518static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
519                                void *data) {
520        char conf_str[4096];
521        switch(option) {
522                case TRACE_OPTION_META_FREQ:
523                        /* This option is used to specify the frequency of DUCK
524                         * updates */
525                        DUCK.duck_freq = *(int *)data;
526                        return 0;
527                case TRACE_OPTION_SNAPLEN:
528                        /* Tell the card our new snap length */
529                        snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
530                        if (dag_configure(FORMAT_DATA->device->fd,
531                                                conf_str) != 0) {
532                                trace_set_err(libtrace, errno, "Failed to configure snaplen on DAG card: %s", libtrace->uridata);
533                                return -1;
534                        }
535                        return 0;
536                case TRACE_OPTION_PROMISC:
537                        /* DAG already operates in a promisc fashion */
538                        return -1;
539                case TRACE_OPTION_FILTER:
540                        /* We don't yet support pushing filters into DAG
541                         * cards */
542                        return -1;
543                case TRACE_OPTION_EVENT_REALTIME:
544                        /* Live capture is always going to be realtime */
545                        return -1;
546        }
547        return -1;
548}
549
550/* Starts a DAG output trace */
551static int dag_start_output(libtrace_out_t *libtrace) {
552        struct timeval zero, nopoll;
553
554        zero.tv_sec = 0;
555        zero.tv_usec = 0;
556        nopoll = zero;
557
558        /* Attach and start the DAG stream */
559
560        if (dag_attach_stream(FORMAT_DATA_OUT->device->fd,
561                        FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) {
562                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
563                return -1;
564        }
565
566        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
567                        FORMAT_DATA_OUT->dagstream) < 0) {
568                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
569                return -1;
570        }
571        FORMAT_DATA_OUT->stream_attached = 1;
572
573        /* We don't want the dag card to do any sleeping */
574
575        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
576                        FORMAT_DATA_OUT->dagstream, 0, &zero,
577                        &nopoll);
578
579        return 0;
580}
581
582/* Starts a DAG input trace */
583static int dag_start_input(libtrace_t *libtrace) {
584        struct timeval zero, nopoll;
585        uint8_t *top, *bottom, *starttop;
586        uint64_t diff = 0;
587        top = bottom = NULL;
588
589        zero.tv_sec = 0;
590        zero.tv_usec = 10000;
591        nopoll = zero;
592
593        /* Attach and start the DAG stream */
594        if (dag_attach_stream(FORMAT_DATA->device->fd,
595                                FORMAT_DATA->dagstream, 0, 0) < 0) {
596                trace_set_err(libtrace, errno, "Cannot attach DAG stream");
597                return -1;
598        }
599
600        if (dag_start_stream(FORMAT_DATA->device->fd,
601                                FORMAT_DATA->dagstream) < 0) {
602                trace_set_err(libtrace, errno, "Cannot start DAG stream");
603                return -1;
604        }
605        FORMAT_DATA->stream_attached = 1;
606       
607        /* We don't want the dag card to do any sleeping */
608        dag_set_stream_poll(FORMAT_DATA->device->fd,
609                                FORMAT_DATA->dagstream, 0, &zero,
610                                &nopoll);
611
612        starttop = dag_advance_stream(FORMAT_DATA->device->fd,
613                                        FORMAT_DATA->dagstream,
614                                        &bottom);
615
616        /* Should probably flush the memory hole now */
617        top = starttop;
618        while (starttop - bottom > 0) {
619                bottom += (starttop - bottom);
620                top = dag_advance_stream(FORMAT_DATA->device->fd,
621                                        FORMAT_DATA->dagstream,
622                                        &bottom);
623        }
624        FORMAT_DATA->top = top;
625        FORMAT_DATA->bottom = bottom;
626        FORMAT_DATA->processed = 0;
627        FORMAT_DATA->drops = 0;
628
629        return 0;
630}
631
632/* Pauses a DAG output trace */
633static int dag_pause_output(libtrace_out_t *libtrace) {
634
635        /* Stop and detach the stream */
636        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
637                        FORMAT_DATA_OUT->dagstream) < 0) {
638                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
639                return -1;
640        }
641        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
642                        FORMAT_DATA_OUT->dagstream) < 0) {
643                trace_set_err_out(libtrace, errno, "Could not detach DAG stream");
644                return -1;
645        }
646        FORMAT_DATA_OUT->stream_attached = 0;
647        return 0;
648}
649
650/* Pauses a DAG input trace */
651static int dag_pause_input(libtrace_t *libtrace) {
652
653        /* Stop and detach the stream */
654        if (dag_stop_stream(FORMAT_DATA->device->fd,
655                                FORMAT_DATA->dagstream) < 0) {
656                trace_set_err(libtrace, errno, "Could not stop DAG stream");
657                return -1;
658        }
659        if (dag_detach_stream(FORMAT_DATA->device->fd,
660                                FORMAT_DATA->dagstream) < 0) {
661                trace_set_err(libtrace, errno, "Could not detach DAG stream");
662                return -1;
663        }
664        FORMAT_DATA->stream_attached = 0;
665        return 0;
666}
667
668/* Closes a DAG input trace */
669static int dag_fin_input(libtrace_t *libtrace) {
670        /* Need the lock, since we're going to be handling the device list */
671        pthread_mutex_lock(&open_dag_mutex);
672       
673        /* Detach the stream if we are not paused */
674        if (FORMAT_DATA->stream_attached)
675                dag_pause_input(libtrace);
676        FORMAT_DATA->device->ref_count --;
677
678        /* Close the DAG device if there are no more references to it */
679        if (FORMAT_DATA->device->ref_count == 0)
680                dag_close_device(FORMAT_DATA->device);
681        if (DUCK.dummy_duck)
682                trace_destroy_dead(DUCK.dummy_duck);
683        if (FORMAT_DATA->device_name)
684                free(FORMAT_DATA->device_name);
685        free(libtrace->format_data);
686        pthread_mutex_unlock(&open_dag_mutex);
687        return 0; /* success */
688}
689
690/* Closes a DAG output trace */
691static int dag_fin_output(libtrace_out_t *libtrace) {
692       
693        /* Commit any outstanding traffic in the txbuffer */
694        if (FORMAT_DATA_OUT->waiting) {
695                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
696                                FORMAT_DATA_OUT->waiting );
697        }
698
699        /* Wait until the buffer is nearly clear before exiting the program,
700         * as we will lose packets otherwise */
701        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
702                        FORMAT_DATA_OUT->dagstream,
703                        dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
704                                        FORMAT_DATA_OUT->dagstream) - 8
705                        );
706
707        /* Need the lock, since we're going to be handling the device list */
708        pthread_mutex_lock(&open_dag_mutex);
709
710        /* Detach the stream if we are not paused */
711        if (FORMAT_DATA_OUT->stream_attached)
712                dag_pause_output(libtrace);
713        FORMAT_DATA_OUT->device->ref_count --;
714
715        /* Close the DAG device if there are no more references to it */
716        if (FORMAT_DATA_OUT->device->ref_count == 0)
717                dag_close_device(FORMAT_DATA_OUT->device);
718        if (FORMAT_DATA_OUT->device_name)
719                free(FORMAT_DATA_OUT->device_name);
720        free(libtrace->format_data);
721        pthread_mutex_unlock(&open_dag_mutex);
722        return 0; /* success */
723}
724
725/* Extracts DUCK information from the DAG card and produces a DUCK packet */
726static int dag_get_duckinfo(libtrace_t *libtrace,
727                                libtrace_packet_t *packet) {
728
729        /* Allocate memory for the DUCK data */
730        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
731                        !packet->buffer) {
732                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
733                packet->buf_control = TRACE_CTRL_PACKET;
734                if (!packet->buffer) {
735                        trace_set_err(libtrace, errno,
736                                        "Cannot allocate packet buffer");
737                        return -1;
738                }
739        }
740
741        /* DUCK doesn't have a format header */
742        packet->header = 0;
743        packet->payload = packet->buffer;
744
745        /* No need to check if we can get DUCK or not - we're modern
746         * enough so just grab the DUCK info */
747        if ((ioctl(FORMAT_DATA->device->fd, DAGIOCDUCK,
748                                        (duckinf_t *)packet->payload) < 0)) {
749                trace_set_err(libtrace, errno, "Error using DAGIOCDUCK");
750                return -1;
751        }
752
753        packet->type = TRACE_RT_DUCK_2_5;
754
755        /* Set the packet's tracce to point at a DUCK trace, so that the
756         * DUCK format functions will be called on the packet rather than the
757         * DAG ones */
758        if (!DUCK.dummy_duck)
759                DUCK.dummy_duck = trace_create_dead("rt:localhost:3434");
760        packet->trace = DUCK.dummy_duck;
761        return sizeof(duckinf_t);
762}
763
764/* Determines the amount of data available to read from the DAG card */
765static int dag_available(libtrace_t *libtrace) {
766        uint32_t diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
767
768        /* If we've processed more than 4MB of data since we last called
769         * dag_advance_stream, then we should call it again to allow the
770         * space occupied by that 4MB to be released */
771        if (diff >= dag_record_size && FORMAT_DATA->processed < 4 * 1024 * 1024)
772                return diff;
773       
774        /* Update the top and bottom pointers */
775        FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd,
776                        FORMAT_DATA->dagstream,
777                        &(FORMAT_DATA->bottom));
778       
779        if (FORMAT_DATA->top == NULL) {
780                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
781                return -1;
782        }
783        FORMAT_DATA->processed = 0;
784        diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
785        return diff;
786}
787
788/* Returns a pointer to the start of the next complete ERF record */
789static dag_record_t *dag_get_record(libtrace_t *libtrace) {
790        dag_record_t *erfptr = NULL;
791        uint16_t size;
792        erfptr = (dag_record_t *)FORMAT_DATA->bottom;
793        if (!erfptr)
794                return NULL;
795        size = ntohs(erfptr->rlen);
796        assert( size >= dag_record_size );
797        /* Make certain we have the full packet available */
798        if (size > (FORMAT_DATA->top - FORMAT_DATA->bottom))
799                return NULL;
800        FORMAT_DATA->bottom += size;
801        FORMAT_DATA->processed += size;
802        return erfptr;
803}
804
805/* Converts a buffer containing a recently read DAG packet record into a
806 * libtrace packet */
807static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
808                void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
809
810        dag_record_t *erfptr;
811       
812        /* If the packet previously owned a buffer that is not the buffer
813         * that contains the new packet data, we're going to need to free the
814         * old one to avoid memory leaks */
815        if (packet->buffer != buffer &&
816                        packet->buf_control == TRACE_CTRL_PACKET) {
817                free(packet->buffer);
818        }
819
820        /* Set the buffer owner appropriately */
821        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
822                packet->buf_control = TRACE_CTRL_PACKET;
823        } else
824                packet->buf_control = TRACE_CTRL_EXTERNAL;
825
826        /* Update the packet pointers and type appropriately */
827        erfptr = (dag_record_t *)buffer;
828        packet->buffer = erfptr;
829        packet->header = erfptr;
830        packet->type = rt_type;
831
832        if (erfptr->flags.rxerror == 1) {
833                /* rxerror means the payload is corrupt - drop the payload
834                 * by tweaking rlen */
835                packet->payload = NULL;
836                erfptr->rlen = htons(erf_get_framing_length(packet));
837        } else {
838                packet->payload = (char*)packet->buffer
839                        + erf_get_framing_length(packet);
840        }
841
842        if (libtrace->format_data == NULL) {
843                dag_init_format_data(libtrace);
844        }
845
846        /* Update the dropped packets counter */
847
848        /* No loss counter for DSM coloured records - have to use
849         * some other API */
850        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
851                /* TODO */
852        } else {
853                /* Use the ERF loss counter */
854                if (FORMAT_DATA->seeninterface[erfptr->flags.iface] == 0) {
855                        FORMAT_DATA->seeninterface[erfptr->flags.iface] = 1;
856                } else {
857                        FORMAT_DATA->drops += ntohs(erfptr->lctr);
858                }
859        }
860
861        return 0;
862}
863
864/*
865 * dag_write_packet() at this stage attempts to improve tx performance
866 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
867 * has been met. I observed approximately 270% performance increase
868 * through this relatively naive tweak. No optimisation of buffer sizes
869 * was attempted.
870 */
871
872/* Pushes an ERF record onto the transmit stream */
873static int dag_dump_packet(libtrace_out_t *libtrace,
874                dag_record_t *erfptr, unsigned int pad, void *buffer) {
875        int size;
876
877        /*
878         * If we've got 0 bytes waiting in the txqueue, assume that we haven't
879         * requested any space yet, and request some, storing the pointer at
880         * FORMAT_DATA_OUT->txbuffer.
881         *
882         * The amount to request is slightly magical at the moment - it's
883         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
884         * the buffer and handle overruns.
885         */
886        if (FORMAT_DATA_OUT->waiting == 0) {
887                FORMAT_DATA_OUT->txbuffer = dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
888                                FORMAT_DATA_OUT->dagstream, 16908288);
889        }
890
891        /*
892         * Copy the header separately to the body, as we can't guarantee they
893         * are in contiguous memory
894         */
895        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,erfptr,(dag_record_size + pad));
896        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
897
898
899
900        /*
901         * Copy our incoming packet into the outgoing buffer, and increment
902         * our waiting count
903         */
904        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
905        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,buffer,size);
906        FORMAT_DATA_OUT->waiting += size;
907
908        /*
909         * If our output buffer has more than 16 Mebibytes in it, commit those
910         * bytes and reset the waiting count to 0.
911         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in
912         * case there is still data in the buffer at program exit.
913         */
914
915        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
916                FORMAT_DATA_OUT->txbuffer = dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
917                        FORMAT_DATA_OUT->waiting );
918                FORMAT_DATA_OUT->waiting = 0;
919        }
920
921        return size + pad + dag_record_size;
922
923}
924
925/* Attempts to determine a suitable ERF type for a given packet. Returns true
926 * if one is found, false otherwise */
927static bool find_compatible_linktype(libtrace_out_t *libtrace,
928                                libtrace_packet_t *packet, char *type)
929{
930         // Keep trying to simplify the packet until we can find
931         //something we can do with it
932
933        do {
934                *type=libtrace_to_erf_type(trace_get_link_type(packet));
935
936                // Success
937                if (*type != (char)-1)
938                        return true;
939
940                if (!demote_packet(packet)) {
941                        trace_set_err_out(libtrace,
942                                        TRACE_ERR_NO_CONVERSION,
943                                        "No erf type for packet (%i)",
944                                        trace_get_link_type(packet));
945                        return false;
946                }
947
948        } while(1);
949
950        return true;
951}
952
953/* Writes a packet to the provided DAG output trace */
954static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet) {
955        /*
956         * This is heavily borrowed from erf_write_packet(). Yes, CnP coding
957         * sucks, sorry about that.
958         */
959        unsigned int pad = 0;
960        int numbytes;
961        void *payload = packet->payload;
962        dag_record_t *header = (dag_record_t *)packet->header;
963        char erf_type = 0;
964
965        if(!packet->header) {
966                /* No header, probably an RT packet. Lifted from
967                 * erf_write_packet(). */
968                return -1;
969        }
970
971        if (trace_get_link_type(packet) == TRACE_TYPE_NONDATA)
972                return 0;
973
974        pad = dag_get_padding(packet);
975
976        /*
977         * If the payload is null, adjust the rlen. Discussion of this is
978         * attached to erf_write_packet()
979         */
980        if (payload == NULL) {
981                header->rlen = htons(dag_record_size + pad);
982        }
983
984        if (packet->type == TRACE_RT_DATA_ERF) {
985                numbytes = dag_dump_packet(libtrace,
986                                header,
987                                pad,
988                                payload
989                                );
990
991        } else {
992                /* Build up a new packet header from the existing header */
993
994                /* Simplify the packet first - if we can't do this, break
995                 * early */
996                if (!find_compatible_linktype(libtrace,packet,&erf_type))
997                        return -1;
998
999                dag_record_t erfhdr;
1000
1001                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
1002                payload=packet->payload;
1003                pad = dag_get_padding(packet);
1004
1005                /* Flags. Can't do this */
1006                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
1007                if (trace_get_direction(packet)!=(int)~0U)
1008                        erfhdr.flags.iface = trace_get_direction(packet);
1009
1010                erfhdr.type = erf_type;
1011
1012                /* Packet length (rlen includes format overhead) */
1013                assert(trace_get_capture_length(packet)>0
1014                                && trace_get_capture_length(packet)<=65536);
1015                assert(erf_get_framing_length(packet)>0
1016                                && trace_get_framing_length(packet)<=65536);
1017                assert(trace_get_capture_length(packet)+erf_get_framing_length(packet)>0
1018                      &&trace_get_capture_length(packet)+erf_get_framing_length(packet)<=65536);
1019
1020                erfhdr.rlen = htons(trace_get_capture_length(packet)
1021                        + erf_get_framing_length(packet));
1022
1023
1024                /* Loss counter. Can't do this */
1025                erfhdr.lctr = 0;
1026                /* Wire length, does not include padding! */
1027                erfhdr.wlen = htons(trace_get_wire_length(packet));
1028
1029                /* Write it out */
1030                numbytes = dag_dump_packet(libtrace,
1031                                &erfhdr,
1032                                pad,
1033                                payload);
1034
1035        }
1036
1037        return numbytes;
1038}
1039
1040/* Reads the next available packet from a DAG card, in a BLOCKING fashion
1041 *
1042 * If DUCK reporting is enabled, the packet returned may be a DUCK update
1043 */
1044static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1045        int size = 0;
1046        struct timeval tv;
1047        dag_record_t *erfptr = NULL;
1048        int numbytes = 0;
1049        uint32_t flags = 0;
1050        struct timeval maxwait;
1051        struct timeval pollwait;
1052
1053        pollwait.tv_sec = 0;
1054        pollwait.tv_usec = 10000;
1055        maxwait.tv_sec = 0;
1056        maxwait.tv_usec = 250000;
1057
1058        /* Check if we're due for a DUCK report */
1059        if (DUCK.last_pkt - DUCK.last_duck > DUCK.duck_freq &&
1060                        DUCK.duck_freq != 0) {
1061                size = dag_get_duckinfo(libtrace, packet);
1062                DUCK.last_duck = DUCK.last_pkt;
1063                if (size != 0) {
1064                        return size;
1065                }
1066                /* No DUCK support, so don't waste our time anymore */
1067                DUCK.duck_freq = 0;
1068        }
1069
1070        /* Don't let anyone try to free our DAG memory hole! */
1071        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1072
1073        /* If the packet buffer is currently owned by libtrace, free it so
1074         * that we can set the packet to point into the DAG memory hole */
1075        if (packet->buf_control == TRACE_CTRL_PACKET) {
1076                free(packet->buffer);
1077                packet->buffer = 0;
1078        }
1079       
1080        if (dag_set_stream_poll(FORMAT_DATA->device->fd, 
1081                        FORMAT_DATA->dagstream, sizeof(dag_record_t), &maxwait, 
1082                        &pollwait) == -1)
1083        {
1084                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1085                return -1;
1086        }
1087
1088
1089        /* Grab a full ERF record */
1090        do {
1091                numbytes = dag_available(libtrace);
1092                if (numbytes < 0)
1093                        return numbytes;
1094                if (numbytes < dag_record_size) {
1095                        if (libtrace_halt)
1096                                return 0;
1097                        /* Block until we see a packet */
1098                        continue;
1099                }
1100                erfptr = dag_get_record(libtrace);
1101        } while (erfptr == NULL);
1102
1103        /* Prepare the libtrace packet */
1104        if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
1105                                flags))
1106                return -1;
1107
1108        /* Update the DUCK timer */
1109        tv = trace_get_timeval(packet);
1110        DUCK.last_pkt = tv.tv_sec;
1111
1112        return packet->payload ? htons(erfptr->rlen) :
1113                                erf_get_framing_length(packet);
1114}
1115
1116/* Attempts to read a packet from a DAG card in a NON-BLOCKING fashion. If a
1117 * packet is available, we will return a packet event. Otherwise we will
1118 * return a SLEEP event (as we cannot select on the DAG file descriptor).
1119 */
1120static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
1121                                        libtrace_packet_t *packet) {
1122        libtrace_eventobj_t event = {0,0,0.0,0};
1123        dag_record_t *erfptr = NULL;
1124        int numbytes;
1125        uint32_t flags = 0;
1126        struct timeval minwait;
1127       
1128        minwait.tv_sec = 0;
1129        minwait.tv_usec = 10000;
1130       
1131        if (dag_set_stream_poll(FORMAT_DATA->device->fd, 
1132                        FORMAT_DATA->dagstream, 0, &minwait, 
1133                        &minwait) == -1)
1134        {
1135                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1136                event.type = TRACE_EVENT_TERMINATE;
1137                return event;
1138        }
1139
1140        do {
1141                erfptr = NULL;
1142                numbytes = 0;
1143       
1144                /* Need to call dag_available so that the top pointer will get
1145                 * updated, otherwise we'll never see any data! */
1146                numbytes = dag_available(libtrace);
1147
1148                /* May as well not bother calling dag_get_record if
1149                 * dag_available suggests that there's no data */
1150                if (numbytes != 0)
1151                        erfptr = dag_get_record(libtrace);
1152                if (erfptr == NULL) {
1153                        /* No packet available - sleep for a very short time */
1154                        if (libtrace_halt) {
1155                                event.type = TRACE_EVENT_TERMINATE;
1156                        } else {                       
1157                                event.type = TRACE_EVENT_SLEEP;
1158                                event.seconds = 0.0001;
1159                        }
1160                        break;
1161                }
1162                if (dag_prepare_packet(libtrace, packet, erfptr, 
1163                                        TRACE_RT_DATA_ERF, flags)) {
1164                        event.type = TRACE_EVENT_TERMINATE;
1165                        break;
1166                }
1167
1168
1169                event.size = trace_get_capture_length(packet) + 
1170                                trace_get_framing_length(packet);
1171               
1172                /* XXX trace_read_packet() normally applies the following
1173                 * config options for us, but this function is called via
1174                 * trace_event() so we have to do it ourselves */
1175
1176                if (libtrace->filter) {
1177                        int filtret = trace_apply_filter(libtrace->filter, 
1178                                        packet);
1179                        if (filtret == -1) {
1180                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
1181                                                "Bad BPF Filter");
1182                                event.type = TRACE_EVENT_TERMINATE;
1183                                break;
1184                        }
1185
1186                        if (filtret == 0) {
1187                                /* This packet isn't useful so we want to
1188                                 * immediately see if there is another suitable
1189                                 * one - we definitely DO NOT want to return
1190                                 * a sleep event in this case, like we used to
1191                                 * do! */
1192                                libtrace->filtered_packets ++;
1193                                trace_clear_cache(packet);
1194                                continue;
1195                        }
1196                               
1197                        event.type = TRACE_EVENT_PACKET;
1198                } else {
1199                        event.type = TRACE_EVENT_PACKET;
1200                }
1201
1202                if (libtrace->snaplen > 0) {
1203                        trace_set_capture_length(packet, libtrace->snaplen);
1204                }
1205                libtrace->accepted_packets ++;
1206                break;
1207        } while (1);
1208
1209        return event;
1210}
1211
1212/* Gets the number of dropped packets */
1213static uint64_t dag_get_dropped_packets(libtrace_t *trace) {
1214        if (trace->format_data == NULL)
1215                return (uint64_t)-1;
1216        return DATA(trace)->drops;
1217}
1218
1219/* Prints some semi-useful help text about the DAG format module */
1220static void dag_help(void) {
1221        printf("dag format module: $Revision: 1755 $\n");
1222        printf("Supported input URIs:\n");
1223        printf("\tdag:/dev/dagn\n");
1224        printf("\n");
1225        printf("\te.g.: dag:/dev/dag0\n");
1226        printf("\n");
1227        printf("Supported output URIs:\n");
1228        printf("\tnone\n");
1229        printf("\n");
1230}
1231
1232static struct libtrace_format_t dag = {
1233        "dag",
1234        "$Id$",
1235        TRACE_FORMAT_ERF,
1236        dag_probe_filename,             /* probe filename */
1237        NULL,                           /* probe magic */
1238        dag_init_input,                 /* init_input */
1239        dag_config_input,               /* config_input */
1240        dag_start_input,                /* start_input */
1241        dag_pause_input,                /* pause_input */
1242        dag_init_output,                /* init_output */
1243        NULL,                           /* config_output */
1244        dag_start_output,               /* start_output */
1245        dag_fin_input,                  /* fin_input */
1246        dag_fin_output,                 /* fin_output */
1247        dag_read_packet,                /* read_packet */
1248        dag_prepare_packet,             /* prepare_packet */
1249        NULL,                           /* fin_packet */
1250        dag_write_packet,               /* write_packet */
1251        erf_get_link_type,              /* get_link_type */
1252        erf_get_direction,              /* get_direction */
1253        erf_set_direction,              /* set_direction */
1254        erf_get_erf_timestamp,          /* get_erf_timestamp */
1255        NULL,                           /* get_timeval */
1256        NULL,                           /* get_seconds */
1257        NULL,                           /* get_timespec */
1258        NULL,                           /* seek_erf */
1259        NULL,                           /* seek_timeval */
1260        NULL,                           /* seek_seconds */
1261        erf_get_capture_length,         /* get_capture_length */
1262        erf_get_wire_length,            /* get_wire_length */
1263        erf_get_framing_length,         /* get_framing_length */
1264        erf_set_capture_length,         /* set_capture_length */
1265        NULL,                           /* get_received_packets */
1266        NULL,                           /* get_filtered_packets */
1267        dag_get_dropped_packets,        /* get_dropped_packets */
1268        NULL,                           /* get_captured_packets */
1269        NULL,                           /* get_fd */
1270        trace_event_dag,                /* trace_event */
1271        dag_help,                       /* help */
1272        NULL                            /* next pointer */
1273};
1274
1275void dag_constructor(void) {
1276        register_format(&dag);
1277}
Note: See TracBrowser for help on using the repository browser.