source: lib/format_dag25.c @ 68d3308

4.0.1-hotfixes cachetimestamps develop dpdk-ndag etsilive libtrace4 ndag_format pfring rc-4.0.1 rc-4.0.2 rc-4.0.3 rc-4.0.4 ringdecrementfix ringperformance ringtimestampfixes
Last change on this file since 68d3308 was 68d3308, checked in by Shane Alcock <salcock@…>, 6 years ago

Fix bug in DAG memory hole flush

As reported by Dan Collins, the recent fix to ensure the DAG memory
hole is properly flushed did not account for the case where the hole
was already empty. As a result, 'top' was not initialised properly and
DAG captures wouldn't work.

Should solve Issue #12.

  • Property mode set to 100644
File size: 38.3 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id$
31 *
32 */
33
34#define _GNU_SOURCE
35
36#include "config.h"
37#include "common.h"
38#include "libtrace.h"
39#include "libtrace_int.h"
40#include "format_helper.h"
41#include "format_erf.h"
42
43#include <assert.h>
44#include <errno.h>
45#include <fcntl.h>
46#include <stdio.h>
47#include <string.h>
48#include <stdlib.h>
49#include <sys/stat.h>
50
51#include <sys/mman.h>
52/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
53 * Windows anyway so we'll worry about this more later :] */
54#include <pthread.h>
55
56
57#ifdef WIN32
58#  include <io.h>
59#  include <share.h>
60#  define PATH_MAX _MAX_PATH
61#  define snprintf sprintf_s
62#else
63#  include <netdb.h>
64#  ifndef PATH_MAX
65#       define PATH_MAX 4096
66#  endif
67#  include <sys/ioctl.h>
68#endif
69
70/* This format deals with DAG cards that are using drivers from the 2.5 version
71 * onwards, including 3.X.
72 *
73 * DAG is a LIVE capture format.
74 *
75 * This format does support writing, provided the DAG card that you are using
76 * has transmit (Tx) support. Additionally, packets read using this format
77 * are in the ERF format, so can easily be written as ERF traces without
78 * losing any data.
79 */
80
81
/* Accessors for the format-specific state hanging off a trace object:
 * DATA() for input traces, DATA_OUT() for output traces. The argument is
 * parenthesised so that any pointer-valued expression (not just a plain
 * identifier) expands correctly. */
#define DATA(x) ((struct dag_format_data_t *)(x)->format_data)
#define DATA_OUT(x) ((struct dag_format_data_out_t *)(x)->format_data)

/* Shorthand for code whose trace variable is named 'libtrace' */
#define FORMAT_DATA DATA(libtrace)
#define FORMAT_DATA_OUT DATA_OUT(libtrace)

/* The DUCK reporting state of the input trace named 'libtrace' */
#define DUCK (FORMAT_DATA->duck)
89static struct libtrace_format_t dag;
90
/* An entry in the list of opened DAG devices. One device can serve several
 * streams (and therefore several traces), so each trace points at a shared
 * device entry rather than owning the device outright. */
struct dag_dev_t {
        /* Name of the device */
        char * dev_name;
        /* File descriptor for the opened device */
        int fd;
        /* How many input / output traces are currently using this device */
        uint16_t ref_count;
        /* Neighbouring entries in the doubly linked device list */
        struct dag_dev_t *prev;
        struct dag_dev_t *next;
};
103
/* Per-trace state kept for every DAG output trace */
struct dag_format_data_out_t {
        struct dag_dev_t *device;       /* DAG device used for writing */
        unsigned int dagstream;         /* DAG stream being written on */
        int stream_attached;            /* Non-zero while the stream is
                                           attached */
        uint64_t waiting;               /* Bytes waiting to be
                                           transmitted */
        uint8_t *txbuffer;              /* Buffer holding the data to be
                                           transmitted */
};
117
118/* "Global" data that is stored for each DAG input trace */
119struct dag_format_data_t {
120
121        /* Data required for regular DUCK reporting */
122        struct {
123                /* Timestamp of the last DUCK report */
124                uint32_t last_duck;
125                /* The number of seconds between each DUCK report */
126                uint32_t duck_freq;
127                /* Timestamp of the last packet read from the DAG card */
128                uint32_t last_pkt;
129                /* Dummy trace to ensure DUCK packets are dealt with using the
130                 * DUCK format functions */
131                libtrace_t *dummy_duck;
132        } duck;
133
134        /* The DAG device that we are reading from */
135        struct dag_dev_t *device;
136        /* The DAG stream that we are reading from */
137        unsigned int dagstream;
138        /* Boolean flag indicating whether the stream is currently attached */
139        int stream_attached;
140        /* Pointer to the first unread byte in the DAG memory hole */
141        uint8_t *bottom;
142        /* Pointer to the last unread byte in the DAG memory hole */
143        uint8_t *top;
144        /* The amount of data processed thus far from the bottom pointer */
145        uint32_t processed;
146        /* The number of packets that have been dropped */
147        uint64_t drops;
148
149        uint8_t seeninterface[4];
150};
151
/* Guards every operation on the open_dags device list so that the list can
 * be shared safely between threads */
pthread_mutex_t open_dag_mutex;

/* Head of the list of DAG devices that libtrace currently has open.
 *
 * A DAG device can only be opened once, yet several streams on it may be in
 * use at the same time. The list (together with per-device reference
 * counts) stops us from opening a device more than once or closing one that
 * is still being used by another trace. */
