source: lib/format_dag25.c @ 0054c50

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 0054c50 was 0054c50, checked in by Shane Alcock <salcock@…>, 7 years ago

Don't free the DAG device name twice

  • Property mode set to 100644
File size: 39.3 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id$
31 *
32 */
33
34#define _GNU_SOURCE
35
36#include "config.h"
37#include "common.h"
38#include "libtrace.h"
39#include "libtrace_int.h"
40#include "format_helper.h"
41#include "format_erf.h"
42
43#include <assert.h>
44#include <errno.h>
45#include <fcntl.h>
46#include <stdio.h>
47#include <string.h>
48#include <stdlib.h>
49#include <sys/stat.h>
50
51#include <sys/mman.h>
52/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
53 * Windows anyway so we'll worry about this more later :] */
54#include <pthread.h>
55
56
57#ifdef WIN32
58#  include <io.h>
59#  include <share.h>
60#  define PATH_MAX _MAX_PATH
61#  define snprintf sprintf_s
62#else
63#  include <netdb.h>
64#  ifndef PATH_MAX
65#       define PATH_MAX 4096
66#  endif
67#  include <sys/ioctl.h>
68#endif
69
70/* This format deals with DAG cards that are using drivers from the 2.5 version
71 * onwards, including 3.X.
72 *
73 * DAG is a LIVE capture format.
74 *
75 * This format does support writing, provided the DAG card that you are using
76 * has transmit (Tx) support. Additionally, packets read using this format
77 * are in the ERF format, so can easily be written as ERF traces without
78 * losing any data.
79 */
80
81
82#define DATA(x) ((struct dag_format_data_t *)x->format_data)
83#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
84
85#define FORMAT_DATA DATA(libtrace)
86#define FORMAT_DATA_OUT DATA_OUT(libtrace)
87
88#define DUCK FORMAT_DATA->duck
89static struct libtrace_format_t dag;
90
/* One entry in the list of DAG devices opened by this format.
 *
 * A single DAG device can carry several streams, and so can back several
 * traces at once; each trace therefore refers to a shared entry of this
 * type rather than opening the device itself. */
struct dag_dev_t {
        /* Name of the device; used to look the entry up in the list */
        char * dev_name;
        /* File descriptor for the opened device */
        int fd;
        /* How many input / output traces currently use this device */
        uint16_t ref_count;
        /* Neighbouring entries in the doubly linked device list
         * (NULL at either end) */
        struct dag_dev_t *prev;
        struct dag_dev_t *next;
};
103
/* Per-trace state kept for every DAG output trace */
struct dag_format_data_out_t {
        /* Name of the DAG device, as parsed from the trace URI */
        char *device_name;
        /* Entry in the open device list that this trace writes to */
        struct dag_dev_t *device;
        /* Number of the DAG stream being written on */
        unsigned int dagstream;
        /* Non-zero while the stream is attached */
        int stream_attached;
        /* Bytes queued for transmission but not yet committed */
        uint64_t waiting;
        /* Buffer holding the data that is to be transmitted */
        uint8_t *txbuffer;
};
119
120/* "Global" data that is stored for each DAG input trace */
121struct dag_format_data_t {
122
123        /* Data required for regular DUCK reporting */
124        struct {
125                /* Timestamp of the last DUCK report */
126                uint32_t last_duck;
127                /* The number of seconds between each DUCK report */
128                uint32_t duck_freq;
129                /* Timestamp of the last packet read from the DAG card */
130                uint32_t last_pkt;
131                /* Dummy trace to ensure DUCK packets are dealt with using the
132                 * DUCK format functions */
133                libtrace_t *dummy_duck;
134        } duck;
135
136        /* String containing the DAG device name */
137        char *device_name;
138        /* The DAG device that we are reading from */
139        struct dag_dev_t *device;
140        /* The DAG stream that we are reading from */
141        unsigned int dagstream;
142        /* Boolean flag indicating whether the stream is currently attached */
143        int stream_attached;
144        /* Pointer to the first unread byte in the DAG memory hole */
145        uint8_t *bottom;
146        /* Pointer to the last unread byte in the DAG memory hole */
147        uint8_t *top;
148        /* The amount of data processed thus far from the bottom pointer */
149        uint32_t processed;
150        /* The number of packets that have been dropped */
151        uint64_t drops;
152
153        uint8_t seeninterface[4];
154};
155
/* Mutex protecting the list of open DAG devices.
 *
 * Statically initialised so that it is valid before first use on every
 * platform, not just those where an all-zero pthread_mutex_t happens to be
 * an unlocked mutex.
 * NOTE(review): if some code outside this view also calls
 * pthread_mutex_init() on this mutex, drop one of the two initialisations. */
pthread_mutex_t open_dag_mutex = PTHREAD_MUTEX_INITIALIZER;

/* The list of DAG devices that have been opened by libtrace.
 *
 * Each DAG device can only be opened once, but several streams may be read
 * from it. The list (with per-device reference counts) stops us from opening
 * a device more than once or closing one that is still in use. */
struct dag_dev_t *open_dags = NULL;
167
168/* Returns the amount of padding between the ERF header and the start of the
169 * captured packet data */
170static int dag_get_padding(const libtrace_packet_t *packet)
171{
172        /* ERF Ethernet records have a 2 byte padding before the packet itself
173         * so that the IP header is aligned on a 32 bit boundary.
174         */
175        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
176                dag_record_t *erfptr = (dag_record_t *)packet->header;
177                switch(erfptr->type) {
178                        case TYPE_ETH:
179                        case TYPE_DSM_COLOR_ETH:
180                                return 2;
181                        default:                return 0;
182                }
183        }
184        else {
185                switch(trace_get_link_type(packet)) {
186                        case TRACE_TYPE_ETH:    return 2;
187                        default:                return 0;
188                }
189        }
190}
191
/* Guesses whether the given filename refers to a DAG device.
 *
 * Returns 1 if the file can be stat()ed and is a character device,
 * 0 otherwise. */
