source: lib/format_dag25.c @ c5ac872

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since c5ac872 was c5ac872, checked in by Shane Alcock <salcock@…>, 6 years ago

Fixed compile error when using DAG 5.2 libraries

The name of the ioctl to fetch DUCK information had changed in DAG version 5
and the old one no longer worked.

Fixed bug where reading from a DAG card using trace_event would not
produce DUCK packets, even if the user requested them.

Added support for the new DUCK format present in DAG version 5.

  • Property mode set to 100644
File size: 39.4 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id$
31 *
32 */
33
34#define _GNU_SOURCE
35
36#include "config.h"
37#include "common.h"
38#include "libtrace.h"
39#include "libtrace_int.h"
40#include "format_helper.h"
41#include "format_erf.h"
42
43#include <assert.h>
44#include <errno.h>
45#include <fcntl.h>
46#include <stdio.h>
47#include <string.h>
48#include <stdlib.h>
49#include <sys/stat.h>
50
51#include <sys/mman.h>
52/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
53 * Windows anyway so we'll worry about this more later :] */
54#include <pthread.h>
55
56
57#ifdef WIN32
58#  include <io.h>
59#  include <share.h>
60#  define PATH_MAX _MAX_PATH
61#  define snprintf sprintf_s
62#else
63#  include <netdb.h>
64#  ifndef PATH_MAX
65#       define PATH_MAX 4096
66#  endif
67#  include <sys/ioctl.h>
68#endif
69
70/* This format deals with DAG cards that are using drivers from the 2.5 version
71 * onwards, including 3.X.
72 *
73 * DAG is a LIVE capture format.
74 *
75 * This format does support writing, provided the DAG card that you are using
76 * has transmit (Tx) support. Additionally, packets read using this format
77 * are in the ERF format, so can easily be written as ERF traces without
78 * losing any data.
79 */
80
81
82#define DATA(x) ((struct dag_format_data_t *)x->format_data)
83#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
84
85#define FORMAT_DATA DATA(libtrace)
86#define FORMAT_DATA_OUT DATA_OUT(libtrace)
87
88#define DUCK FORMAT_DATA->duck
89static struct libtrace_format_t dag;
90
91/* A DAG device - a DAG device can support multiple streams (and therefore
92 * multiple input traces) so each trace needs to refer to a device */
93struct dag_dev_t {
94        char * dev_name;                /* Device name */
95        int fd;                         /* File descriptor */
96        uint16_t ref_count;             /* Number of input / output traces
97                                           that are using this device */
98        struct dag_dev_t *prev;         /* Pointer to the previous device in
99                                           the device list */
100        struct dag_dev_t *next;         /* Pointer to the next device in the
101                                           device list */
102};
103
104/* "Global" data that is stored for each DAG output trace */
105struct dag_format_data_out_t {
106        /* String containing the DAG device name */
107        char *device_name;
108        /* The DAG device being used for writing */
109        struct dag_dev_t *device;
110        /* The DAG stream that is being written on */
111        unsigned int dagstream;
112        /* Boolean flag indicating whether the stream is currently attached */
113        int stream_attached;
114        /* The amount of data waiting to be transmitted, in bytes */
115        uint64_t waiting;
116        /* A buffer to hold the data to be transmittted */
117        uint8_t *txbuffer;
118};
119
120/* "Global" data that is stored for each DAG input trace */
121struct dag_format_data_t {
122
123        /* Data required for regular DUCK reporting */
124        struct {
125                /* Timestamp of the last DUCK report */
126                uint32_t last_duck;
127                /* The number of seconds between each DUCK report */
128                uint32_t duck_freq;
129                /* Timestamp of the last packet read from the DAG card */
130                uint32_t last_pkt;
131                /* Dummy trace to ensure DUCK packets are dealt with using the
132                 * DUCK format functions */
133                libtrace_t *dummy_duck;
134        } duck;
135
136        /* String containing the DAG device name */
137        char *device_name;
138        /* The DAG device that we are reading from */
139        struct dag_dev_t *device;
140        /* The DAG stream that we are reading from */
141        unsigned int dagstream;
142        /* Boolean flag indicating whether the stream is currently attached */
143        int stream_attached;
144        /* Pointer to the first unread byte in the DAG memory hole */
145        uint8_t *bottom;
146        /* Pointer to the last unread byte in the DAG memory hole */
147        uint8_t *top;
148        /* The amount of data processed thus far from the bottom pointer */
149        uint32_t processed;
150        /* The number of packets that have been dropped */
151        uint64_t drops;
152
153        uint8_t seeninterface[4];
154};
155
156/* To be thread-safe, we're going to need a mutex for operating on the list
157 * of DAG devices */
158pthread_mutex_t open_dag_mutex;
159
160/* The list of DAG devices that have been opened by libtrace.
161 *
162 * We can only open each DAG device once, but we might want to read from
163 * multiple streams. Therefore, we need to maintain a list of devices that we
164 * have opened (with ref counts!) so that we don't try to open a device too
165 * many times or close a device that we're still using */
166struct dag_dev_t *open_dags = NULL;
167
168/* Returns the amount of padding between the ERF header and the start of the
169 * captured packet data */
170static int dag_get_padding(const libtrace_packet_t *packet)
171{
172        /* ERF Ethernet records have a 2 byte padding before the packet itself
173         * so that the IP header is aligned on a 32 bit boundary.
174         */
175        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
176                dag_record_t *erfptr = (dag_record_t *)packet->header;
177                switch(erfptr->type) {
178                        case TYPE_ETH:
179                        case TYPE_DSM_COLOR_ETH:
180                                return 2;
181                        default:                return 0;
182                }
183        }
184        else {
185                switch(trace_get_link_type(packet)) {
186                        case TRACE_TYPE_ETH:    return 2;
187                        default:                return 0;
188                }
189        }
190}
191
192/* Attempts to determine if the given filename refers to a DAG device */
193static int dag_probe_filename(const char *filename)
194{
195        struct stat statbuf;
196        /* Can we stat the file? */
197        if (stat(filename, &statbuf) != 0) {
198                return 0;
199        }
200        /* Is it a character device? */
201        if (!S_ISCHR(statbuf.st_mode)) {
202                return 0;
203        }
204        /* Yeah, it's probably us. */
205        return 1;
206}
207
208/* Initialises the DAG output data structure */
209static void dag_init_format_out_data(libtrace_out_t *libtrace) {
210        libtrace->format_data = (struct dag_format_data_out_t *) malloc(sizeof(struct dag_format_data_out_t));
211        // no DUCK on output
212        FORMAT_DATA_OUT->stream_attached = 0;
213        FORMAT_DATA_OUT->device = NULL;
214        FORMAT_DATA_OUT->device_name = NULL;
215        FORMAT_DATA_OUT->dagstream = 0;
216        FORMAT_DATA_OUT->waiting = 0;
217
218}
219
220/* Initialises the DAG input data structure */
221static void dag_init_format_data(libtrace_t *libtrace) {
222        libtrace->format_data = (struct dag_format_data_t *)
223                malloc(sizeof(struct dag_format_data_t));
224        DUCK.last_duck = 0;
225        DUCK.duck_freq = 0;
226        DUCK.last_pkt = 0;
227        DUCK.dummy_duck = NULL;
228        FORMAT_DATA->stream_attached = 0;
229        FORMAT_DATA->drops = 0;
230        FORMAT_DATA->device_name = NULL;
231        FORMAT_DATA->device = NULL;
232        FORMAT_DATA->dagstream = 0;
233        FORMAT_DATA->processed = 0;
234        FORMAT_DATA->bottom = NULL;
235        FORMAT_DATA->top = NULL;
236        memset(FORMAT_DATA->seeninterface, 0, sizeof(FORMAT_DATA->seeninterface));
237}
238
239/* Determines if there is already an entry for the given DAG device in the
240 * device list and increments the reference count for that device, if found.
241 *
242 * NOTE: This function assumes the open_dag_mutex is held by the caller */
243static struct dag_dev_t *dag_find_open_device(char *dev_name) {
244        struct dag_dev_t *dag_dev;
245
246        dag_dev = open_dags;
247
248        /* XXX: Not exactly zippy, but how often are we going to be dealing
249         * with multiple dag cards? */
250        while (dag_dev != NULL) {
251                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
252                        dag_dev->ref_count ++;
253                        return dag_dev;
254
255                }
256                dag_dev = dag_dev->next;
257        }
258        return NULL;
259
260
261}
262
263/* Closes a DAG device and removes it from the device list.
264 *
265 * Attempting to close a DAG device that has a non-zero reference count will
266 * cause an assertion failure!
267 *
268 * NOTE: This function assumes the open_dag_mutex is held by the caller */
269static void dag_close_device(struct dag_dev_t *dev) {
270        /* Need to remove from the device list */
271
272        assert(dev->ref_count == 0);
273
274        if (dev->prev == NULL) {
275                open_dags = dev->next;
276                if (dev->next)
277                        dev->next->prev = NULL;
278        } else {
279                dev->prev->next = dev->next;
280                if (dev->next)
281                        dev->next->prev = dev->prev;
282        }
283
284        dag_close(dev->fd);
285        if (dev->dev_name)
286                free(dev->dev_name);
287        free(dev);
288}
289
290
291/* Opens a new DAG device for writing and adds it to the DAG device list
292 *
293 * NOTE: this function should only be called when opening a DAG device for
294 * writing - there is little practical difference between this and the
295 * function below that covers the reading case, but we need the output trace
296 * object to report errors properly so the two functions take slightly
297 * different arguments. This is really lame and there should be a much better
298 * way of doing this.
299 *
300 * NOTE: This function assumes the open_dag_mutex is held by the caller
301 */
302static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace, char *dev_name) {
303        struct stat buf;
304        int fd;
305        struct dag_dev_t *new_dev;
306
307        /* Make sure the device exists */
308        if (stat(dev_name, &buf) == -1) {
309                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
310                return NULL;
311}
312
313        /* Make sure it is the appropriate type of device */
314        if (S_ISCHR(buf.st_mode)) {
315                /* Try opening the DAG device */
316                if((fd = dag_open(dev_name)) < 0) {
317                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
318                                        dev_name);
319                        return NULL;
320                }
321        } else {
322                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
323                                dev_name);
324                return NULL;
325        }
326
327        /* Add the device to our device list - it is just a doubly linked
328         * list with no inherent ordering; just tack the new one on the front
329         */
330        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
331        new_dev->fd = fd;
332        new_dev->dev_name = dev_name;
333        new_dev->ref_count = 1;
334
335        new_dev->prev = NULL;
336        new_dev->next = open_dags;
337        if (open_dags)
338                open_dags->prev = new_dev;
339
340        open_dags = new_dev;
341
342        return new_dev;
343}
344
345/* Opens a new DAG device for reading and adds it to the DAG device list
346 *
347 * NOTE: this function should only be called when opening a DAG device for
348 * reading - there is little practical difference between this and the
349 * function above that covers the writing case, but we need the input trace
350 * object to report errors properly so the two functions take slightly
351 * different arguments. This is really lame and there should be a much better
352 * way of doing this.
353 *
354 * NOTE: This function assumes the open_dag_mutex is held by the caller */
355static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
356        struct stat buf;
357        int fd;
358        struct dag_dev_t *new_dev;
359
360        /* Make sure the device exists */
361        if (stat(dev_name, &buf) == -1) {
362                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
363                return NULL;
364        }
365
366        /* Make sure it is the appropriate type of device */
367        if (S_ISCHR(buf.st_mode)) {
368                /* Try opening the DAG device */
369                if((fd = dag_open(dev_name)) < 0) {
370                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
371                                        dev_name);
372                        return NULL;
373                }
374        } else {
375                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
376                                dev_name);
377                return NULL;
378        }
379
380        /* Add the device to our device list - it is just a doubly linked
381         * list with no inherent ordering; just tack the new one on the front
382         */
383        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
384        new_dev->fd = fd;
385        new_dev->dev_name = dev_name;
386        new_dev->ref_count = 1;
387
388        new_dev->prev = NULL;
389        new_dev->next = open_dags;
390        if (open_dags)
391                open_dags->prev = new_dev;
392
393        open_dags = new_dev;
394
395        return new_dev;
396}
397
398/* Creates and initialises a DAG output trace */
399static int dag_init_output(libtrace_out_t *libtrace) {
400        char *scan = NULL;
401        struct dag_dev_t *dag_device = NULL;
402        int stream = 1;
403       
404        /* XXX I don't know if this is important or not, but this function
405         * isn't present in all of the driver releases that this code is
406         * supposed to support! */
407        /*
408        unsigned long wake_time;
409        dagutil_sleep_get_wake_time(&wake_time,0);
410        */
411
412        dag_init_format_out_data(libtrace);
413        /* Grab the mutex while we're likely to be messing with the device
414         * list */
415        pthread_mutex_lock(&open_dag_mutex);
416       
417        /* Specific streams are signified using a comma in the libtrace URI,
418         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
419         *
420         * If no stream is specified, we will write using stream 1 */
421        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
422                FORMAT_DATA_OUT->device_name = strdup(libtrace->uridata);
423        } else {
424                FORMAT_DATA_OUT->device_name = 
425                                (char *)strndup(libtrace->uridata,
426                                (size_t)(scan - libtrace->uridata));
427                stream = atoi(++scan);
428        }
429        FORMAT_DATA_OUT->dagstream = stream;
430
431        /* See if our DAG device is already open */
432        dag_device = dag_find_open_device(FORMAT_DATA_OUT->device_name);
433
434        if (dag_device == NULL) {
435                /* Device not yet opened - open it ourselves */
436                dag_device = dag_open_output_device(libtrace, 
437                                FORMAT_DATA_OUT->device_name);
438        }
439
440        /* Make sure we have successfully opened a DAG device */
441        if (dag_device == NULL) {
442                if (FORMAT_DATA_OUT->device_name) {
443                        free(FORMAT_DATA_OUT->device_name);
444                        FORMAT_DATA_OUT->device_name = NULL;
445                }
446                pthread_mutex_unlock(&open_dag_mutex);
447                return -1;
448        }
449
450        FORMAT_DATA_OUT->device = dag_device;
451        pthread_mutex_unlock(&open_dag_mutex);
452        return 0;
453}
454
455/* Creates and initialises a DAG input trace */
456static int dag_init_input(libtrace_t *libtrace) {
457        char *scan = NULL;
458        int stream = 0;
459        struct dag_dev_t *dag_device = NULL;
460
461        dag_init_format_data(libtrace);
462        /* Grab the mutex while we're likely to be messing with the device
463         * list */
464        pthread_mutex_lock(&open_dag_mutex);
465       
466       
467        /* Specific streams are signified using a comma in the libtrace URI,
468         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
469         *
470         * If no stream is specified, we will read from stream 0 */
471        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
472                FORMAT_DATA->device_name = strdup(libtrace->uridata);
473        } else {
474                FORMAT_DATA->device_name = (char *)strndup(libtrace->uridata,
475                                (size_t)(scan - libtrace->uridata));
476                stream = atoi(++scan);
477        }
478
479        FORMAT_DATA->dagstream = stream;
480
481        /* See if our DAG device is already open */
482        dag_device = dag_find_open_device(FORMAT_DATA->device_name);
483
484        if (dag_device == NULL) {
485                /* Device not yet opened - open it ourselves */
486                dag_device=dag_open_device(libtrace, FORMAT_DATA->device_name);
487        }
488
489        /* Make sure we have successfully opened a DAG device */
490        if (dag_device == NULL) {
491                if (FORMAT_DATA->device_name)
492                        free(FORMAT_DATA->device_name);
493                FORMAT_DATA->device_name = NULL;
494                pthread_mutex_unlock(&open_dag_mutex);
495                return -1;
496        }
497
498        FORMAT_DATA->device = dag_device;
499
500        /* See Config_Status_API_Programming_Guide.pdf from the Endace Dag Documentation */
501        /* Check kBooleanAttributeActive is true -- no point capturing on an interface that's disabled
502 
503        *  The symptom of the port being disabled is that libtrace will appear to hang.
504        */
505        /* Check kBooleanAttributeFault is false */
506        /* Check kBooleanAttributeLocalFault is false */
507        /* Check kBooleanAttributeLock is true ? */
508        /* Check kBooleanAttributePeerLink ? */
509
510        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based on libtrace promisc attribute?*/
511        /* Set kUint32AttributeSnapLength to the snaplength */
512
513        pthread_mutex_unlock(&open_dag_mutex);
514        return 0;
515}
516
517/* Configures a DAG input trace */
518static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
519                                void *data) {
520        char conf_str[4096];
521        switch(option) {
522                case TRACE_OPTION_META_FREQ:
523                        /* This option is used to specify the frequency of DUCK
524                         * updates */
525                        DUCK.duck_freq = *(int *)data;
526                        return 0;
527                case TRACE_OPTION_SNAPLEN:
528                        /* Tell the card our new snap length */
529                        snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
530                        if (dag_configure(FORMAT_DATA->device->fd,
531                                                conf_str) != 0) {
532                                trace_set_err(libtrace, errno, "Failed to configure snaplen on DAG card: %s", libtrace->uridata);
533                                return -1;
534                        }
535                        return 0;
536                case TRACE_OPTION_PROMISC:
537                        /* DAG already operates in a promisc fashion */
538                        return -1;
539                case TRACE_OPTION_FILTER:
540                        /* We don't yet support pushing filters into DAG
541                         * cards */
542                        return -1;
543                case TRACE_OPTION_EVENT_REALTIME:
544                        /* Live capture is always going to be realtime */
545                        return -1;
546        }
547        return -1;
548}
549
550/* Starts a DAG output trace */
551static int dag_start_output(libtrace_out_t *libtrace) {
552        struct timeval zero, nopoll;
553
554        zero.tv_sec = 0;
555        zero.tv_usec = 0;
556        nopoll = zero;
557
558        /* Attach and start the DAG stream */
559
560        if (dag_attach_stream(FORMAT_DATA_OUT->device->fd,
561                        FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) {
562                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
563                return -1;
564        }
565
566        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
567                        FORMAT_DATA_OUT->dagstream) < 0) {
568                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
569                return -1;
570        }
571        FORMAT_DATA_OUT->stream_attached = 1;
572
573        /* We don't want the dag card to do any sleeping */
574
575        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
576                        FORMAT_DATA_OUT->dagstream, 0, &zero,
577                        &nopoll);
578
579        return 0;
580}
581
582/* Starts a DAG input trace */
583static int dag_start_input(libtrace_t *libtrace) {
584        struct timeval zero, nopoll;
585        uint8_t *top, *bottom, *starttop;
586        top = bottom = NULL;
587
588        zero.tv_sec = 0;
589        zero.tv_usec = 10000;
590        nopoll = zero;
591
592        /* Attach and start the DAG stream */
593        if (dag_attach_stream(FORMAT_DATA->device->fd,
594                                FORMAT_DATA->dagstream, 0, 0) < 0) {
595                trace_set_err(libtrace, errno, "Cannot attach DAG stream");
596                return -1;
597        }
598
599        if (dag_start_stream(FORMAT_DATA->device->fd,
600                                FORMAT_DATA->dagstream) < 0) {
601                trace_set_err(libtrace, errno, "Cannot start DAG stream");
602                return -1;
603        }
604        FORMAT_DATA->stream_attached = 1;
605       
606        /* We don't want the dag card to do any sleeping */
607        dag_set_stream_poll(FORMAT_DATA->device->fd,
608                                FORMAT_DATA->dagstream, 0, &zero,
609                                &nopoll);
610
611        starttop = dag_advance_stream(FORMAT_DATA->device->fd,
612                                        FORMAT_DATA->dagstream,
613                                        &bottom);
614
615        /* Should probably flush the memory hole now */
616        top = starttop;
617        while (starttop - bottom > 0) {
618                bottom += (starttop - bottom);
619                top = dag_advance_stream(FORMAT_DATA->device->fd,
620                                        FORMAT_DATA->dagstream,
621                                        &bottom);
622        }
623        FORMAT_DATA->top = top;
624        FORMAT_DATA->bottom = bottom;
625        FORMAT_DATA->processed = 0;
626        FORMAT_DATA->drops = 0;
627
628        return 0;
629}
630
631/* Pauses a DAG output trace */
632static int dag_pause_output(libtrace_out_t *libtrace) {
633
634        /* Stop and detach the stream */
635        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
636                        FORMAT_DATA_OUT->dagstream) < 0) {
637                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
638                return -1;
639        }
640        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
641                        FORMAT_DATA_OUT->dagstream) < 0) {
642                trace_set_err_out(libtrace, errno, "Could not detach DAG stream");
643                return -1;
644        }
645        FORMAT_DATA_OUT->stream_attached = 0;
646        return 0;
647}
648
649/* Pauses a DAG input trace */
650static int dag_pause_input(libtrace_t *libtrace) {
651
652        /* Stop and detach the stream */
653        if (dag_stop_stream(FORMAT_DATA->device->fd,
654                                FORMAT_DATA->dagstream) < 0) {
655                trace_set_err(libtrace, errno, "Could not stop DAG stream");
656                return -1;
657        }
658        if (dag_detach_stream(FORMAT_DATA->device->fd,
659                                FORMAT_DATA->dagstream) < 0) {
660                trace_set_err(libtrace, errno, "Could not detach DAG stream");
661                return -1;
662        }
663        FORMAT_DATA->stream_attached = 0;
664        return 0;
665}
666
667/* Closes a DAG input trace */
668static int dag_fin_input(libtrace_t *libtrace) {
669        /* Need the lock, since we're going to be handling the device list */
670        pthread_mutex_lock(&open_dag_mutex);
671       
672        /* Detach the stream if we are not paused */
673        if (FORMAT_DATA->stream_attached)
674                dag_pause_input(libtrace);
675        FORMAT_DATA->device->ref_count --;
676
677        /* Close the DAG device if there are no more references to it */
678        if (FORMAT_DATA->device->ref_count == 0)
679                dag_close_device(FORMAT_DATA->device);
680        if (DUCK.dummy_duck)
681                trace_destroy_dead(DUCK.dummy_duck);
682        if (FORMAT_DATA->device_name)
683                free(FORMAT_DATA->device_name);
684        free(libtrace->format_data);
685        pthread_mutex_unlock(&open_dag_mutex);
686        return 0; /* success */
687}
688
689/* Closes a DAG output trace */
690static int dag_fin_output(libtrace_out_t *libtrace) {
691       
692        /* Commit any outstanding traffic in the txbuffer */
693        if (FORMAT_DATA_OUT->waiting) {
694                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
695                                FORMAT_DATA_OUT->waiting );
696        }
697
698        /* Wait until the buffer is nearly clear before exiting the program,
699         * as we will lose packets otherwise */
700        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
701                        FORMAT_DATA_OUT->dagstream,
702                        dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
703                                        FORMAT_DATA_OUT->dagstream) - 8
704                        );
705
706        /* Need the lock, since we're going to be handling the device list */
707        pthread_mutex_lock(&open_dag_mutex);
708
709        /* Detach the stream if we are not paused */
710        if (FORMAT_DATA_OUT->stream_attached)
711                dag_pause_output(libtrace);
712        FORMAT_DATA_OUT->device->ref_count --;
713
714        /* Close the DAG device if there are no more references to it */
715        if (FORMAT_DATA_OUT->device->ref_count == 0)
716                dag_close_device(FORMAT_DATA_OUT->device);
717        if (FORMAT_DATA_OUT->device_name)
718                free(FORMAT_DATA_OUT->device_name);
719        free(libtrace->format_data);
720        pthread_mutex_unlock(&open_dag_mutex);
721        return 0; /* success */
722}
723
724#ifdef DAGIOC_CARD_DUCK
725#define LIBTRACE_DUCK_IOCTL DAGIOC_CARD_DUCK
726#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_5_0
727#else
728#ifdef DAGIOCDUCK
729#define LIBTRACE_DUCK_IOCTL DAGIOCDUCK
730#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_2_5
731#else
732#warning "DAG appears to be missing DUCK support"
733#endif
734#endif
735
736/* Extracts DUCK information from the DAG card and produces a DUCK packet */
737static int dag_get_duckinfo(libtrace_t *libtrace,
738                                libtrace_packet_t *packet) {
739
740        if (DUCK.duck_freq == 0)
741                return 0;
742
743#ifndef LIBTRACE_DUCK_IOCTL
744        trace_set_err(libtrace, errno, 
745                "Requested DUCK information but unable to determine the correct ioctl for DUCK");
746        DUCK.duck_freq = 0;
747        return -1;
748#endif
749
750        if (DUCK.last_pkt - DUCK.last_duck < DUCK.duck_freq)
751                return 0;
752
753        /* Allocate memory for the DUCK data */
754        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
755                        !packet->buffer) {
756                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
757                packet->buf_control = TRACE_CTRL_PACKET;
758                if (!packet->buffer) {
759                        trace_set_err(libtrace, errno,
760                                        "Cannot allocate packet buffer");
761                        return -1;
762                }
763        }
764
765        /* DUCK doesn't have a format header */
766        packet->header = 0;
767        packet->payload = packet->buffer;
768
769        /* No need to check if we can get DUCK or not - we're modern
770         * enough so just grab the DUCK info */
771        if ((ioctl(FORMAT_DATA->device->fd, LIBTRACE_DUCK_IOCTL,
772                                        (duckinf_t *)packet->payload) < 0)) {
773                trace_set_err(libtrace, errno, "Error using DUCK ioctl");
774                DUCK.duck_freq = 0;
775                return -1;
776        }
777
778        packet->type = LIBTRACE_DUCK_VERSION;
779
780        /* Set the packet's trace to point at a DUCK trace, so that the
781         * DUCK format functions will be called on the packet rather than the
782         * DAG ones */
783        if (!DUCK.dummy_duck)
784                DUCK.dummy_duck = trace_create_dead("duck:dummy");
785        packet->trace = DUCK.dummy_duck;
786        DUCK.last_duck = DUCK.last_pkt;
787        return sizeof(duckinf_t);
788}
789
790/* Determines the amount of data available to read from the DAG card */
791static int dag_available(libtrace_t *libtrace) {
792        uint32_t diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
793
794        /* If we've processed more than 4MB of data since we last called
795         * dag_advance_stream, then we should call it again to allow the
796         * space occupied by that 4MB to be released */
797        if (diff >= dag_record_size && FORMAT_DATA->processed < 4 * 1024 * 1024)
798                return diff;
799       
800        /* Update the top and bottom pointers */
801        FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd,
802                        FORMAT_DATA->dagstream,
803                        &(FORMAT_DATA->bottom));
804       
805        if (FORMAT_DATA->top == NULL) {
806                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
807                return -1;
808        }
809        FORMAT_DATA->processed = 0;
810        diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
811        return diff;
812}
813
814/* Returns a pointer to the start of the next complete ERF record */
815static dag_record_t *dag_get_record(libtrace_t *libtrace) {
816        dag_record_t *erfptr = NULL;
817        uint16_t size;
818        erfptr = (dag_record_t *)FORMAT_DATA->bottom;
819        if (!erfptr)
820                return NULL;
821        size = ntohs(erfptr->rlen);
822        assert( size >= dag_record_size );
823        /* Make certain we have the full packet available */
824        if (size > (FORMAT_DATA->top - FORMAT_DATA->bottom))
825                return NULL;
826        FORMAT_DATA->bottom += size;
827        FORMAT_DATA->processed += size;
828        return erfptr;
829}
830
831/* Converts a buffer containing a recently read DAG packet record into a
832 * libtrace packet */
833static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
834                void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
835
836        dag_record_t *erfptr;
837       
838        /* If the packet previously owned a buffer that is not the buffer
839         * that contains the new packet data, we're going to need to free the
840         * old one to avoid memory leaks */
841        if (packet->buffer != buffer &&
842                        packet->buf_control == TRACE_CTRL_PACKET) {
843                free(packet->buffer);
844        }
845
846        /* Set the buffer owner appropriately */
847        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
848                packet->buf_control = TRACE_CTRL_PACKET;
849        } else
850                packet->buf_control = TRACE_CTRL_EXTERNAL;
851
852        /* Update the packet pointers and type appropriately */
853        erfptr = (dag_record_t *)buffer;
854        packet->buffer = erfptr;
855        packet->header = erfptr;
856        packet->type = rt_type;
857
858        if (erfptr->flags.rxerror == 1) {
859                /* rxerror means the payload is corrupt - drop the payload
860                 * by tweaking rlen */
861                packet->payload = NULL;
862                erfptr->rlen = htons(erf_get_framing_length(packet));
863        } else {
864                packet->payload = (char*)packet->buffer
865                        + erf_get_framing_length(packet);
866        }
867
868        if (libtrace->format_data == NULL) {
869                dag_init_format_data(libtrace);
870        }
871
872        /* Update the dropped packets counter */
873
874        /* No loss counter for DSM coloured records - have to use
875         * some other API */
876        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
877                /* TODO */
878        } else {
879                /* Use the ERF loss counter */
880                if (FORMAT_DATA->seeninterface[erfptr->flags.iface] == 0) {
881                        FORMAT_DATA->seeninterface[erfptr->flags.iface] = 1;
882                } else {
883                        FORMAT_DATA->drops += ntohs(erfptr->lctr);
884                }
885        }
886
887        return 0;
888}
889
890/*
891 * dag_write_packet() at this stage attempts to improve tx performance
892 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
893 * has been met. I observed approximately 270% performance increase
894 * through this relatively naive tweak. No optimisation of buffer sizes
895 * was attempted.
896 */
897
898/* Pushes an ERF record onto the transmit stream */
899static int dag_dump_packet(libtrace_out_t *libtrace,
900                dag_record_t *erfptr, unsigned int pad, void *buffer) {
901        int size;
902
903        /*
904         * If we've got 0 bytes waiting in the txqueue, assume that we haven't
905         * requested any space yet, and request some, storing the pointer at
906         * FORMAT_DATA_OUT->txbuffer.
907         *
908         * The amount to request is slightly magical at the moment - it's
909         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
910         * the buffer and handle overruns.
911         */
912        if (FORMAT_DATA_OUT->waiting == 0) {
913                FORMAT_DATA_OUT->txbuffer = dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
914                                FORMAT_DATA_OUT->dagstream, 16908288);
915        }
916
917        /*
918         * Copy the header separately to the body, as we can't guarantee they
919         * are in contiguous memory
920         */
921        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,erfptr,(dag_record_size + pad));
922        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
923
924
925
926        /*
927         * Copy our incoming packet into the outgoing buffer, and increment
928         * our waiting count
929         */
930        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
931        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,buffer,size);
932        FORMAT_DATA_OUT->waiting += size;
933
934        /*
935         * If our output buffer has more than 16 Mebibytes in it, commit those
936         * bytes and reset the waiting count to 0.
937         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in
938         * case there is still data in the buffer at program exit.
939         */
940
941        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
942                FORMAT_DATA_OUT->txbuffer = dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
943                        FORMAT_DATA_OUT->waiting );
944                FORMAT_DATA_OUT->waiting = 0;
945        }
946
947        return size + pad + dag_record_size;
948
949}
950
951/* Attempts to determine a suitable ERF type for a given packet. Returns true
952 * if one is found, false otherwise */
953static bool find_compatible_linktype(libtrace_out_t *libtrace,
954                                libtrace_packet_t *packet, char *type)
955{
956         // Keep trying to simplify the packet until we can find
957         //something we can do with it
958
959        do {
960                *type=libtrace_to_erf_type(trace_get_link_type(packet));
961
962                // Success
963                if (*type != (char)-1)
964                        return true;
965
966                if (!demote_packet(packet)) {
967                        trace_set_err_out(libtrace,
968                                        TRACE_ERR_NO_CONVERSION,
969                                        "No erf type for packet (%i)",
970                                        trace_get_link_type(packet));
971                        return false;
972                }
973
974        } while(1);
975
976        return true;
977}
978
979/* Writes a packet to the provided DAG output trace */
980static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet) {
981        /*
982         * This is heavily borrowed from erf_write_packet(). Yes, CnP coding
983         * sucks, sorry about that.
984         */
985        unsigned int pad = 0;
986        int numbytes;
987        void *payload = packet->payload;
988        dag_record_t *header = (dag_record_t *)packet->header;
989        char erf_type = 0;
990
991        if(!packet->header) {
992                /* No header, probably an RT packet. Lifted from
993                 * erf_write_packet(). */
994                return -1;
995        }
996
997        if (trace_get_link_type(packet) == TRACE_TYPE_NONDATA)
998                return 0;
999
1000        pad = dag_get_padding(packet);
1001
1002        /*
1003         * If the payload is null, adjust the rlen. Discussion of this is
1004         * attached to erf_write_packet()
1005         */
1006        if (payload == NULL) {
1007                header->rlen = htons(dag_record_size + pad);
1008        }
1009
1010        if (packet->type == TRACE_RT_DATA_ERF) {
1011                numbytes = dag_dump_packet(libtrace,
1012                                header,
1013                                pad,
1014                                payload
1015                                );
1016
1017        } else {
1018                /* Build up a new packet header from the existing header */
1019
1020                /* Simplify the packet first - if we can't do this, break
1021                 * early */
1022                if (!find_compatible_linktype(libtrace,packet,&erf_type))
1023                        return -1;
1024
1025                dag_record_t erfhdr;
1026
1027                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
1028                payload=packet->payload;
1029                pad = dag_get_padding(packet);
1030
1031                /* Flags. Can't do this */
1032                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
1033                if (trace_get_direction(packet)!=(int)~0U)
1034                        erfhdr.flags.iface = trace_get_direction(packet);
1035
1036                erfhdr.type = erf_type;
1037
1038                /* Packet length (rlen includes format overhead) */
1039                assert(trace_get_capture_length(packet)>0
1040                                && trace_get_capture_length(packet)<=65536);
1041                assert(erf_get_framing_length(packet)>0
1042                                && trace_get_framing_length(packet)<=65536);
1043                assert(trace_get_capture_length(packet)+erf_get_framing_length(packet)>0
1044                      &&trace_get_capture_length(packet)+erf_get_framing_length(packet)<=65536);
1045
1046                erfhdr.rlen = htons(trace_get_capture_length(packet)
1047                        + erf_get_framing_length(packet));
1048
1049
1050                /* Loss counter. Can't do this */
1051                erfhdr.lctr = 0;
1052                /* Wire length, does not include padding! */
1053                erfhdr.wlen = htons(trace_get_wire_length(packet));
1054
1055                /* Write it out */
1056                numbytes = dag_dump_packet(libtrace,
1057                                &erfhdr,
1058                                pad,
1059                                payload);
1060
1061        }
1062
1063        return numbytes;
1064}
1065
1066/* Reads the next available packet from a DAG card, in a BLOCKING fashion
1067 *
1068 * If DUCK reporting is enabled, the packet returned may be a DUCK update
1069 */
1070static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1071        int size = 0;
1072        struct timeval tv;
1073        dag_record_t *erfptr = NULL;
1074        int numbytes = 0;
1075        uint32_t flags = 0;
1076        struct timeval maxwait;
1077        struct timeval pollwait;
1078
1079        pollwait.tv_sec = 0;
1080        pollwait.tv_usec = 10000;
1081        maxwait.tv_sec = 0;
1082        maxwait.tv_usec = 250000;
1083
1084        /* Check if we're due for a DUCK report */
1085        size = dag_get_duckinfo(libtrace, packet);
1086
1087        if (size != 0)
1088                return size;
1089
1090
1091        /* Don't let anyone try to free our DAG memory hole! */
1092        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1093
1094        /* If the packet buffer is currently owned by libtrace, free it so
1095         * that we can set the packet to point into the DAG memory hole */
1096        if (packet->buf_control == TRACE_CTRL_PACKET) {
1097                free(packet->buffer);
1098                packet->buffer = 0;
1099        }
1100       
1101        if (dag_set_stream_poll(FORMAT_DATA->device->fd, 
1102                        FORMAT_DATA->dagstream, sizeof(dag_record_t), &maxwait, 
1103                        &pollwait) == -1)
1104        {
1105                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1106                return -1;
1107        }
1108
1109
1110        /* Grab a full ERF record */
1111        do {
1112                numbytes = dag_available(libtrace);
1113                if (numbytes < 0)
1114                        return numbytes;
1115                if (numbytes < dag_record_size) {
1116                        if (libtrace_halt)
1117                                return 0;
1118                        /* Block until we see a packet */
1119                        continue;
1120                }
1121                erfptr = dag_get_record(libtrace);
1122        } while (erfptr == NULL);
1123
1124        /* Prepare the libtrace packet */
1125        if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
1126                                flags))
1127                return -1;
1128
1129        /* Update the DUCK timer */
1130        tv = trace_get_timeval(packet);
1131        DUCK.last_pkt = tv.tv_sec;
1132
1133        return packet->payload ? htons(erfptr->rlen) :
1134                                erf_get_framing_length(packet);
1135}
1136
1137/* Attempts to read a packet from a DAG card in a NON-BLOCKING fashion. If a
1138 * packet is available, we will return a packet event. Otherwise we will
1139 * return a SLEEP event (as we cannot select on the DAG file descriptor).
1140 */
1141static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
1142                                        libtrace_packet_t *packet) {
1143        libtrace_eventobj_t event = {0,0,0.0,0};
1144        dag_record_t *erfptr = NULL;
1145        int numbytes;
1146        uint32_t flags = 0;
1147        struct timeval minwait, tv;
1148       
1149        minwait.tv_sec = 0;
1150        minwait.tv_usec = 10000;
1151
1152        /* Check if we're meant to provide a DUCK update */
1153        numbytes = dag_get_duckinfo(libtrace, packet);
1154        if (numbytes < 0) {
1155                event.type = TRACE_EVENT_TERMINATE;
1156                return event;
1157        } else if (numbytes > 0) {
1158                event.type = TRACE_EVENT_PACKET;
1159                return event;
1160        }
1161       
1162        if (dag_set_stream_poll(FORMAT_DATA->device->fd, 
1163                        FORMAT_DATA->dagstream, 0, &minwait, 
1164                        &minwait) == -1)
1165        {
1166                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1167                event.type = TRACE_EVENT_TERMINATE;
1168                return event;
1169        }
1170
1171        do {
1172                erfptr = NULL;
1173                numbytes = 0;
1174       
1175                /* Need to call dag_available so that the top pointer will get
1176                 * updated, otherwise we'll never see any data! */
1177                numbytes = dag_available(libtrace);
1178
1179                /* May as well not bother calling dag_get_record if
1180                 * dag_available suggests that there's no data */
1181                if (numbytes != 0)
1182                        erfptr = dag_get_record(libtrace);
1183                if (erfptr == NULL) {
1184                        /* No packet available - sleep for a very short time */
1185                        if (libtrace_halt) {
1186                                event.type = TRACE_EVENT_TERMINATE;
1187                        } else {                       
1188                                event.type = TRACE_EVENT_SLEEP;
1189                                event.seconds = 0.0001;
1190                        }
1191                        break;
1192                }
1193                if (dag_prepare_packet(libtrace, packet, erfptr, 
1194                                        TRACE_RT_DATA_ERF, flags)) {
1195                        event.type = TRACE_EVENT_TERMINATE;
1196                        break;
1197                }
1198
1199
1200                event.size = trace_get_capture_length(packet) + 
1201                                trace_get_framing_length(packet);
1202               
1203                /* XXX trace_read_packet() normally applies the following
1204                 * config options for us, but this function is called via
1205                 * trace_event() so we have to do it ourselves */
1206
1207                if (libtrace->filter) {
1208                        int filtret = trace_apply_filter(libtrace->filter, 
1209                                        packet);
1210                        if (filtret == -1) {
1211                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
1212                                                "Bad BPF Filter");
1213                                event.type = TRACE_EVENT_TERMINATE;
1214                                break;
1215                        }
1216
1217                        if (filtret == 0) {
1218                                /* This packet isn't useful so we want to
1219                                 * immediately see if there is another suitable
1220                                 * one - we definitely DO NOT want to return
1221                                 * a sleep event in this case, like we used to
1222                                 * do! */
1223                                libtrace->filtered_packets ++;
1224                                trace_clear_cache(packet);
1225                                continue;
1226                        }
1227                               
1228                        event.type = TRACE_EVENT_PACKET;
1229                } else {
1230                        event.type = TRACE_EVENT_PACKET;
1231                }
1232
1233                /* Update the DUCK timer */
1234                tv = trace_get_timeval(packet);
1235                DUCK.last_pkt = tv.tv_sec;
1236               
1237                if (libtrace->snaplen > 0) {
1238                        trace_set_capture_length(packet, libtrace->snaplen);
1239                }
1240                libtrace->accepted_packets ++;
1241                break;
1242        } while (1);
1243
1244        return event;
1245}
1246
1247/* Gets the number of dropped packets */
1248static uint64_t dag_get_dropped_packets(libtrace_t *trace) {
1249        if (trace->format_data == NULL)
1250                return (uint64_t)-1;
1251        return DATA(trace)->drops;
1252}
1253
1254/* Prints some semi-useful help text about the DAG format module */
1255static void dag_help(void) {
1256        printf("dag format module: $Revision: 1755 $\n");
1257        printf("Supported input URIs:\n");
1258        printf("\tdag:/dev/dagn\n");
1259        printf("\n");
1260        printf("\te.g.: dag:/dev/dag0\n");
1261        printf("\n");
1262        printf("Supported output URIs:\n");
1263        printf("\tnone\n");
1264        printf("\n");
1265}
1266
1267static struct libtrace_format_t dag = {
1268        "dag",
1269        "$Id$",
1270        TRACE_FORMAT_ERF,
1271        dag_probe_filename,             /* probe filename */
1272        NULL,                           /* probe magic */
1273        dag_init_input,                 /* init_input */
1274        dag_config_input,               /* config_input */
1275        dag_start_input,                /* start_input */
1276        dag_pause_input,                /* pause_input */
1277        dag_init_output,                /* init_output */
1278        NULL,                           /* config_output */
1279        dag_start_output,               /* start_output */
1280        dag_fin_input,                  /* fin_input */
1281        dag_fin_output,                 /* fin_output */
1282        dag_read_packet,                /* read_packet */
1283        dag_prepare_packet,             /* prepare_packet */
1284        NULL,                           /* fin_packet */
1285        dag_write_packet,               /* write_packet */
1286        erf_get_link_type,              /* get_link_type */
1287        erf_get_direction,              /* get_direction */
1288        erf_set_direction,              /* set_direction */
1289        erf_get_erf_timestamp,          /* get_erf_timestamp */
1290        NULL,                           /* get_timeval */
1291        NULL,                           /* get_seconds */
1292        NULL,                           /* get_timespec */
1293        NULL,                           /* seek_erf */
1294        NULL,                           /* seek_timeval */
1295        NULL,                           /* seek_seconds */
1296        erf_get_capture_length,         /* get_capture_length */
1297        erf_get_wire_length,            /* get_wire_length */
1298        erf_get_framing_length,         /* get_framing_length */
1299        erf_set_capture_length,         /* set_capture_length */
1300        NULL,                           /* get_received_packets */
1301        NULL,                           /* get_filtered_packets */
1302        dag_get_dropped_packets,        /* get_dropped_packets */
1303        NULL,                           /* get_captured_packets */
1304        NULL,                           /* get_fd */
1305        trace_event_dag,                /* trace_event */
1306        dag_help,                       /* help */
1307        NULL                            /* next pointer */
1308};
1309
1310void dag_constructor(void) {
1311        register_format(&dag);
1312}
Note: See TracBrowser for help on using the repository browser.