struct dag_dev_t *open_dags = NULL;
163
164/* Returns the amount of padding between the ERF header and the start of the
165 * captured packet data */
166static int dag_get_padding(const libtrace_packet_t *packet)
167{
168        /* ERF Ethernet records have a 2 byte padding before the packet itself
169         * so that the IP header is aligned on a 32 bit boundary.
170         */
171        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
172                dag_record_t *erfptr = (dag_record_t *)packet->header;
173                switch(erfptr->type) {
174                        case TYPE_ETH:
175                        case TYPE_DSM_COLOR_ETH:
176                                return 2;
177                        default:                return 0;
178                }
179        }
180        else {
181                switch(trace_get_link_type(packet)) {
182                        case TRACE_TYPE_ETH:    return 2;
183                        default:                return 0;
184                }
185        }
186}
187
/* Guesses whether the given filename refers to a DAG device.
 *
 * Returns 1 if the file can be stat()ed and is a character device, and 0
 * in every other case. */
static int dag_probe_filename(const char *filename)
{
        struct stat info;

        /* A file we cannot stat cannot be ours; beyond that, any character
         * device is assumed to be a DAG card */
        return stat(filename, &info) == 0 && S_ISCHR(info.st_mode);
}
203
204/* Initialises the DAG output data structure */
205static void dag_init_format_out_data(libtrace_out_t *libtrace) {
206        libtrace->format_data = (struct dag_format_data_out_t *) malloc(sizeof(struct dag_format_data_out_t));
207        // no DUCK on output
208        FORMAT_DATA_OUT->stream_attached = 0;
209        FORMAT_DATA_OUT->device = NULL;
210        FORMAT_DATA_OUT->dagstream = 0;
211        FORMAT_DATA_OUT->waiting = 0;
212
213}
214
215/* Initialises the DAG input data structure */
216static void dag_init_format_data(libtrace_t *libtrace) {
217        libtrace->format_data = (struct dag_format_data_t *)
218                malloc(sizeof(struct dag_format_data_t));
219        DUCK.last_duck = 0;
220        DUCK.duck_freq = 0;
221        DUCK.last_pkt = 0;
222        DUCK.dummy_duck = NULL;
223        FORMAT_DATA->stream_attached = 0;
224        FORMAT_DATA->drops = 0;
225        FORMAT_DATA->device = NULL;
226        FORMAT_DATA->dagstream = 0;
227        FORMAT_DATA->processed = 0;
228        FORMAT_DATA->bottom = NULL;
229        FORMAT_DATA->top = NULL;
230        memset(FORMAT_DATA->seeninterface, 0, sizeof(FORMAT_DATA->seeninterface));
231}
232
233/* Determines if there is already an entry for the given DAG device in the
234 * device list and increments the reference count for that device, if found.
235 *
236 * NOTE: This function assumes the open_dag_mutex is held by the caller */
237static struct dag_dev_t *dag_find_open_device(char *dev_name) {
238        struct dag_dev_t *dag_dev;
239
240        dag_dev = open_dags;
241
242        /* XXX: Not exactly zippy, but how often are we going to be dealing
243         * with multiple dag cards? */
244        while (dag_dev != NULL) {
245                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
246                        dag_dev->ref_count ++;
247                        return dag_dev;
248
249                }
250                dag_dev = dag_dev->next;
251        }
252        return NULL;
253
254
255}
256
257/* Closes a DAG device and removes it from the device list.
258 *
259 * Attempting to close a DAG device that has a non-zero reference count will
260 * cause an assertion failure!
261 *
262 * NOTE: This function assumes the open_dag_mutex is held by the caller */
263static void dag_close_device(struct dag_dev_t *dev) {
264        /* Need to remove from the device list */
265
266        assert(dev->ref_count == 0);
267
268        if (dev->prev == NULL) {
269                open_dags = dev->next;
270                if (dev->next)
271                        dev->next->prev = NULL;
272        } else {
273                dev->prev->next = dev->next;
274                if (dev->next)
275                        dev->next->prev = dev->prev;
276        }
277
278        dag_close(dev->fd);
279        if (dev->dev_name)
280                free(dev->dev_name);
281        free(dev);
282}
283
284
285/* Opens a new DAG device for writing and adds it to the DAG device list
286 *
287 * NOTE: this function should only be called when opening a DAG device for
288 * writing - there is little practical difference between this and the
289 * function below that covers the reading case, but we need the output trace
290 * object to report errors properly so the two functions take slightly
291 * different arguments. This is really lame and there should be a much better
292 * way of doing this.
293 *
294 * NOTE: This function assumes the open_dag_mutex is held by the caller
295 */
296static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace, char *dev_name) {
297        struct stat buf;
298        int fd;
299        struct dag_dev_t *new_dev;
300
301        /* Make sure the device exists */
302        if (stat(dev_name, &buf) == -1) {
303                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
304                return NULL;
305}
306
307        /* Make sure it is the appropriate type of device */
308        if (S_ISCHR(buf.st_mode)) {
309                /* Try opening the DAG device */
310                if((fd = dag_open(dev_name)) < 0) {
311                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
312                                        dev_name);
313                        return NULL;
314                }
315        } else {
316                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
317                                dev_name);
318                return NULL;
319        }
320
321        /* Add the device to our device list - it is just a doubly linked
322         * list with no inherent ordering; just tack the new one on the front
323         */
324        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
325        new_dev->fd = fd;
326        new_dev->dev_name = dev_name;
327        new_dev->ref_count = 1;
328
329        new_dev->prev = NULL;
330        new_dev->next = open_dags;
331        if (open_dags)
332                open_dags->prev = new_dev;
333
334        open_dags = new_dev;
335
336        return new_dev;
337}
338
339/* Opens a new DAG device for reading and adds it to the DAG device list
340 *
341 * NOTE: this function should only be called when opening a DAG device for
342 * reading - there is little practical difference between this and the
343 * function above that covers the writing case, but we need the input trace
344 * object to report errors properly so the two functions take slightly
345 * different arguments. This is really lame and there should be a much better
346 * way of doing this.
347 *
348 * NOTE: This function assumes the open_dag_mutex is held by the caller */
349static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
350        struct stat buf;
351        int fd;
352        struct dag_dev_t *new_dev;
353
354        /* Make sure the device exists */
355        if (stat(dev_name, &buf) == -1) {
356                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
357                return NULL;
358        }
359
360        /* Make sure it is the appropriate type of device */
361        if (S_ISCHR(buf.st_mode)) {
362                /* Try opening the DAG device */
363                if((fd = dag_open(dev_name)) < 0) {
364                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
365                                        dev_name);
366                        return NULL;
367                }
368        } else {
369                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
370                                dev_name);
371                return NULL;
372        }
373
374        /* Add the device to our device list - it is just a doubly linked
375         * list with no inherent ordering; just tack the new one on the front
376         */
377        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
378        new_dev->fd = fd;
379        new_dev->dev_name = dev_name;
380        new_dev->ref_count = 1;
381
382        new_dev->prev = NULL;
383        new_dev->next = open_dags;
384        if (open_dags)
385                open_dags->prev = new_dev;
386
387        open_dags = new_dev;
388
389        return new_dev;
390}
391
392/* Creates and initialises a DAG output trace */
393static int dag_init_output(libtrace_out_t *libtrace) {
394        char *dag_dev_name = NULL;
395        char *scan = NULL;
396        struct dag_dev_t *dag_device = NULL;
397        int stream = 1;
398       
399        /* XXX I don't know if this is important or not, but this function
400         * isn't present in all of the driver releases that this code is
401         * supposed to support! */
402        /*
403        unsigned long wake_time;
404        dagutil_sleep_get_wake_time(&wake_time,0);
405        */
406
407        dag_init_format_out_data(libtrace);
408        /* Grab the mutex while we're likely to be messing with the device
409         * list */
410        pthread_mutex_lock(&open_dag_mutex);
411       
412        /* Specific streams are signified using a comma in the libtrace URI,
413         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
414         *
415         * If no stream is specified, we will write using stream 1 */
416        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
417                dag_dev_name = strdup(libtrace->uridata);
418        } else {
419                dag_dev_name = (char *)strndup(libtrace->uridata,
420                                (size_t)(scan - libtrace->uridata));
421                stream = atoi(++scan);
422        }
423        FORMAT_DATA_OUT->dagstream = stream;
424
425        /* See if our DAG device is already open */
426        dag_device = dag_find_open_device(dag_dev_name);
427
428        if (dag_device == NULL) {
429                /* Device not yet opened - open it ourselves */
430                dag_device = dag_open_output_device(libtrace, dag_dev_name);
431        } else {
432                /* Otherwise, just use the existing one */
433                free(dag_dev_name);
434                dag_dev_name = NULL;
435        }
436
437        /* Make sure we have successfully opened a DAG device */
438        if (dag_device == NULL) {
439                if (dag_dev_name) {
440                        free(dag_dev_name);
441                }
442                pthread_mutex_unlock(&open_dag_mutex);
443                return -1;
444        }
445
446        FORMAT_DATA_OUT->device = dag_device;
447        pthread_mutex_unlock(&open_dag_mutex);
448        return 0;
449}
450
451/* Creates and initialises a DAG input trace */
452static int dag_init_input(libtrace_t *libtrace) {
453        char *dag_dev_name = NULL;
454        char *scan = NULL;
455        int stream = 0;
456        struct dag_dev_t *dag_device = NULL;
457
458        dag_init_format_data(libtrace);
459        /* Grab the mutex while we're likely to be messing with the device
460         * list */
461        pthread_mutex_lock(&open_dag_mutex);
462       
463       
464        /* Specific streams are signified using a comma in the libtrace URI,
465         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
466         *
467         * If no stream is specified, we will read from stream 0 */
468        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
469                dag_dev_name = strdup(libtrace->uridata);
470        } else {
471                dag_dev_name = (char *)strndup(libtrace->uridata,
472                                (size_t)(scan - libtrace->uridata));
473                stream = atoi(++scan);
474        }
475
476        FORMAT_DATA->dagstream = stream;
477
478        /* See if our DAG device is already open */
479        dag_device = dag_find_open_device(dag_dev_name);
480
481        if (dag_device == NULL) {
482                /* Device not yet opened - open it ourselves */
483                dag_device = dag_open_device(libtrace, dag_dev_name);
484        } else {
485                /* Otherwise, just use the existing one */
486                free(dag_dev_name);
487                dag_dev_name = NULL;
488        }
489
490        /* Make sure we have successfully opened a DAG device */
491        if (dag_device == NULL) {
492                if (dag_dev_name)
493                        free(dag_dev_name);
494                dag_dev_name = NULL;
495                pthread_mutex_unlock(&open_dag_mutex);
496                return -1;
497        }
498
499        FORMAT_DATA->device = dag_device;
500
501        /* See Config_Status_API_Programming_Guide.pdf from the Endace Dag Documentation */
502        /* Check kBooleanAttributeActive is true -- no point capturing on an interface that's disabled
503 
504        *  The symptom of the port being disabled is that libtrace will appear to hang.
505        */
506        /* Check kBooleanAttributeFault is false */
507        /* Check kBooleanAttributeLocalFault is false */
508        /* Check kBooleanAttributeLock is true ? */
509        /* Check kBooleanAttributePeerLink ? */
510
511        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based on libtrace promisc attribute?*/
512        /* Set kUint32AttributeSnapLength to the snaplength */
513
514        pthread_mutex_unlock(&open_dag_mutex);
515        return 0;
516}
517
518/* Configures a DAG input trace */
519static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
520                                void *data) {
521        char conf_str[4096];
522        switch(option) {
523                case TRACE_OPTION_META_FREQ:
524                        /* This option is used to specify the frequency of DUCK
525                         * updates */
526                        DUCK.duck_freq = *(int *)data;
527                        return 0;
528                case TRACE_OPTION_SNAPLEN:
529                        /* Tell the card our new snap length */
530                        snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
531                        if (dag_configure(FORMAT_DATA->device->fd,
532                                                conf_str) != 0) {
533                                trace_set_err(libtrace, errno, "Failed to configure snaplen on DAG card: %s", libtrace->uridata);
534                                return -1;
535                        }
536                        return 0;
537                case TRACE_OPTION_PROMISC:
538                        /* DAG already operates in a promisc fashion */
539                        return -1;
540                case TRACE_OPTION_FILTER:
541                        /* We don't yet support pushing filters into DAG
542                         * cards */
543                        return -1;
544                case TRACE_OPTION_EVENT_REALTIME:
545                        /* Live capture is always going to be realtime */
546                        return -1;
547        }
548        return -1;
549}
550
551/* Starts a DAG output trace */
552static int dag_start_output(libtrace_out_t *libtrace) {
553        struct timeval zero, nopoll;
554
555        zero.tv_sec = 0;
556        zero.tv_usec = 0;
557        nopoll = zero;
558
559        /* Attach and start the DAG stream */
560
561        if (dag_attach_stream(FORMAT_DATA_OUT->device->fd,
562                        FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) {
563                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
564                return -1;
565        }
566
567        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
568                        FORMAT_DATA_OUT->dagstream) < 0) {
569                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
570                return -1;
571        }
572        FORMAT_DATA_OUT->stream_attached = 1;
573
574        /* We don't want the dag card to do any sleeping */
575
576        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
577                        FORMAT_DATA_OUT->dagstream, 0, &zero,
578                        &nopoll);
579
580        return 0;
581}
582
583/* Starts a DAG input trace */
584static int dag_start_input(libtrace_t *libtrace) {
585        struct timeval zero, nopoll;
586        uint8_t *top, *bottom, *starttop;
587        uint64_t diff = 0;
588        top = bottom = NULL;
589
590        zero.tv_sec = 0;
591        zero.tv_usec = 10000;
592        nopoll = zero;
593
594        /* Attach and start the DAG stream */
595        if (dag_attach_stream(FORMAT_DATA->device->fd,
596                                FORMAT_DATA->dagstream, 0, 0) < 0) {
597                trace_set_err(libtrace, errno, "Cannot attach DAG stream");
598                return -1;
599        }
600
601        if (dag_start_stream(FORMAT_DATA->device->fd,
602                                FORMAT_DATA->dagstream) < 0) {
603                trace_set_err(libtrace, errno, "Cannot start DAG stream");
604                return -1;
605        }
606        FORMAT_DATA->stream_attached = 1;
607       
608        /* We don't want the dag card to do any sleeping */
609        dag_set_stream_poll(FORMAT_DATA->device->fd,
610                                FORMAT_DATA->dagstream, 0, &zero,
611                                &nopoll);
612
613        starttop = dag_advance_stream(FORMAT_DATA->device->fd,
614                                        FORMAT_DATA->dagstream,
615                                        &bottom);
616
617        /* Should probably flush the memory hole now */
618        top = starttop;
619        while (starttop - bottom > 0) {
620                bottom += (starttop - bottom);
621                top = dag_advance_stream(FORMAT_DATA->device->fd,
622                                        FORMAT_DATA->dagstream,
623                                        &bottom);
624        }
625        FORMAT_DATA->top = top;
626        FORMAT_DATA->bottom = bottom;
627        FORMAT_DATA->processed = 0;
628        FORMAT_DATA->drops = 0;
629
630        return 0;
631}
632
633/* Pauses a DAG output trace */
634static int dag_pause_output(libtrace_out_t *libtrace) {
635
636        /* Stop and detach the stream */
637        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
638                        FORMAT_DATA_OUT->dagstream) < 0) {
639                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
640                return -1;
641        }
642        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
643                        FORMAT_DATA_OUT->dagstream) < 0) {
644                trace_set_err_out(libtrace, errno, "Could not detach DAG stream");
645                return -1;
646        }
647        FORMAT_DATA_OUT->stream_attached = 0;
648        return 0;
649}
650
651/* Pauses a DAG input trace */
652static int dag_pause_input(libtrace_t *libtrace) {
653
654        /* Stop and detach the stream */
655        if (dag_stop_stream(FORMAT_DATA->device->fd,
656                                FORMAT_DATA->dagstream) < 0) {
657                trace_set_err(libtrace, errno, "Could not stop DAG stream");
658                return -1;
659        }
660        if (dag_detach_stream(FORMAT_DATA->device->fd,
661                                FORMAT_DATA->dagstream) < 0) {
662                trace_set_err(libtrace, errno, "Could not detach DAG stream");
663                return -1;
664        }
665        FORMAT_DATA->stream_attached = 0;
666        return 0;
667}
668
669/* Closes a DAG input trace */
670static int dag_fin_input(libtrace_t *libtrace) {
671        /* Need the lock, since we're going to be handling the device list */
672        pthread_mutex_lock(&open_dag_mutex);
673       
674        /* Detach the stream if we are not paused */
675        if (FORMAT_DATA->stream_attached)
676                dag_pause_input(libtrace);
677        FORMAT_DATA->device->ref_count --;
678
679        /* Close the DAG device if there are no more references to it */
680        if (FORMAT_DATA->device->ref_count == 0)
681                dag_close_device(FORMAT_DATA->device);
682        if (DUCK.dummy_duck)
683                trace_destroy_dead(DUCK.dummy_duck);
684        free(libtrace->format_data);
685        pthread_mutex_unlock(&open_dag_mutex);
686        return 0; /* success */
687}
688
689/* Closes a DAG output trace */
690static int dag_fin_output(libtrace_out_t *libtrace) {
691       
692        /* Commit any outstanding traffic in the txbuffer */
693        if (FORMAT_DATA_OUT->waiting) {
694                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
695                                FORMAT_DATA_OUT->waiting );
696        }
697
698        /* Wait until the buffer is nearly clear before exiting the program,
699         * as we will lose packets otherwise */
700        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
701                        FORMAT_DATA_OUT->dagstream,
702                        dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
703                                        FORMAT_DATA_OUT->dagstream) - 8
704                        );
705
706        /* Need the lock, since we're going to be handling the device list */
707        pthread_mutex_lock(&open_dag_mutex);
708
709        /* Detach the stream if we are not paused */
710        if (FORMAT_DATA_OUT->stream_attached)
711                dag_pause_output(libtrace);
712        FORMAT_DATA_OUT->device->ref_count --;
713
714        /* Close the DAG device if there are no more references to it */
715        if (FORMAT_DATA_OUT->device->ref_count == 0)
716                dag_close_device(FORMAT_DATA_OUT->device);
717        free(libtrace->format_data);
718        pthread_mutex_unlock(&open_dag_mutex);
719        return 0; /* success */
720}
721
722/* Extracts DUCK information from the DAG card and produces a DUCK packet */
723static int dag_get_duckinfo(libtrace_t *libtrace,
724                                libtrace_packet_t *packet) {
725
726        /* Allocate memory for the DUCK data */
727        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
728                        !packet->buffer) {
729                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
730                packet->buf_control = TRACE_CTRL_PACKET;
731                if (!packet->buffer) {
732                        trace_set_err(libtrace, errno,
733                                        "Cannot allocate packet buffer");
734                        return -1;
735                }
736        }
737
738        /* DUCK doesn't have a format header */
739        packet->header = 0;
740        packet->payload = packet->buffer;
741
742        /* No need to check if we can get DUCK or not - we're modern
743         * enough so just grab the DUCK info */
744        if ((ioctl(FORMAT_DATA->device->fd, DAGIOCDUCK,
745                                        (duckinf_t *)packet->payload) < 0)) {
746                trace_set_err(libtrace, errno, "Error using DAGIOCDUCK");
747                return -1;
748        }
749
750        packet->type = TRACE_RT_DUCK_2_5;
751
752        /* Set the packet's tracce to point at a DUCK trace, so that the
753         * DUCK format functions will be called on the packet rather than the
754         * DAG ones */
755        if (!DUCK.dummy_duck)
756                DUCK.dummy_duck = trace_create_dead("rt:localhost:3434");
757        packet->trace = DUCK.dummy_duck;
758        return sizeof(duckinf_t);
759}
760
761/* Determines the amount of data available to read from the DAG card */
762static int dag_available(libtrace_t *libtrace) {
763        uint32_t diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
764
765        /* If we've processed more than 4MB of data since we last called
766         * dag_advance_stream, then we should call it again to allow the
767         * space occupied by that 4MB to be released */
768        if (diff >= dag_record_size && FORMAT_DATA->processed < 4 * 1024 * 1024)
769                return diff;
770       
771        /* Update the top and bottom pointers */
772        FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd,
773                        FORMAT_DATA->dagstream,
774                        &(FORMAT_DATA->bottom));
775       
776        if (FORMAT_DATA->top == NULL) {
777                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
778                return -1;
779        }
780        FORMAT_DATA->processed = 0;
781        diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
782        return diff;
783}
784
785/* Returns a pointer to the start of the next complete ERF record */
786static dag_record_t *dag_get_record(libtrace_t *libtrace) {
787        dag_record_t *erfptr = NULL;
788        uint16_t size;
789        erfptr = (dag_record_t *)FORMAT_DATA->bottom;
790        if (!erfptr)
791                return NULL;
792        size = ntohs(erfptr->rlen);
793        assert( size >= dag_record_size );
794        /* Make certain we have the full packet available */
795        if (size > (FORMAT_DATA->top - FORMAT_DATA->bottom))
796                return NULL;
797        FORMAT_DATA->bottom += size;
798        FORMAT_DATA->processed += size;
799        return erfptr;
800}
801
802/* Converts a buffer containing a recently read DAG packet record into a
803 * libtrace packet */
804static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
805                void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
806
807        dag_record_t *erfptr;
808       
809        /* If the packet previously owned a buffer that is not the buffer
810         * that contains the new packet data, we're going to need to free the
811         * old one to avoid memory leaks */
812        if (packet->buffer != buffer &&
813                        packet->buf_control == TRACE_CTRL_PACKET) {
814                free(packet->buffer);
815        }
816
817        /* Set the buffer owner appropriately */
818        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
819                packet->buf_control = TRACE_CTRL_PACKET;
820        } else
821                packet->buf_control = TRACE_CTRL_EXTERNAL;
822
823        /* Update the packet pointers and type appropriately */
824        erfptr = (dag_record_t *)buffer;
825        packet->buffer = erfptr;
826        packet->header = erfptr;
827        packet->type = rt_type;
828
829        if (erfptr->flags.rxerror == 1) {
830                /* rxerror means the payload is corrupt - drop the payload
831                 * by tweaking rlen */
832                packet->payload = NULL;
833                erfptr->rlen = htons(erf_get_framing_length(packet));
834        } else {
835                packet->payload = (char*)packet->buffer
836                        + erf_get_framing_length(packet);
837        }
838
839        if (libtrace->format_data == NULL) {
840                dag_init_format_data(libtrace);
841        }
842
843        /* Update the dropped packets counter */
844
845        /* No loss counter for DSM coloured records - have to use
846         * some other API */
847        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
848                /* TODO */
849        } else {
850                /* Use the ERF loss counter */
851                if (FORMAT_DATA->seeninterface[erfptr->flags.iface] == 0) {
852                        FORMAT_DATA->seeninterface[erfptr->flags.iface] = 1;
853                } else {
854                        FORMAT_DATA->drops += ntohs(erfptr->lctr);
855                }
856        }
857
858        return 0;
859}
860
861/*
862 * dag_write_packet() at this stage attempts to improve tx performance
863 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
864 * has been met. I observed approximately 270% performance increase
865 * through this relatively naive tweak. No optimisation of buffer sizes
866 * was attempted.
867 */
868
869/* Pushes an ERF record onto the transmit stream */
870static int dag_dump_packet(libtrace_out_t *libtrace,
871                dag_record_t *erfptr, unsigned int pad, void *buffer) {
872        int size;
873
874        /*
875         * If we've got 0 bytes waiting in the txqueue, assume that we haven't
876         * requested any space yet, and request some, storing the pointer at
877         * FORMAT_DATA_OUT->txbuffer.
878         *
879         * The amount to request is slightly magical at the moment - it's
880         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
881         * the buffer and handle overruns.
882         */
883        if (FORMAT_DATA_OUT->waiting == 0) {
884                FORMAT_DATA_OUT->txbuffer = dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
885                                FORMAT_DATA_OUT->dagstream, 16908288);
886        }
887
888        /*
889         * Copy the header separately to the body, as we can't guarantee they
890         * are in contiguous memory
891         */
892        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,erfptr,(dag_record_size + pad));
893        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
894
895
896
897        /*
898         * Copy our incoming packet into the outgoing buffer, and increment
899         * our waiting count
900         */
901        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
902        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,buffer,size);
903        FORMAT_DATA_OUT->waiting += size;
904
905        /*
906         * If our output buffer has more than 16 Mebibytes in it, commit those
907         * bytes and reset the waiting count to 0.
908         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in
909         * case there is still data in the buffer at program exit.
910         */
911
912        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
913                FORMAT_DATA_OUT->txbuffer = dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
914                        FORMAT_DATA_OUT->waiting );
915                FORMAT_DATA_OUT->waiting = 0;
916        }
917
918        return size + pad + dag_record_size;
919
920}
921
922/* Attempts to determine a suitable ERF type for a given packet. Returns true
923 * if one is found, false otherwise */
924static bool find_compatible_linktype(libtrace_out_t *libtrace,
925                                libtrace_packet_t *packet, char *type)
926{
927         // Keep trying to simplify the packet until we can find
928         //something we can do with it
929
930        do {
931                *type=libtrace_to_erf_type(trace_get_link_type(packet));
932
933                // Success
934                if (*type != (char)-1)
935                        return true;
936
937                if (!demote_packet(packet)) {
938                        trace_set_err_out(libtrace,
939                                        TRACE_ERR_NO_CONVERSION,
940                                        "No erf type for packet (%i)",
941                                        trace_get_link_type(packet));
942                        return false;
943                }
944
945        } while(1);
946
947        return true;
948}
949
950/* Writes a packet to the provided DAG output trace */
951static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet) {
952        /*
953         * This is heavily borrowed from erf_write_packet(). Yes, CnP coding
954         * sucks, sorry about that.
955         */
956        unsigned int pad = 0;
957        int numbytes;
958        void *payload = packet->payload;
959        dag_record_t *header = (dag_record_t *)packet->header;
960        char erf_type = 0;
961
962        if(!packet->header) {
963                /* No header, probably an RT packet. Lifted from
964                 * erf_write_packet(). */
965                return -1;
966        }
967
968        if (trace_get_link_type(packet) == TRACE_TYPE_NONDATA)
969                return 0;
970
971        pad = dag_get_padding(packet);
972
973        /*
974         * If the payload is null, adjust the rlen. Discussion of this is
975         * attached to erf_write_packet()
976         */
977        if (payload == NULL) {
978                header->rlen = htons(dag_record_size + pad);
979        }
980
981        if (packet->type == TRACE_RT_DATA_ERF) {
982                numbytes = dag_dump_packet(libtrace,
983                                header,
984                                pad,
985                                payload
986                                );
987
988        } else {
989                /* Build up a new packet header from the existing header */
990
991                /* Simplify the packet first - if we can't do this, break
992                 * early */
993                if (!find_compatible_linktype(libtrace,packet,&erf_type))
994                        return -1;
995
996                dag_record_t erfhdr;
997
998                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
999                payload=packet->payload;
1000                pad = dag_get_padding(packet);
1001
1002                /* Flags. Can't do this */
1003                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
1004                if (trace_get_direction(packet)!=(int)~0U)
1005                        erfhdr.flags.iface = trace_get_direction(packet);
1006
1007                erfhdr.type = erf_type;
1008
1009                /* Packet length (rlen includes format overhead) */
1010                assert(trace_get_capture_length(packet)>0
1011                                && trace_get_capture_length(packet)<=65536);
1012                assert(erf_get_framing_length(packet)>0
1013                                && trace_get_framing_length(packet)<=65536);
1014                assert(trace_get_capture_length(packet)+erf_get_framing_length(packet)>0
1015                      &&trace_get_capture_length(packet)+erf_get_framing_length(packet)<=65536);
1016
1017                erfhdr.rlen = htons(trace_get_capture_length(packet)
1018                        + erf_get_framing_length(packet));
1019
1020
1021                /* Loss counter. Can't do this */
1022                erfhdr.lctr = 0;
1023                /* Wire length, does not include padding! */
1024                erfhdr.wlen = htons(trace_get_wire_length(packet));
1025
1026                /* Write it out */
1027                numbytes = dag_dump_packet(libtrace,
1028                                &erfhdr,
1029                                pad,
1030                                payload);
1031
1032        }
1033
1034        return numbytes;
1035}
1036
1037/* Reads the next available packet from a DAG card, in a BLOCKING fashion
1038 *
1039 * If DUCK reporting is enabled, the packet returned may be a DUCK update
1040 */
1041static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1042        int size = 0;
1043        struct timeval tv;
1044        dag_record_t *erfptr = NULL;
1045        int numbytes = 0;
1046        uint32_t flags = 0;
1047        struct timeval maxwait;
1048        struct timeval pollwait;
1049
1050        pollwait.tv_sec = 0;
1051        pollwait.tv_usec = 10000;
1052        maxwait.tv_sec = 0;
1053        maxwait.tv_usec = 250000;
1054
1055        /* Check if we're due for a DUCK report */
1056        if (DUCK.last_pkt - DUCK.last_duck > DUCK.duck_freq &&
1057                        DUCK.duck_freq != 0) {
1058                size = dag_get_duckinfo(libtrace, packet);
1059                DUCK.last_duck = DUCK.last_pkt;
1060                if (size != 0) {
1061                        return size;
1062                }
1063                /* No DUCK support, so don't waste our time anymore */
1064                DUCK.duck_freq = 0;
1065        }
1066
1067        /* Don't let anyone try to free our DAG memory hole! */
1068        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1069
1070        /* If the packet buffer is currently owned by libtrace, free it so
1071         * that we can set the packet to point into the DAG memory hole */
1072        if (packet->buf_control == TRACE_CTRL_PACKET) {
1073                free(packet->buffer);
1074                packet->buffer = 0;
1075        }
1076       
1077        if (dag_set_stream_poll(FORMAT_DATA->device->fd, 
1078                        FORMAT_DATA->dagstream, sizeof(dag_record_t), &maxwait, 
1079                        &pollwait) == -1)
1080        {
1081                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1082                return -1;
1083        }
1084
1085
1086        /* Grab a full ERF record */
1087        do {
1088                numbytes = dag_available(libtrace);
1089                if (numbytes < 0)
1090                        return numbytes;
1091                if (numbytes < dag_record_size) {
1092                        if (libtrace_halt)
1093                                return 0;
1094                        /* Block until we see a packet */
1095                        continue;
1096                }
1097                erfptr = dag_get_record(libtrace);
1098        } while (erfptr == NULL);
1099
1100        /* Prepare the libtrace packet */
1101        if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
1102                                flags))
1103                return -1;
1104
1105        /* Update the DUCK timer */
1106        tv = trace_get_timeval(packet);
1107        DUCK.last_pkt = tv.tv_sec;
1108
1109        return packet->payload ? htons(erfptr->rlen) :
1110                                erf_get_framing_length(packet);
1111}
1112
1113/* Attempts to read a packet from a DAG card in a NON-BLOCKING fashion. If a
1114 * packet is available, we will return a packet event. Otherwise we will
1115 * return a SLEEP event (as we cannot select on the DAG file descriptor).
1116 */
1117static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
1118                                        libtrace_packet_t *packet) {
1119        libtrace_eventobj_t event = {0,0,0.0,0};
1120        dag_record_t *erfptr = NULL;
1121        int numbytes;
1122        uint32_t flags = 0;
1123        struct timeval minwait;
1124       
1125        minwait.tv_sec = 0;
1126        minwait.tv_usec = 10000;
1127       
1128        if (dag_set_stream_poll(FORMAT_DATA->device->fd, 
1129                        FORMAT_DATA->dagstream, 0, &minwait, 
1130                        &minwait) == -1)
1131        {
1132                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1133                event.type = TRACE_EVENT_TERMINATE;
1134                return event;
1135        }
1136
1137        do {
1138                erfptr = NULL;
1139                numbytes = 0;
1140       
1141                /* Need to call dag_available so that the top pointer will get
1142                 * updated, otherwise we'll never see any data! */
1143                numbytes = dag_available(libtrace);
1144
1145                /* May as well not bother calling dag_get_record if
1146                 * dag_available suggests that there's no data */
1147                if (numbytes != 0)
1148                        erfptr = dag_get_record(libtrace);
1149                if (erfptr == NULL) {
1150                        /* No packet available - sleep for a very short time */
1151                        if (libtrace_halt) {
1152                                event.type = TRACE_EVENT_TERMINATE;
1153                        } else {                       
1154                                event.type = TRACE_EVENT_SLEEP;
1155                                event.seconds = 0.0001;
1156                        }
1157                        break;
1158                }
1159                if (dag_prepare_packet(libtrace, packet, erfptr, 
1160                                        TRACE_RT_DATA_ERF, flags)) {
1161                        event.type = TRACE_EVENT_TERMINATE;
1162                        break;
1163                }
1164
1165
1166                event.size = trace_get_capture_length(packet) + 
1167                                trace_get_framing_length(packet);
1168               
1169                /* XXX trace_read_packet() normally applies the following
1170                 * config options for us, but this function is called via
1171                 * trace_event() so we have to do it ourselves */
1172
1173                if (libtrace->filter) {
1174                        int filtret = trace_apply_filter(libtrace->filter, 
1175                                        packet);
1176                        if (filtret == -1) {
1177                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
1178                                                "Bad BPF Filter");
1179                                event.type = TRACE_EVENT_TERMINATE;
1180                                break;
1181                        }
1182
1183                        if (filtret == 0) {
1184                                /* This packet isn't useful so we want to
1185                                 * immediately see if there is another suitable
1186                                 * one - we definitely DO NOT want to return
1187                                 * a sleep event in this case, like we used to
1188                                 * do! */
1189                                libtrace->filtered_packets ++;
1190                                trace_clear_cache(packet);
1191                                continue;
1192                        }
1193                               
1194                        event.type = TRACE_EVENT_PACKET;
1195                } else {
1196                        event.type = TRACE_EVENT_PACKET;
1197                }
1198
1199                if (libtrace->snaplen > 0) {
1200                        trace_set_capture_length(packet, libtrace->snaplen);
1201                }
1202                libtrace->accepted_packets ++;
1203                break;
1204        } while (1);
1205
1206        return event;
1207}
1208
1209/* Gets the number of dropped packets */
1210static uint64_t dag_get_dropped_packets(libtrace_t *trace) {
1211        if (trace->format_data == NULL)
1212                return (uint64_t)-1;
1213        return DATA(trace)->drops;
1214}
1215
/* Writes a short description of the DAG format module and the URIs it
 * accepts to standard output */