static int dag_probe_filename(const char *filename)
{
        struct stat st;

        /* A path we cannot stat, or one that is not a character device,
         * is not a DAG card */
        if (stat(filename, &st) != 0 || !S_ISCHR(st.st_mode))
                return 0;

        /* Character device - it's probably us */
        return 1;
}
207
208/* Initialises the DAG output data structure */
209static void dag_init_format_out_data(libtrace_out_t *libtrace) {
210        libtrace->format_data = (struct dag_format_data_out_t *) malloc(sizeof(struct dag_format_data_out_t));
211        // no DUCK on output
212        FORMAT_DATA_OUT->stream_attached = 0;
213        FORMAT_DATA_OUT->device = NULL;
214        FORMAT_DATA_OUT->device_name = NULL;
215        FORMAT_DATA_OUT->dagstream = 0;
216        FORMAT_DATA_OUT->waiting = 0;
217
218}
219
220/* Initialises the DAG input data structure */
221static void dag_init_format_data(libtrace_t *libtrace) {
222        libtrace->format_data = (struct dag_format_data_t *)
223                malloc(sizeof(struct dag_format_data_t));
224        DUCK.last_duck = 0;
225        DUCK.duck_freq = 0;
226        DUCK.last_pkt = 0;
227        DUCK.dummy_duck = NULL;
228        FORMAT_DATA->stream_attached = 0;
229        FORMAT_DATA->drops = 0;
230        FORMAT_DATA->device_name = NULL;
231        FORMAT_DATA->device = NULL;
232        FORMAT_DATA->dagstream = 0;
233        FORMAT_DATA->processed = 0;
234        FORMAT_DATA->bottom = NULL;
235        FORMAT_DATA->top = NULL;
236        memset(FORMAT_DATA->seeninterface, 0, sizeof(FORMAT_DATA->seeninterface));
237}
238
239/* Determines if there is already an entry for the given DAG device in the
240 * device list and increments the reference count for that device, if found.
241 *
242 * NOTE: This function assumes the open_dag_mutex is held by the caller */
243static struct dag_dev_t *dag_find_open_device(char *dev_name) {
244        struct dag_dev_t *dag_dev;
245
246        dag_dev = open_dags;
247
248        /* XXX: Not exactly zippy, but how often are we going to be dealing
249         * with multiple dag cards? */
250        while (dag_dev != NULL) {
251                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
252                        dag_dev->ref_count ++;
253                        return dag_dev;
254
255                }
256                dag_dev = dag_dev->next;
257        }
258        return NULL;
259
260
261}
262
263/* Closes a DAG device and removes it from the device list.
264 *
265 * Attempting to close a DAG device that has a non-zero reference count will
266 * cause an assertion failure!
267 *
268 * NOTE: This function assumes the open_dag_mutex is held by the caller */
269static void dag_close_device(struct dag_dev_t *dev) {
270        /* Need to remove from the device list */
271
272        assert(dev->ref_count == 0);
273
274        if (dev->prev == NULL) {
275                open_dags = dev->next;
276                if (dev->next)
277                        dev->next->prev = NULL;
278        } else {
279                dev->prev->next = dev->next;
280                if (dev->next)
281                        dev->next->prev = dev->prev;
282        }
283
284        dag_close(dev->fd);
285        free(dev);
286}
287
288
289/* Opens a new DAG device for writing and adds it to the DAG device list
290 *
291 * NOTE: this function should only be called when opening a DAG device for
292 * writing - there is little practical difference between this and the
293 * function below that covers the reading case, but we need the output trace
294 * object to report errors properly so the two functions take slightly
295 * different arguments. This is really lame and there should be a much better
296 * way of doing this.
297 *
298 * NOTE: This function assumes the open_dag_mutex is held by the caller
299 */
300static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace, char *dev_name) {
301        struct stat buf;
302        int fd;
303        struct dag_dev_t *new_dev;
304
305        /* Make sure the device exists */
306        if (stat(dev_name, &buf) == -1) {
307                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
308                return NULL;
309}
310
311        /* Make sure it is the appropriate type of device */
312        if (S_ISCHR(buf.st_mode)) {
313                /* Try opening the DAG device */
314                if((fd = dag_open(dev_name)) < 0) {
315                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
316                                        dev_name);
317                        return NULL;
318                }
319        } else {
320                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
321                                dev_name);
322                return NULL;
323        }
324
325        /* Add the device to our device list - it is just a doubly linked
326         * list with no inherent ordering; just tack the new one on the front
327         */
328        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
329        new_dev->fd = fd;
330        new_dev->dev_name = dev_name;
331        new_dev->ref_count = 1;
332
333        new_dev->prev = NULL;
334        new_dev->next = open_dags;
335        if (open_dags)
336                open_dags->prev = new_dev;
337
338        open_dags = new_dev;
339
340        return new_dev;
341}
342
343/* Opens a new DAG device for reading and adds it to the DAG device list
344 *
345 * NOTE: this function should only be called when opening a DAG device for
346 * reading - there is little practical difference between this and the
347 * function above that covers the writing case, but we need the input trace
348 * object to report errors properly so the two functions take slightly
349 * different arguments. This is really lame and there should be a much better
350 * way of doing this.
351 *
352 * NOTE: This function assumes the open_dag_mutex is held by the caller */
353static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
354        struct stat buf;
355        int fd;
356        struct dag_dev_t *new_dev;
357
358        /* Make sure the device exists */
359        if (stat(dev_name, &buf) == -1) {
360                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
361                return NULL;
362        }
363
364        /* Make sure it is the appropriate type of device */
365        if (S_ISCHR(buf.st_mode)) {
366                /* Try opening the DAG device */
367                if((fd = dag_open(dev_name)) < 0) {
368                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
369                                        dev_name);
370                        return NULL;
371                }
372        } else {
373                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
374                                dev_name);
375                return NULL;
376        }
377
378        /* Add the device to our device list - it is just a doubly linked
379         * list with no inherent ordering; just tack the new one on the front
380         */
381        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
382        new_dev->fd = fd;
383        new_dev->dev_name = dev_name;
384        new_dev->ref_count = 1;
385
386        new_dev->prev = NULL;
387        new_dev->next = open_dags;
388        if (open_dags)
389                open_dags->prev = new_dev;
390
391        open_dags = new_dev;
392
393        return new_dev;
394}
395
396/* Creates and initialises a DAG output trace */
397static int dag_init_output(libtrace_out_t *libtrace) {
398        char *scan = NULL;
399        struct dag_dev_t *dag_device = NULL;
400        int stream = 1;
401       
402        /* XXX I don't know if this is important or not, but this function
403         * isn't present in all of the driver releases that this code is
404         * supposed to support! */
405        /*
406        unsigned long wake_time;
407        dagutil_sleep_get_wake_time(&wake_time,0);
408        */
409
410        dag_init_format_out_data(libtrace);
411        /* Grab the mutex while we're likely to be messing with the device
412         * list */
413        pthread_mutex_lock(&open_dag_mutex);
414       
415        /* Specific streams are signified using a comma in the libtrace URI,
416         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
417         *
418         * If no stream is specified, we will write using stream 1 */
419        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
420                FORMAT_DATA_OUT->device_name = strdup(libtrace->uridata);
421        } else {
422                FORMAT_DATA_OUT->device_name = 
423                                (char *)strndup(libtrace->uridata,
424                                (size_t)(scan - libtrace->uridata));
425                stream = atoi(++scan);
426        }
427        FORMAT_DATA_OUT->dagstream = stream;
428
429        /* See if our DAG device is already open */
430        dag_device = dag_find_open_device(FORMAT_DATA_OUT->device_name);
431
432        if (dag_device == NULL) {
433                /* Device not yet opened - open it ourselves */
434                dag_device = dag_open_output_device(libtrace, 
435                                FORMAT_DATA_OUT->device_name);
436        }
437
438        /* Make sure we have successfully opened a DAG device */
439        if (dag_device == NULL) {
440                if (FORMAT_DATA_OUT->device_name) {
441                        free(FORMAT_DATA_OUT->device_name);
442                        FORMAT_DATA_OUT->device_name = NULL;
443                }
444                pthread_mutex_unlock(&open_dag_mutex);
445                return -1;
446        }
447
448        FORMAT_DATA_OUT->device = dag_device;
449        pthread_mutex_unlock(&open_dag_mutex);
450        return 0;
451}
452
453/* Creates and initialises a DAG input trace */
454static int dag_init_input(libtrace_t *libtrace) {
455        char *scan = NULL;
456        int stream = 0;
457        struct dag_dev_t *dag_device = NULL;
458
459        dag_init_format_data(libtrace);
460        /* Grab the mutex while we're likely to be messing with the device
461         * list */
462        pthread_mutex_lock(&open_dag_mutex);
463       
464       
465        /* Specific streams are signified using a comma in the libtrace URI,
466         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
467         *
468         * If no stream is specified, we will read from stream 0 */
469        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
470                FORMAT_DATA->device_name = strdup(libtrace->uridata);
471        } else {
472                FORMAT_DATA->device_name = (char *)strndup(libtrace->uridata,
473                                (size_t)(scan - libtrace->uridata));
474                stream = atoi(++scan);
475        }
476
477        FORMAT_DATA->dagstream = stream;
478
479        /* See if our DAG device is already open */
480        dag_device = dag_find_open_device(FORMAT_DATA->device_name);
481
482        if (dag_device == NULL) {
483                /* Device not yet opened - open it ourselves */
484                dag_device=dag_open_device(libtrace, FORMAT_DATA->device_name);
485        }
486
487        /* Make sure we have successfully opened a DAG device */
488        if (dag_device == NULL) {
489                if (FORMAT_DATA->device_name)
490                        free(FORMAT_DATA->device_name);
491                FORMAT_DATA->device_name = NULL;
492                pthread_mutex_unlock(&open_dag_mutex);
493                return -1;
494        }
495
496        FORMAT_DATA->device = dag_device;
497
498        /* See Config_Status_API_Programming_Guide.pdf from the Endace Dag Documentation */
499        /* Check kBooleanAttributeActive is true -- no point capturing on an interface that's disabled
500 
501        *  The symptom of the port being disabled is that libtrace will appear to hang.
502        */
503        /* Check kBooleanAttributeFault is false */
504        /* Check kBooleanAttributeLocalFault is false */
505        /* Check kBooleanAttributeLock is true ? */
506        /* Check kBooleanAttributePeerLink ? */
507
508        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based on libtrace promisc attribute?*/
509        /* Set kUint32AttributeSnapLength to the snaplength */
510
511        pthread_mutex_unlock(&open_dag_mutex);
512        return 0;
513}
514
515/* Configures a DAG input trace */
516static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
517                                void *data) {
518        char conf_str[4096];
519        switch(option) {
520                case TRACE_OPTION_META_FREQ:
521                        /* This option is used to specify the frequency of DUCK
522                         * updates */
523                        DUCK.duck_freq = *(int *)data;
524                        return 0;
525                case TRACE_OPTION_SNAPLEN:
526                        /* Tell the card our new snap length */
527                        snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
528                        if (dag_configure(FORMAT_DATA->device->fd,
529                                                conf_str) != 0) {
530                                trace_set_err(libtrace, errno, "Failed to configure snaplen on DAG card: %s", libtrace->uridata);
531                                return -1;
532                        }
533                        return 0;
534                case TRACE_OPTION_PROMISC:
535                        /* DAG already operates in a promisc fashion */
536                        return -1;
537                case TRACE_OPTION_FILTER:
538                        /* We don't yet support pushing filters into DAG
539                         * cards */
540                        return -1;
541                case TRACE_OPTION_EVENT_REALTIME:
542                        /* Live capture is always going to be realtime */
543                        return -1;
544        }
545        return -1;
546}
547
548/* Starts a DAG output trace */
549static int dag_start_output(libtrace_out_t *libtrace) {
550        struct timeval zero, nopoll;
551
552        zero.tv_sec = 0;
553        zero.tv_usec = 0;
554        nopoll = zero;
555
556        /* Attach and start the DAG stream */
557
558        if (dag_attach_stream(FORMAT_DATA_OUT->device->fd,
559                        FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) {
560                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
561                return -1;
562        }
563
564        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
565                        FORMAT_DATA_OUT->dagstream) < 0) {
566                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
567                return -1;
568        }
569        FORMAT_DATA_OUT->stream_attached = 1;
570
571        /* We don't want the dag card to do any sleeping */
572
573        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
574                        FORMAT_DATA_OUT->dagstream, 0, &zero,
575                        &nopoll);
576
577        return 0;
578}
579
580/* Starts a DAG input trace */
581static int dag_start_input(libtrace_t *libtrace) {
582        struct timeval zero, nopoll;
583        uint8_t *top, *bottom, *starttop;
584        top = bottom = NULL;
585
586        zero.tv_sec = 0;
587        zero.tv_usec = 10000;
588        nopoll = zero;
589
590        /* Attach and start the DAG stream */
591        if (dag_attach_stream(FORMAT_DATA->device->fd,
592                                FORMAT_DATA->dagstream, 0, 0) < 0) {
593                trace_set_err(libtrace, errno, "Cannot attach DAG stream");
594                return -1;
595        }
596
597        if (dag_start_stream(FORMAT_DATA->device->fd,
598                                FORMAT_DATA->dagstream) < 0) {
599                trace_set_err(libtrace, errno, "Cannot start DAG stream");
600                return -1;
601        }
602        FORMAT_DATA->stream_attached = 1;
603       
604        /* We don't want the dag card to do any sleeping */
605        dag_set_stream_poll(FORMAT_DATA->device->fd,
606                                FORMAT_DATA->dagstream, 0, &zero,
607                                &nopoll);
608
609        starttop = dag_advance_stream(FORMAT_DATA->device->fd,
610                                        FORMAT_DATA->dagstream,
611                                        &bottom);
612
613        /* Should probably flush the memory hole now */
614        top = starttop;
615        while (starttop - bottom > 0) {
616                bottom += (starttop - bottom);
617                top = dag_advance_stream(FORMAT_DATA->device->fd,
618                                        FORMAT_DATA->dagstream,
619                                        &bottom);
620        }
621        FORMAT_DATA->top = top;
622        FORMAT_DATA->bottom = bottom;
623        FORMAT_DATA->processed = 0;
624        FORMAT_DATA->drops = 0;
625
626        return 0;
627}
628
629/* Pauses a DAG output trace */
630static int dag_pause_output(libtrace_out_t *libtrace) {
631
632        /* Stop and detach the stream */
633        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
634                        FORMAT_DATA_OUT->dagstream) < 0) {
635                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
636                return -1;
637        }
638        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
639                        FORMAT_DATA_OUT->dagstream) < 0) {
640                trace_set_err_out(libtrace, errno, "Could not detach DAG stream");
641                return -1;
642        }
643        FORMAT_DATA_OUT->stream_attached = 0;
644        return 0;
645}
646
647/* Pauses a DAG input trace */
648static int dag_pause_input(libtrace_t *libtrace) {
649
650        /* Stop and detach the stream */
651        if (dag_stop_stream(FORMAT_DATA->device->fd,
652                                FORMAT_DATA->dagstream) < 0) {
653                trace_set_err(libtrace, errno, "Could not stop DAG stream");
654                return -1;
655        }
656        if (dag_detach_stream(FORMAT_DATA->device->fd,
657                                FORMAT_DATA->dagstream) < 0) {
658                trace_set_err(libtrace, errno, "Could not detach DAG stream");
659                return -1;
660        }
661        FORMAT_DATA->stream_attached = 0;
662        return 0;
663}
664
665/* Closes a DAG input trace */
666static int dag_fin_input(libtrace_t *libtrace) {
667        /* Need the lock, since we're going to be handling the device list */
668        pthread_mutex_lock(&open_dag_mutex);
669       
670        /* Detach the stream if we are not paused */
671        if (FORMAT_DATA->stream_attached)
672                dag_pause_input(libtrace);
673        FORMAT_DATA->device->ref_count --;
674
675        /* Close the DAG device if there are no more references to it */
676        if (FORMAT_DATA->device->ref_count == 0)
677                dag_close_device(FORMAT_DATA->device);
678        if (DUCK.dummy_duck)
679                trace_destroy_dead(DUCK.dummy_duck);
680        if (FORMAT_DATA->device_name)
681                free(FORMAT_DATA->device_name);
682        free(libtrace->format_data);
683        pthread_mutex_unlock(&open_dag_mutex);
684        return 0; /* success */
685}
686
687/* Closes a DAG output trace */
688static int dag_fin_output(libtrace_out_t *libtrace) {
689       
690        /* Commit any outstanding traffic in the txbuffer */
691        if (FORMAT_DATA_OUT->waiting) {
692                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
693                                FORMAT_DATA_OUT->waiting );
694        }
695
696        /* Wait until the buffer is nearly clear before exiting the program,
697         * as we will lose packets otherwise */
698        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
699                        FORMAT_DATA_OUT->dagstream,
700                        dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
701                                        FORMAT_DATA_OUT->dagstream) - 8
702                        );
703
704        /* Need the lock, since we're going to be handling the device list */
705        pthread_mutex_lock(&open_dag_mutex);
706
707        /* Detach the stream if we are not paused */
708        if (FORMAT_DATA_OUT->stream_attached)
709                dag_pause_output(libtrace);
710        FORMAT_DATA_OUT->device->ref_count --;
711
712        /* Close the DAG device if there are no more references to it */
713        if (FORMAT_DATA_OUT->device->ref_count == 0)
714                dag_close_device(FORMAT_DATA_OUT->device);
715        if (FORMAT_DATA_OUT->device_name)
716                free(FORMAT_DATA_OUT->device_name);
717        free(libtrace->format_data);
718        pthread_mutex_unlock(&open_dag_mutex);
719        return 0; /* success */
720}
721
/* Select the DUCK ioctl and the matching libtrace DUCK packet type based on
 * which ioctl name the DAG headers provide. If neither is available,
 * LIBTRACE_DUCK_IOCTL and LIBTRACE_DUCK_VERSION are left undefined. */