static void dag_help(void) {
	static const char *const help_lines[] = {
		"dag format module: $Revision: 1755 $\n",
		"Supported input URIs:\n",
		"\tdag:/dev/dagn\n",
		"\n",
		"\te.g.: dag:/dev/dag0\n",
		"\n",
		"Supported output URIs:\n",
		"\tnone\n",
		"\n",
	};
	unsigned int i;

	for (i = 0; i < sizeof(help_lines) / sizeof(help_lines[0]); i++)
		printf("%s", help_lines[i]);
}
1228
1229static struct libtrace_format_t dag = {
1230        "dag",
1231        "$Id$",
1232        TRACE_FORMAT_ERF,
1233        dag_probe_filename,             /* probe filename */
1234        NULL,                           /* probe magic */
1235        dag_init_input,                 /* init_input */
1236        dag_config_input,               /* config_input */
1237        dag_start_input,                /* start_input */
1238        dag_pause_input,                /* pause_input */
1239        dag_init_output,                /* init_output */
1240        NULL,                           /* config_output */
1241        dag_start_output,               /* start_output */
1242        dag_fin_input,                  /* fin_input */
1243        dag_fin_output,                 /* fin_output */
1244        dag_read_packet,                /* read_packet */
1245        dag_prepare_packet,             /* prepare_packet */
1246        NULL,                           /* fin_packet */
1247        dag_write_packet,               /* write_packet */
1248        erf_get_link_type,              /* get_link_type */
1249        erf_get_direction,              /* get_direction */
1250        erf_set_direction,              /* set_direction */
1251        erf_get_erf_timestamp,          /* get_erf_timestamp */
1252        NULL,                           /* get_timeval */
1253        NULL,                           /* get_seconds */
1254        NULL,                           /* get_timespec */
1255        NULL,                           /* seek_erf */
1256        NULL,                           /* seek_timeval */
1257        NULL,                           /* seek_seconds */
1258        erf_get_capture_length,         /* get_capture_length */
1259        erf_get_wire_length,            /* get_wire_length */
1260        erf_get_framing_length,         /* get_framing_length */
1261        erf_set_capture_length,         /* set_capture_length */
1262        NULL,                           /* get_received_packets */
1263        NULL,                           /* get_filtered_packets */
1264        dag_get_dropped_packets,        /* get_dropped_packets */
1265        NULL,                           /* get_captured_packets */
1266        NULL,                           /* get_fd */
1267        trace_event_dag,                /* trace_event */
1268        dag_help,                       /* help */
1269        NULL                            /* next pointer */
1270};
1271
1272void dag_constructor(void) {
1273        register_format(&dag);
1274}
Note: See TracBrowser for help on using the repository browser.