#if defined(DAGIOC_CARD_DUCK)
#  define LIBTRACE_DUCK_IOCTL DAGIOC_CARD_DUCK
#  define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_5_0
#elif defined(DAGIOCDUCK)
#  define LIBTRACE_DUCK_IOCTL DAGIOCDUCK
#  define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_2_5
#else
#  warning "DAG appears to be missing DUCK support"
#endif
733
734/* Extracts DUCK information from the DAG card and produces a DUCK packet */
735static int dag_get_duckinfo(libtrace_t *libtrace,
736                                libtrace_packet_t *packet) {
737
738        if (DUCK.duck_freq == 0)
739                return 0;
740
741#ifndef LIBTRACE_DUCK_IOCTL
742        trace_set_err(libtrace, errno, 
743                "Requested DUCK information but unable to determine the correct ioctl for DUCK");
744        DUCK.duck_freq = 0;
745        return -1;
746#endif
747
748        if (DUCK.last_pkt - DUCK.last_duck < DUCK.duck_freq)
749                return 0;
750
751        /* Allocate memory for the DUCK data */
752        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
753                        !packet->buffer) {
754                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
755                packet->buf_control = TRACE_CTRL_PACKET;
756                if (!packet->buffer) {
757                        trace_set_err(libtrace, errno,
758                                        "Cannot allocate packet buffer");
759                        return -1;
760                }
761        }
762
763        /* DUCK doesn't have a format header */
764        packet->header = 0;
765        packet->payload = packet->buffer;
766
767        /* No need to check if we can get DUCK or not - we're modern
768         * enough so just grab the DUCK info */
769        if ((ioctl(FORMAT_DATA->device->fd, LIBTRACE_DUCK_IOCTL,
770                                        (duckinf_t *)packet->payload) < 0)) {
771                trace_set_err(libtrace, errno, "Error using DUCK ioctl");
772                DUCK.duck_freq = 0;
773                return -1;
774        }
775
776        packet->type = LIBTRACE_DUCK_VERSION;
777
778        /* Set the packet's trace to point at a DUCK trace, so that the
779         * DUCK format functions will be called on the packet rather than the
780         * DAG ones */
781        if (!DUCK.dummy_duck)
782                DUCK.dummy_duck = trace_create_dead("duck:dummy");
783        packet->trace = DUCK.dummy_duck;
784        DUCK.last_duck = DUCK.last_pkt;
785        return sizeof(duckinf_t);
786}
787
788/* Determines the amount of data available to read from the DAG card */
789static int dag_available(libtrace_t *libtrace) {
790        uint32_t diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
791
792        /* If we've processed more than 4MB of data since we last called
793         * dag_advance_stream, then we should call it again to allow the
794         * space occupied by that 4MB to be released */
795        if (diff >= dag_record_size && FORMAT_DATA->processed < 4 * 1024 * 1024)
796                return diff;
797       
798        /* Update the top and bottom pointers */
799        FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd,
800                        FORMAT_DATA->dagstream,
801                        &(FORMAT_DATA->bottom));
802       
803        if (FORMAT_DATA->top == NULL) {
804                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
805                return -1;
806        }
807        FORMAT_DATA->processed = 0;
808        diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
809        return diff;
810}
811
812/* Returns a pointer to the start of the next complete ERF record */
813static dag_record_t *dag_get_record(libtrace_t *libtrace) {
814        dag_record_t *erfptr = NULL;
815        uint16_t size;
816        erfptr = (dag_record_t *)FORMAT_DATA->bottom;
817        if (!erfptr)
818                return NULL;
819        size = ntohs(erfptr->rlen);
820        assert( size >= dag_record_size );
821        /* Make certain we have the full packet available */
822        if (size > (FORMAT_DATA->top - FORMAT_DATA->bottom))
823                return NULL;
824        FORMAT_DATA->bottom += size;
825        FORMAT_DATA->processed += size;
826        return erfptr;
827}
828
829/* Converts a buffer containing a recently read DAG packet record into a
830 * libtrace packet */
831static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
832                void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
833
834        dag_record_t *erfptr;
835       
836        /* If the packet previously owned a buffer that is not the buffer
837         * that contains the new packet data, we're going to need to free the
838         * old one to avoid memory leaks */
839        if (packet->buffer != buffer &&
840                        packet->buf_control == TRACE_CTRL_PACKET) {
841                free(packet->buffer);
842        }
843
844        /* Set the buffer owner appropriately */
845        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
846                packet->buf_control = TRACE_CTRL_PACKET;
847        } else
848                packet->buf_control = TRACE_CTRL_EXTERNAL;
849
850        /* Update the packet pointers and type appropriately */
851        erfptr = (dag_record_t *)buffer;
852        packet->buffer = erfptr;
853        packet->header = erfptr;
854        packet->type = rt_type;
855
856        if (erfptr->flags.rxerror == 1) {
857                /* rxerror means the payload is corrupt - drop the payload
858                 * by tweaking rlen */
859                packet->payload = NULL;
860                erfptr->rlen = htons(erf_get_framing_length(packet));
861        } else {
862                packet->payload = (char*)packet->buffer
863                        + erf_get_framing_length(packet);
864        }
865
866        if (libtrace->format_data == NULL) {
867                dag_init_format_data(libtrace);
868        }
869
870        /* Update the dropped packets counter */
871
872        /* No loss counter for DSM coloured records - have to use
873         * some other API */
874        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
875                /* TODO */
876        } else {
877                /* Use the ERF loss counter */
878                if (FORMAT_DATA->seeninterface[erfptr->flags.iface] == 0) {
879                        FORMAT_DATA->seeninterface[erfptr->flags.iface] = 1;
880                } else {
881                        FORMAT_DATA->drops += ntohs(erfptr->lctr);
882                }
883        }
884
885        return 0;
886}
887
888/*
889 * dag_write_packet() at this stage attempts to improve tx performance
890 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
891 * has been met. I observed approximately 270% performance increase
892 * through this relatively naive tweak. No optimisation of buffer sizes
893 * was attempted.
894 */
895
896/* Pushes an ERF record onto the transmit stream */
897static int dag_dump_packet(libtrace_out_t *libtrace,
898                dag_record_t *erfptr, unsigned int pad, void *buffer) {
899        int size;
900
901        /*
902         * If we've got 0 bytes waiting in the txqueue, assume that we haven't
903         * requested any space yet, and request some, storing the pointer at
904         * FORMAT_DATA_OUT->txbuffer.
905         *
906         * The amount to request is slightly magical at the moment - it's
907         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
908         * the buffer and handle overruns.
909         */
910        if (FORMAT_DATA_OUT->waiting == 0) {
911                FORMAT_DATA_OUT->txbuffer = dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
912                                FORMAT_DATA_OUT->dagstream, 16908288);
913        }
914
915        /*
916         * Copy the header separately to the body, as we can't guarantee they
917         * are in contiguous memory
918         */
919        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,erfptr,(dag_record_size + pad));
920        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
921
922
923
924        /*
925         * Copy our incoming packet into the outgoing buffer, and increment
926         * our waiting count
927         */
928        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
929        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,buffer,size);
930        FORMAT_DATA_OUT->waiting += size;
931
932        /*
933         * If our output buffer has more than 16 Mebibytes in it, commit those
934         * bytes and reset the waiting count to 0.
935         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in
936         * case there is still data in the buffer at program exit.
937         */
938
939        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
940                FORMAT_DATA_OUT->txbuffer = dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
941                        FORMAT_DATA_OUT->waiting );
942                FORMAT_DATA_OUT->waiting = 0;
943        }
944
945        return size + pad + dag_record_size;
946
947}
948
949/* Attempts to determine a suitable ERF type for a given packet. Returns true
950 * if one is found, false otherwise */
951static bool find_compatible_linktype(libtrace_out_t *libtrace,
952                                libtrace_packet_t *packet, char *type)
953{
954         // Keep trying to simplify the packet until we can find
955         //something we can do with it
956
957        do {
958                *type=libtrace_to_erf_type(trace_get_link_type(packet));
959
960                // Success
961                if (*type != (char)-1)
962                        return true;
963
964                if (!demote_packet(packet)) {
965                        trace_set_err_out(libtrace,
966                                        TRACE_ERR_NO_CONVERSION,
967                                        "No erf type for packet (%i)",
968                                        trace_get_link_type(packet));
969                        return false;
970                }
971
972        } while(1);
973
974        return true;
975}
976
977/* Writes a packet to the provided DAG output trace */
978static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet) {
979        /*
980         * This is heavily borrowed from erf_write_packet(). Yes, CnP coding
981         * sucks, sorry about that.
982         */
983        unsigned int pad = 0;
984        int numbytes;
985        void *payload = packet->payload;
986        dag_record_t *header = (dag_record_t *)packet->header;
987        char erf_type = 0;
988
989        if(!packet->header) {
990                /* No header, probably an RT packet. Lifted from
991                 * erf_write_packet(). */
992                return -1;
993        }
994
995        if (trace_get_link_type(packet) == TRACE_TYPE_NONDATA)
996                return 0;
997
998        pad = dag_get_padding(packet);
999
1000        /*
1001         * If the payload is null, adjust the rlen. Discussion of this is
1002         * attached to erf_write_packet()
1003         */
1004        if (payload == NULL) {
1005                header->rlen = htons(dag_record_size + pad);
1006        }
1007
1008        if (packet->type == TRACE_RT_DATA_ERF) {
1009                numbytes = dag_dump_packet(libtrace,
1010                                header,
1011                                pad,
1012                                payload
1013                                );
1014
1015        } else {
1016                /* Build up a new packet header from the existing header */
1017
1018                /* Simplify the packet first - if we can't do this, break
1019                 * early */
1020                if (!find_compatible_linktype(libtrace,packet,&erf_type))
1021                        return -1;
1022
1023                dag_record_t erfhdr;
1024
1025                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
1026                payload=packet->payload;
1027                pad = dag_get_padding(packet);
1028
1029                /* Flags. Can't do this */
1030                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
1031                if (trace_get_direction(packet)!=(int)~0U)
1032                        erfhdr.flags.iface = trace_get_direction(packet);
1033
1034                erfhdr.type = erf_type;
1035
1036                /* Packet length (rlen includes format overhead) */
1037                assert(trace_get_capture_length(packet)>0
1038                                && trace_get_capture_length(packet)<=65536);
1039                assert(erf_get_framing_length(packet)>0
1040                                && trace_get_framing_length(packet)<=65536);
1041                assert(trace_get_capture_length(packet)+erf_get_framing_length(packet)>0
1042                      &&trace_get_capture_length(packet)+erf_get_framing_length(packet)<=65536);
1043
1044                erfhdr.rlen = htons(trace_get_capture_length(packet)
1045                        + erf_get_framing_length(packet));
1046
1047
1048                /* Loss counter. Can't do this */
1049                erfhdr.lctr = 0;
1050                /* Wire length, does not include padding! */
1051                erfhdr.wlen = htons(trace_get_wire_length(packet));
1052
1053                /* Write it out */
1054                numbytes = dag_dump_packet(libtrace,
1055                                &erfhdr,
1056                                pad,
1057                                payload);
1058
1059        }
1060
1061        return numbytes;
1062}
1063
1064/* Reads the next available packet from a DAG card, in a BLOCKING fashion
1065 *
1066 * If DUCK reporting is enabled, the packet returned may be a DUCK update
1067 */
1068static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1069        int size = 0;
1070        struct timeval tv;
1071        dag_record_t *erfptr = NULL;
1072        int numbytes = 0;
1073        uint32_t flags = 0;
1074        struct timeval maxwait;
1075        struct timeval pollwait;
1076
1077        pollwait.tv_sec = 0;
1078        pollwait.tv_usec = 10000;
1079        maxwait.tv_sec = 0;
1080        maxwait.tv_usec = 250000;
1081
1082        /* Check if we're due for a DUCK report */
1083        size = dag_get_duckinfo(libtrace, packet);
1084
1085        if (size != 0)
1086                return size;
1087
1088
1089        /* Don't let anyone try to free our DAG memory hole! */
1090        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1091
1092        /* If the packet buffer is currently owned by libtrace, free it so
1093         * that we can set the packet to point into the DAG memory hole */
1094        if (packet->buf_control == TRACE_CTRL_PACKET) {
1095                free(packet->buffer);
1096                packet->buffer = 0;
1097        }
1098       
1099        if (dag_set_stream_poll(FORMAT_DATA->device->fd, 
1100                        FORMAT_DATA->dagstream, sizeof(dag_record_t), &maxwait, 
1101                        &pollwait) == -1)
1102        {
1103                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1104                return -1;
1105        }
1106
1107
1108        /* Grab a full ERF record */
1109        do {
1110                numbytes = dag_available(libtrace);
1111                if (numbytes < 0)
1112                        return numbytes;
1113                if (numbytes < dag_record_size) {
1114                        if (libtrace_halt)
1115                                return 0;
1116                        /* Block until we see a packet */
1117                        continue;
1118                }
1119                erfptr = dag_get_record(libtrace);
1120        } while (erfptr == NULL);
1121
1122        /* Prepare the libtrace packet */
1123        if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
1124                                flags))
1125                return -1;
1126
1127        /* Update the DUCK timer */
1128        tv = trace_get_timeval(packet);
1129        DUCK.last_pkt = tv.tv_sec;
1130
1131        return packet->payload ? htons(erfptr->rlen) :
1132                                erf_get_framing_length(packet);
1133}
1134
1135/* Attempts to read a packet from a DAG card in a NON-BLOCKING fashion. If a
1136 * packet is available, we will return a packet event. Otherwise we will
1137 * return a SLEEP event (as we cannot select on the DAG file descriptor).
1138 */
1139static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
1140                                        libtrace_packet_t *packet) {
1141        libtrace_eventobj_t event = {0,0,0.0,0};
1142        dag_record_t *erfptr = NULL;
1143        int numbytes;
1144        uint32_t flags = 0;
1145        struct timeval minwait, tv;
1146       
1147        minwait.tv_sec = 0;
1148        minwait.tv_usec = 10000;
1149
1150        /* Check if we're meant to provide a DUCK update */
1151        numbytes = dag_get_duckinfo(libtrace, packet);
1152        if (numbytes < 0) {
1153                event.type = TRACE_EVENT_TERMINATE;
1154                return event;
1155        } else if (numbytes > 0) {
1156                event.type = TRACE_EVENT_PACKET;
1157                return event;
1158        }
1159       
1160        if (dag_set_stream_poll(FORMAT_DATA->device->fd, 
1161                        FORMAT_DATA->dagstream, 0, &minwait, 
1162                        &minwait) == -1)
1163        {
1164                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1165                event.type = TRACE_EVENT_TERMINATE;
1166                return event;
1167        }
1168
1169        do {
1170                erfptr = NULL;
1171                numbytes = 0;
1172       
1173                /* Need to call dag_available so that the top pointer will get
1174                 * updated, otherwise we'll never see any data! */
1175                numbytes = dag_available(libtrace);
1176
1177                /* May as well not bother calling dag_get_record if
1178                 * dag_available suggests that there's no data */
1179                if (numbytes != 0)
1180                        erfptr = dag_get_record(libtrace);
1181                if (erfptr == NULL) {
1182                        /* No packet available - sleep for a very short time */
1183                        if (libtrace_halt) {
1184                                event.type = TRACE_EVENT_TERMINATE;
1185                        } else {                       
1186                                event.type = TRACE_EVENT_SLEEP;
1187                                event.seconds = 0.0001;
1188                        }
1189                        break;
1190                }
1191                if (dag_prepare_packet(libtrace, packet, erfptr, 
1192                                        TRACE_RT_DATA_ERF, flags)) {
1193                        event.type = TRACE_EVENT_TERMINATE;
1194                        break;
1195                }
1196
1197
1198                event.size = trace_get_capture_length(packet) + 
1199                                trace_get_framing_length(packet);
1200               
1201                /* XXX trace_read_packet() normally applies the following
1202                 * config options for us, but this function is called via
1203                 * trace_event() so we have to do it ourselves */
1204
1205                if (libtrace->filter) {
1206                        int filtret = trace_apply_filter(libtrace->filter, 
1207                                        packet);
1208                        if (filtret == -1) {
1209                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
1210                                                "Bad BPF Filter");
1211                                event.type = TRACE_EVENT_TERMINATE;
1212                                break;
1213                        }
1214
1215                        if (filtret == 0) {
1216                                /* This packet isn't useful so we want to
1217                                 * immediately see if there is another suitable
1218                                 * one - we definitely DO NOT want to return
1219                                 * a sleep event in this case, like we used to
1220                                 * do! */
1221                                libtrace->filtered_packets ++;
1222                                trace_clear_cache(packet);
1223                                continue;
1224                        }
1225                               
1226                        event.type = TRACE_EVENT_PACKET;
1227                } else {
1228                        event.type = TRACE_EVENT_PACKET;
1229                }
1230
1231                /* Update the DUCK timer */
1232                tv = trace_get_timeval(packet);
1233                DUCK.last_pkt = tv.tv_sec;
1234               
1235                if (libtrace->snaplen > 0) {
1236                        trace_set_capture_length(packet, libtrace->snaplen);
1237                }
1238                libtrace->accepted_packets ++;
1239                break;
1240        } while (1);
1241
1242        return event;
1243}
1244
1245/* Gets the number of dropped packets */
1246static uint64_t dag_get_dropped_packets(libtrace_t *trace) {
1247        if (trace->format_data == NULL)
1248                return (uint64_t)-1;
1249        return DATA(trace)->drops;
1250}
1251
/* Prints some semi-useful help text about the DAG format module to
 * standard output */
static void dag_help(void) {
        /* Adjacent literals are joined by the compiler, so this writes the
         * help text in one call, one literal per output line */
        fputs("dag format module: $Revision: 1755 $\n"
              "Supported input URIs:\n"
              "\tdag:/dev/dagn\n"
              "\n"
              "\te.g.: dag:/dev/dag0\n"
              "\n"
              "Supported output URIs:\n"
              "\tnone\n"
              "\n", stdout);
}
1264
/* Format descriptor for the DAG module: the callback table that is handed
 * to register_format() by dag_constructor(). Entries are positional, with
 * the slot each one fills named in the trailing comment; NULL means this
 * module supplies no callback for that slot. Packet accessors are the
 * erf_* functions, since DAG records are in ERF format
 * (TRACE_FORMAT_ERF). */
static struct libtrace_format_t dag = {
        "dag",
        "$Id$",
        TRACE_FORMAT_ERF,
        dag_probe_filename,             /* probe filename */
        NULL,                           /* probe magic */
        dag_init_input,                 /* init_input */
        dag_config_input,               /* config_input */
        dag_start_input,                /* start_input */
        dag_pause_input,                /* pause_input */
        dag_init_output,                /* init_output */
        NULL,                           /* config_output */
        dag_start_output,               /* start_output */
        dag_fin_input,                  /* fin_input */
        dag_fin_output,                 /* fin_output */
        dag_read_packet,                /* read_packet */
        dag_prepare_packet,             /* prepare_packet */
        NULL,                           /* fin_packet */
        dag_write_packet,               /* write_packet */
        erf_get_link_type,              /* get_link_type */
        erf_get_direction,              /* get_direction */
        erf_set_direction,              /* set_direction */
        erf_get_erf_timestamp,          /* get_erf_timestamp */
        NULL,                           /* get_timeval */
        NULL,                           /* get_seconds */
        NULL,                           /* get_timespec */
        NULL,                           /* seek_erf */
        NULL,                           /* seek_timeval */
        NULL,                           /* seek_seconds */
        erf_get_capture_length,         /* get_capture_length */
        erf_get_wire_length,            /* get_wire_length */
        erf_get_framing_length,         /* get_framing_length */
        erf_set_capture_length,         /* set_capture_length */
        NULL,                           /* get_received_packets */
        NULL,                           /* get_filtered_packets */
        dag_get_dropped_packets,        /* get_dropped_packets */
        NULL,                           /* get_captured_packets */
        NULL,                           /* get_fd */
        trace_event_dag,                /* trace_event */
        dag_help,                       /* help */
        NULL                            /* next pointer */
};
1307
/* Registers the DAG format descriptor via register_format() */
void dag_constructor(void) {
        register_format(&dag);
}
Note: See TracBrowser for help on using the repository browser.