source: lib/format_dag25.c @ 92020b8

Branches/tags: 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Last change on this file since 92020b8 was 92020b8, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Move the DAG device back to the shared trace format data, rather than per stream.
Fix a minor bug introduced by the parallel framework changes: it is now up to
the format to mark a packet against its own trace, if that is needed, during
the read packet function.

  • Property mode set to 100644
File size: 44.1 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id$
31 *
32 */
33
34#define _GNU_SOURCE
35
36#include "config.h"
37#include "common.h"
38#include "libtrace.h"
39#include "libtrace_int.h"
40#include "format_helper.h"
41#include "format_erf.h"
42
43#include <assert.h>
44#include <errno.h>
45#include <fcntl.h>
46#include <stdio.h>
47#include <string.h>
48#include <stdlib.h>
49#include <sys/stat.h>
50
51#include <sys/mman.h>
52/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
53 * Windows anyway so we'll worry about this more later :] */
54#include <pthread.h>
55
56
57#ifdef WIN32
58#  include <io.h>
59#  include <share.h>
60#  define PATH_MAX _MAX_PATH
61#  define snprintf sprintf_s
62#else
63#  include <netdb.h>
64#  ifndef PATH_MAX
65#       define PATH_MAX 4096
66#  endif
67#  include <sys/ioctl.h>
68#endif
69
70/* This format deals with DAG cards that are using drivers from the 2.5 version
71 * onwards, including 3.X.
72 *
73 * DAG is a LIVE capture format.
74 *
75 * This format does support writing, provided the DAG card that you are using
76 * has transmit (Tx) support. Additionally, packets read using this format
77 * are in the ERF format, so can easily be written as ERF traces without
78 * losing any data.
79 */
80
/* Accessors for the format-specific data attached to a trace object.
 *
 * The macro argument is parenthesised so that any pointer expression (not
 * just a plain identifier) can be passed safely. */
#define DATA(x) ((struct dag_format_data_t *)(x)->format_data)
#define DATA_OUT(x) ((struct dag_format_data_out_t *)(x)->format_data)
/* Per-stream data carried by a node of the per_stream list */
#define STREAM_DATA(x) ((struct dag_per_stream_t *)(x)->data)

/* Shorthand for functions that have a local variable named 'libtrace' */
#define FORMAT_DATA DATA(libtrace)
#define FORMAT_DATA_OUT DATA_OUT(libtrace)

#define DUCK (FORMAT_DATA->duck)

/* First node of the per-stream list, and the stream data it carries */
#define FORMAT_DATA_HEAD (FORMAT_DATA->per_stream->head)
#define FORMAT_DATA_FIRST ((struct dag_per_stream_t *)FORMAT_DATA_HEAD->data)
92
93static struct libtrace_format_t dag;
94
/* An entry in the list of open DAG devices.
 *
 * A single DAG device can support multiple streams (and therefore multiple
 * input traces), so each trace refers to a device and the device counts
 * how many traces are using it. */
struct dag_dev_t {
        /* Device name */
        char *dev_name;
        /* File descriptor */
        int fd;
        /* Number of input / output traces that are using this device */
        uint16_t ref_count;
        /* Pointer to the previous device in the device list */
        struct dag_dev_t *prev;
        /* Pointer to the next device in the device list */
        struct dag_dev_t *next;
};
107
/* Format data kept for each DAG output trace (one instance per trace) */
struct dag_format_data_out_t {
        /* Name of the DAG device, as a string */
        char *device_name;
        /* The DAG device that is used for writing */
        struct dag_dev_t *device;
        /* Number of the DAG stream being written to */
        unsigned int dagstream;
        /* Non-zero while the stream is attached */
        int stream_attached;
        /* Number of bytes waiting to be transmitted */
        uint64_t waiting;
        /* Buffer holding the data that is to be transmitted */
        uint8_t *txbuffer;
};
123
/* State kept for every DAG input stream */
struct dag_per_stream_t {
        /* Number of the DAG stream */
        uint16_t dagstream;
        /* Pointer to the last unread byte in the DAG memory */
        uint8_t *top;
        /* Pointer to the first unread byte in the DAG memory */
        uint8_t *bottom;
        /* How much data has been processed, counted from 'bottom' */
        uint32_t processed;
        /* How many packets this stream has seen */
        uint64_t pkt_count;
        /* Drop counter for this stream */
        uint64_t drops;
        /* One flag per interface recording whether it has been seen yet.
         * Four slots are provided, which is enough to support all current
         * DAG cards. */
        uint8_t seeninterface[4];
};
143
144/* "Global" data that is stored for each DAG input trace */
145struct dag_format_data_t {
146        /* Data required for regular DUCK reporting */
147        /* TODO: This doesn't work with the 10X2S card! I don't know how
148         * DUCK stuff works and don't know how to fix it */
149        struct {
150                /* Timestamp of the last DUCK report */
151                uint32_t last_duck;
152                /* The number of seconds between each DUCK report */
153                uint32_t duck_freq;
154                /* Timestamp of the last packet read from the DAG card */
155                uint32_t last_pkt;
156                /* Dummy trace to ensure DUCK packets are dealt with using the
157                 * DUCK format functions */
158                libtrace_t *dummy_duck;
159        } duck;
160        /* DAG device */
161        struct dag_dev_t *device;
162        /* String containing the DAG device name */
163        char *device_name;
164        /* Boolean flag indicating whether the trace is currently attached */
165        int stream_attached;
166        /* Data stored against each DAG input stream */
167        libtrace_list_t *per_stream;
168};
169
/* To be thread-safe, we need a mutex for operating on the list of DAG
 * devices. It is statically initialised so that it is valid the first time
 * a trace is created, without relying on an explicit init call. */
pthread_mutex_t open_dag_mutex = PTHREAD_MUTEX_INITIALIZER;

/* The list of DAG devices that have been opened by libtrace.
 *
 * We can only open each DAG device once, but we might want to read from
 * multiple streams. Therefore, we need to maintain a list of devices that we
 * have opened (with ref counts!) so that we don't try to open a device too
 * many times or close a device that we're still using */
struct dag_dev_t *open_dags = NULL;
181
182/* Returns the amount of padding between the ERF header and the start of the
183 * captured packet data */
184static int dag_get_padding(const libtrace_packet_t *packet)
185{
186        /* ERF Ethernet records have a 2 byte padding before the packet itself
187         * so that the IP header is aligned on a 32 bit boundary.
188         */
189        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
190                dag_record_t *erfptr = (dag_record_t *)packet->header;
191                switch(erfptr->type) {
192                        case TYPE_ETH:
193                        case TYPE_DSM_COLOR_ETH:
194                                return 2;
195                        default:                return 0;
196                }
197        }
198        else {
199                switch(trace_get_link_type(packet)) {
200                        case TRACE_TYPE_ETH:    return 2;
201                        default:                return 0;
202                }
203        }
204}
205
/* Guesses whether the given filename refers to a DAG device.
 *
 * Returns 1 if the path can be stat()ed and is a character device, and 0
 * otherwise. */
static int dag_probe_filename(const char *filename)
{
        struct stat info;

        /* It is "probably us" only if the file exists and is a character
         * device; anything else is rejected. */
        return stat(filename, &info) == 0 && S_ISCHR(info.st_mode);
}
221
222/* Initialises the DAG output data structure */
223static void dag_init_format_out_data(libtrace_out_t *libtrace)
224{
225        libtrace->format_data = (struct dag_format_data_out_t *)
226                malloc(sizeof(struct dag_format_data_out_t));
227        // no DUCK on output
228        FORMAT_DATA_OUT->stream_attached = 0;
229        FORMAT_DATA_OUT->device = NULL;
230        FORMAT_DATA_OUT->device_name = NULL;
231        FORMAT_DATA_OUT->dagstream = 0;
232        FORMAT_DATA_OUT->waiting = 0;
233
234}
235
236/* Initialises the DAG input data structure */
237static void dag_init_format_data(libtrace_t *libtrace)
238{
239        struct dag_per_stream_t stream_data;
240
241        libtrace->format_data = (struct dag_format_data_t *)
242                malloc(sizeof(struct dag_format_data_t));
243        DUCK.last_duck = 0;
244        DUCK.duck_freq = 0;
245        DUCK.last_pkt = 0;
246        DUCK.dummy_duck = NULL;
247
248        FORMAT_DATA->per_stream =
249                libtrace_list_init(sizeof(stream_data));
250        assert(FORMAT_DATA->per_stream != NULL);
251
252        /* We'll start with just one instance of stream_data, and we'll
253         * add more later if we need them */
254        memset(&stream_data, 0, sizeof(stream_data));
255        libtrace_list_push_back(FORMAT_DATA->per_stream, &stream_data);
256        FORMAT_DATA->device_name = NULL;
257}
258
259/* Determines if there is already an entry for the given DAG device in the
260 * device list and increments the reference count for that device, if found.
261 *
262 * NOTE: This function assumes the open_dag_mutex is held by the caller */
263static struct dag_dev_t *dag_find_open_device(char *dev_name)
264{
265        struct dag_dev_t *dag_dev;
266
267        dag_dev = open_dags;
268
269        /* XXX: Not exactly zippy, but how often are we going to be dealing
270         * with multiple dag cards? */
271        while (dag_dev != NULL) {
272                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
273                        dag_dev->ref_count ++;
274                        return dag_dev;
275                }
276                dag_dev = dag_dev->next;
277        }
278        return NULL;
279}
280
281/* Closes a DAG device and removes it from the device list.
282 *
283 * Attempting to close a DAG device that has a non-zero reference count will
284 * cause an assertion failure!
285 *
286 * NOTE: This function assumes the open_dag_mutex is held by the caller */
287static void dag_close_device(struct dag_dev_t *dev)
288{
289        /* Need to remove from the device list */
290        assert(dev->ref_count == 0);
291
292        if (dev->prev == NULL) {
293                open_dags = dev->next;
294                if (dev->next)
295                        dev->next->prev = NULL;
296        } else {
297                dev->prev->next = dev->next;
298                if (dev->next)
299                        dev->next->prev = dev->prev;
300        }
301
302        dag_close(dev->fd);
303        if (dev->dev_name)
304                free(dev->dev_name);
305        free(dev);
306}
307
308
309/* Opens a new DAG device for writing and adds it to the DAG device list
310 *
311 * NOTE: this function should only be called when opening a DAG device for
312 * writing - there is little practical difference between this and the
313 * function below that covers the reading case, but we need the output trace
314 * object to report errors properly so the two functions take slightly
315 * different arguments. This is really lame and there should be a much better
316 * way of doing this.
317 *
318 * NOTE: This function assumes the open_dag_mutex is held by the caller
319 */
320static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace,
321                                                char *dev_name)
322{
323        struct stat buf;
324        int fd;
325        struct dag_dev_t *new_dev;
326
327        /* Make sure the device exists */
328        if (stat(dev_name, &buf) == -1) {
329                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
330                return NULL;
331        }
332
333        /* Make sure it is the appropriate type of device */
334        if (S_ISCHR(buf.st_mode)) {
335                /* Try opening the DAG device */
336                if((fd = dag_open(dev_name)) < 0) {
337                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
338                                        dev_name);
339                        return NULL;
340                }
341        } else {
342                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
343                                dev_name);
344                return NULL;
345        }
346
347        /* Add the device to our device list - it is just a doubly linked
348         * list with no inherent ordering; just tack the new one on the front
349         */
350        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
351        new_dev->fd = fd;
352        new_dev->dev_name = dev_name;
353        new_dev->ref_count = 1;
354
355        new_dev->prev = NULL;
356        new_dev->next = open_dags;
357        if (open_dags)
358                open_dags->prev = new_dev;
359
360        open_dags = new_dev;
361
362        return new_dev;
363}
364
365/* Opens a new DAG device for reading and adds it to the DAG device list
366 *
367 * NOTE: this function should only be called when opening a DAG device for
368 * reading - there is little practical difference between this and the
369 * function above that covers the writing case, but we need the input trace
370 * object to report errors properly so the two functions take slightly
371 * different arguments. This is really lame and there should be a much better
372 * way of doing this.
373 *
374 * NOTE: This function assumes the open_dag_mutex is held by the caller */
375static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
376        struct stat buf;
377        int fd;
378        struct dag_dev_t *new_dev;
379
380        /* Make sure the device exists */
381        if (stat(dev_name, &buf) == -1) {
382                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
383                return NULL;
384        }
385
386        /* Make sure it is the appropriate type of device */
387        if (S_ISCHR(buf.st_mode)) {
388                /* Try opening the DAG device */
389                if((fd = dag_open(dev_name)) < 0) {
390                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
391                                      dev_name);
392                        return NULL;
393                }
394        } else {
395                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
396                              dev_name);
397                return NULL;
398        }
399
400        /* Add the device to our device list - it is just a doubly linked
401         * list with no inherent ordering; just tack the new one on the front
402         */
403        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
404        new_dev->fd = fd;
405        new_dev->dev_name = dev_name;
406        new_dev->ref_count = 1;
407
408        new_dev->prev = NULL;
409        new_dev->next = open_dags;
410        if (open_dags)
411                open_dags->prev = new_dev;
412
413        open_dags = new_dev;
414
415        return new_dev;
416}
417
418/* Creates and initialises a DAG output trace */
419static int dag_init_output(libtrace_out_t *libtrace)
420{
421        char *scan = NULL;
422        struct dag_dev_t *dag_device = NULL;
423        int stream = 1;
424
425        /* XXX I don't know if this is important or not, but this function
426         * isn't present in all of the driver releases that this code is
427         * supposed to support! */
428        /*
429        unsigned long wake_time;
430        dagutil_sleep_get_wake_time(&wake_time,0);
431        */
432
433        dag_init_format_out_data(libtrace);
434        /* Grab the mutex while we're likely to be messing with the device
435         * list */
436        pthread_mutex_lock(&open_dag_mutex);
437
438        /* Specific streams are signified using a comma in the libtrace URI,
439         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
440         *
441         * If no stream is specified, we will write using stream 1 */
442        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
443                FORMAT_DATA_OUT->device_name = strdup(libtrace->uridata);
444        } else {
445                FORMAT_DATA_OUT->device_name =
446                                (char *)strndup(libtrace->uridata,
447                                (size_t)(scan - libtrace->uridata));
448                stream = atoi(++scan);
449        }
450        FORMAT_DATA_OUT->dagstream = stream;
451
452        /* See if our DAG device is already open */
453        dag_device = dag_find_open_device(FORMAT_DATA_OUT->device_name);
454
455        if (dag_device == NULL) {
456                /* Device not yet opened - open it ourselves */
457                dag_device = dag_open_output_device(libtrace,
458                                FORMAT_DATA_OUT->device_name);
459        }
460
461        /* Make sure we have successfully opened a DAG device */
462        if (dag_device == NULL) {
463                if (FORMAT_DATA_OUT->device_name) {
464                        free(FORMAT_DATA_OUT->device_name);
465                        FORMAT_DATA_OUT->device_name = NULL;
466                }
467                pthread_mutex_unlock(&open_dag_mutex);
468                return -1;
469        }
470
471        FORMAT_DATA_OUT->device = dag_device;
472        pthread_mutex_unlock(&open_dag_mutex);
473        return 0;
474}
475
476/* Creates and initialises a DAG input trace */
477static int dag_init_input(libtrace_t *libtrace) {
478        char *scan = NULL;
479        int stream = 0;
480        struct dag_dev_t *dag_device = NULL;
481
482        dag_init_format_data(libtrace);
483        /* Grab the mutex while we're likely to be messing with the device
484         * list */
485        pthread_mutex_lock(&open_dag_mutex);
486
487
488        /* DAG cards support multiple streams. In a single threaded capture,
489         * these are specified using a comma in the libtrace URI,
490         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
491         *
492         * If no stream is specified, we will read from stream 0 with
493         * one thread
494         */
495        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
496                FORMAT_DATA->device_name = strdup(libtrace->uridata);
497        } else {
498                FORMAT_DATA->device_name = (char *)strndup(libtrace->uridata,
499                                (size_t)(scan - libtrace->uridata));
500                stream = atoi(++scan);
501        }
502
503        FORMAT_DATA_FIRST->dagstream = stream;
504
505        /* See if our DAG device is already open */
506        dag_device = dag_find_open_device(FORMAT_DATA->device_name);
507
508        if (dag_device == NULL) {
509                /* Device not yet opened - open it ourselves */
510                dag_device=dag_open_device(libtrace, FORMAT_DATA->device_name);
511        }
512
513        /* Make sure we have successfully opened a DAG device */
514        if (dag_device == NULL) {
515                if (FORMAT_DATA->device_name)
516                        free(FORMAT_DATA->device_name);
517                FORMAT_DATA->device_name = NULL;
518                pthread_mutex_unlock(&open_dag_mutex);
519                return -1;
520        }
521
522        FORMAT_DATA->device = dag_device;
523
524        /* See Config_Status_API_Programming_Guide.pdf from the Endace
525           Dag Documentation */
526        /* Check kBooleanAttributeActive is true -- no point capturing
527         * on an interface that's disabled
528         *
529         * The symptom of the port being disabled is that libtrace
530         * will appear to hang. */
531        /* Check kBooleanAttributeFault is false */
532        /* Check kBooleanAttributeLocalFault is false */
533        /* Check kBooleanAttributeLock is true ? */
534        /* Check kBooleanAttributePeerLink ? */
535
536        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based
537           on libtrace promisc attribute?*/
538        /* Set kUint32AttributeSnapLength to the snaplength */
539
540        pthread_mutex_unlock(&open_dag_mutex);
541        return 0;
542}
543
544/* Configures a DAG input trace */
545static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
546                            void *data)
547{
548        char conf_str[4096];
549        switch(option) {
550        case TRACE_OPTION_META_FREQ:
551                /* This option is used to specify the frequency of DUCK
552                 * updates */
553                DUCK.duck_freq = *(int *)data;
554                return 0;
555        case TRACE_OPTION_SNAPLEN:
556                /* Tell the card our new snap length */
557                snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
558                if (dag_configure(FORMAT_DATA->device->fd,
559                                  conf_str) != 0) {
560                        trace_set_err(libtrace, errno, "Failed to configure "
561                                      "snaplen on DAG card: %s",
562                                      libtrace->uridata);
563                        return -1;
564                }
565                return 0;
566        case TRACE_OPTION_PROMISC:
567                /* DAG already operates in a promisc fashion */
568                return -1;
569        case TRACE_OPTION_FILTER:
570                /* We don't yet support pushing filters into DAG
571                 * cards */
572                return -1;
573        case TRACE_OPTION_EVENT_REALTIME:
574                /* Live capture is always going to be realtime */
575                return -1;
576        }
577        return -1;
578}
579
580/* Starts a DAG output trace */
581static int dag_start_output(libtrace_out_t *libtrace)
582{
583        struct timeval zero, nopoll;
584
585        zero.tv_sec = 0;
586        zero.tv_usec = 0;
587        nopoll = zero;
588
589        /* Attach and start the DAG stream */
590        if (dag_attach_stream(FORMAT_DATA_OUT->device->fd,
591                        FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) {
592                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
593                return -1;
594        }
595
596        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
597                        FORMAT_DATA_OUT->dagstream) < 0) {
598                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
599                return -1;
600        }
601        FORMAT_DATA_OUT->stream_attached = 1;
602
603        /* We don't want the dag card to do any sleeping */
604        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
605                        FORMAT_DATA_OUT->dagstream, 0, &zero,
606                        &nopoll);
607
608        return 0;
609}
610
611static int dag_start_input_stream(libtrace_t *libtrace,
612                                  struct dag_per_stream_t * stream) {
613        struct timeval zero, nopoll;
614        uint8_t *top, *bottom, *starttop;
615        top = bottom = NULL;
616
617        zero.tv_sec = 0;
618        zero.tv_usec = 10000;
619        nopoll = zero;
620
621        /* Attach and start the DAG stream */
622        if (dag_attach_stream(FORMAT_DATA->device->fd,
623                              stream->dagstream, 0, 0) < 0) {
624                trace_set_err(libtrace, errno, "Cannot attach DAG stream #%u",
625                              stream->dagstream);
626                return -1;
627        }
628
629        if (dag_start_stream(FORMAT_DATA->device->fd,
630                             stream->dagstream) < 0) {
631                trace_set_err(libtrace, errno, "Cannot start DAG stream #%u",
632                              stream->dagstream);
633                return -1;
634        }
635        FORMAT_DATA->stream_attached = 1;
636
637        /* We don't want the dag card to do any sleeping */
638        if (dag_set_stream_poll(FORMAT_DATA->device->fd,
639                            stream->dagstream, 0, &zero,
640                            &nopoll) < 0) {
641                trace_set_err(libtrace, errno,
642                              "dag_set_stream_poll failed!");
643                return -1;
644        }
645
646        starttop = dag_advance_stream(FORMAT_DATA->device->fd,
647                                      stream->dagstream,
648                                      &bottom);
649
650        /* Should probably flush the memory hole now */
651        top = starttop;
652        while (starttop - bottom > 0) {
653                bottom += (starttop - bottom);
654                top = dag_advance_stream(FORMAT_DATA->device->fd,
655                                         stream->dagstream,
656                                         &bottom);
657        }
658        stream->top = top;
659        stream->bottom = bottom;
660        stream->processed = 0;
661        stream->drops = 0;
662
663        return 0;
664
665}
666
667/* Starts a DAG input trace */
668static int dag_start_input(libtrace_t *libtrace)
669{
670        return dag_start_input_stream(libtrace, FORMAT_DATA_FIRST);
671}
672
673static int dag_pstart_input(libtrace_t *libtrace)
674{
675        char *scan, *tok;
676        uint16_t stream_count = 0, max_streams;
677        int iserror = 0;
678        struct dag_per_stream_t stream_data;
679
680        /* Check we aren't trying to create more threads than the DAG card can
681         * handle */
682        max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd);
683        if (libtrace->perpkt_thread_count > max_streams) {
684                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
685                              "trying to create too many threads (max is %u)",
686                              max_streams);
687                iserror = 1;
688                goto cleanup;
689        }
690
691        /* Get the stream names from the uri */
692        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
693                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
694                              "format uri doesn't specify the DAG streams");
695                iserror = 1;
696                goto cleanup;
697        }
698
699        scan++;
700
701        tok = strtok(scan, ",");
702        while (tok != NULL) {
703                /* Ensure we haven't specified too many streams */
704                if (stream_count >= libtrace->perpkt_thread_count) {
705                        trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
706                                      "format uri specifies too many streams. "
707                                      "Max is %u", max_streams);
708                        iserror = 1;
709                        goto cleanup;
710                }
711
712                /* Save the stream details */
713                if (stream_count == 0) {
714                        /* Special case where we update the existing stream
715                         * data structure */
716                        FORMAT_DATA_FIRST->dagstream = (uint16_t)atoi(tok);
717                } else {
718                        memset(&stream_data, 0, sizeof(stream_data));
719                        stream_data.dagstream = (uint16_t)atoi(tok);
720                        libtrace_list_push_back(FORMAT_DATA->per_stream,
721                                                &stream_data);
722                }
723
724                stream_count++;
725                tok = strtok(NULL, ",");
726        }
727
728        FORMAT_DATA->stream_attached = 1;
729
730 cleanup:
731        if (iserror) {
732                return -1;
733        } else {
734                return 0;
735        }
736}
737
738/* Pauses a DAG output trace */
739static int dag_pause_output(libtrace_out_t *libtrace)
740{
741        /* Stop and detach the stream */
742        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
743                            FORMAT_DATA_OUT->dagstream) < 0) {
744                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
745                return -1;
746        }
747        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
748                              FORMAT_DATA_OUT->dagstream) < 0) {
749                trace_set_err_out(libtrace, errno,
750                                  "Could not detach DAG stream");
751                return -1;
752        }
753        FORMAT_DATA_OUT->stream_attached = 0;
754        return 0;
755}
756
757/* Pauses a DAG input trace */
758static int dag_pause_input(libtrace_t *libtrace)
759{
760        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
761
762        /* Stop and detach each stream */
763        while (tmp != NULL) {
764                if (dag_stop_stream(FORMAT_DATA->device->fd,
765                                    STREAM_DATA(tmp)->dagstream) < 0) {
766                        trace_set_err(libtrace, errno,
767                                      "Could not stop DAG stream");
768                        printf("Count not stop DAG stream\n");
769                        return -1;
770                }
771                if (dag_detach_stream(FORMAT_DATA->device->fd,
772                                      STREAM_DATA(tmp)->dagstream) < 0) {
773                        trace_set_err(libtrace, errno,
774                                      "Could not detach DAG stream");
775                        printf("Count not detach DAG stream\n");
776                        return -1;
777                }
778
779                tmp = tmp->next;
780        }
781
782        FORMAT_DATA->stream_attached = 0;
783        return 0;
784}
785
786
787
788/* Closes a DAG input trace */
789static int dag_fin_input(libtrace_t *libtrace)
790{
791        /* Need the lock, since we're going to be handling the device list */
792        pthread_mutex_lock(&open_dag_mutex);
793
794        /* Detach the stream if we are not paused */
795        if (FORMAT_DATA->stream_attached)
796                dag_pause_input(libtrace);
797        FORMAT_DATA->device->ref_count--;
798
799        /* Close the DAG device if there are no more references to it */
800        if (FORMAT_DATA->device->ref_count == 0)
801                dag_close_device(FORMAT_DATA->device);
802
803        if (DUCK.dummy_duck)
804                trace_destroy_dead(DUCK.dummy_duck);
805
806        /* Clear the list */
807        libtrace_list_deinit(FORMAT_DATA->per_stream);
808
809        if (FORMAT_DATA->device_name)
810                free(FORMAT_DATA->device_name);
811        free(libtrace->format_data);
812        pthread_mutex_unlock(&open_dag_mutex);
813        return 0; /* success */
814}
815
816/* Closes a DAG output trace */
817static int dag_fin_output(libtrace_out_t *libtrace)
818{
819
820        /* Commit any outstanding traffic in the txbuffer */
821        if (FORMAT_DATA_OUT->waiting) {
822                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
823                                           FORMAT_DATA_OUT->dagstream,
824                                           FORMAT_DATA_OUT->waiting );
825        }
826
827        /* Wait until the buffer is nearly clear before exiting the program,
828         * as we will lose packets otherwise */
829        dag_tx_get_stream_space
830                (FORMAT_DATA_OUT->device->fd,
831                 FORMAT_DATA_OUT->dagstream,
832                 dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
833                                            FORMAT_DATA_OUT->dagstream) - 8);
834
835        /* Need the lock, since we're going to be handling the device list */
836        pthread_mutex_lock(&open_dag_mutex);
837
838        /* Detach the stream if we are not paused */
839        if (FORMAT_DATA_OUT->stream_attached)
840                dag_pause_output(libtrace);
841        FORMAT_DATA_OUT->device->ref_count --;
842
843        /* Close the DAG device if there are no more references to it */
844        if (FORMAT_DATA_OUT->device->ref_count == 0)
845                dag_close_device(FORMAT_DATA_OUT->device);
846        if (FORMAT_DATA_OUT->device_name)
847                free(FORMAT_DATA_OUT->device_name);
848        free(libtrace->format_data);
849        pthread_mutex_unlock(&open_dag_mutex);
850        return 0; /* success */
851}
852
/* Pick the DUCK ioctl request and the matching libtrace RT packet type based
 * on which ioctl name the DAG headers provide. If neither is present, both
 * macros are left undefined and a build warning is emitted. */
#if defined(DAGIOC_CARD_DUCK)
#define LIBTRACE_DUCK_IOCTL DAGIOC_CARD_DUCK
#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_5_0
#elif defined(DAGIOCDUCK)
#define LIBTRACE_DUCK_IOCTL DAGIOCDUCK
#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_2_5
#else
#warning "DAG appears to be missing DUCK support"
#endif
864
865/* Extracts DUCK information from the DAG card and produces a DUCK packet */
866static int dag_get_duckinfo(libtrace_t *libtrace,
867                                libtrace_packet_t *packet) {
868
869        if (DUCK.duck_freq == 0)
870                return 0;
871
872#ifndef LIBTRACE_DUCK_IOCTL
873        trace_set_err(libtrace, errno, 
874                "Requested DUCK information but unable to determine the correct ioctl for DUCK");
875        DUCK.duck_freq = 0;
876        return -1;
877#endif
878
879        if (DUCK.last_pkt - DUCK.last_duck < DUCK.duck_freq)
880                return 0;
881
882        /* Allocate memory for the DUCK data */
883        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
884            !packet->buffer) {
885                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
886                packet->buf_control = TRACE_CTRL_PACKET;
887                if (!packet->buffer) {
888                        trace_set_err(libtrace, errno,
889                                      "Cannot allocate packet buffer");
890                        return -1;
891                }
892        }
893
894        /* DUCK doesn't have a format header */
895        packet->header = 0;
896        packet->payload = packet->buffer;
897
898        /* No need to check if we can get DUCK or not - we're modern
899         * enough so just grab the DUCK info */
900        if ((ioctl(FORMAT_DATA->device->fd, LIBTRACE_DUCK_IOCTL,
901                   (duckinf_t *)packet->payload) < 0)) {
902                trace_set_err(libtrace, errno, "Error using DUCK ioctl");
903                DUCK.duck_freq = 0;
904                return -1;
905        }
906
907        packet->type = LIBTRACE_DUCK_VERSION;
908
909        /* Set the packet's trace to point at a DUCK trace, so that the
910         * DUCK format functions will be called on the packet rather than the
911         * DAG ones */
912        if (!DUCK.dummy_duck)
913                DUCK.dummy_duck = trace_create_dead("duck:dummy");
914        packet->trace = DUCK.dummy_duck;
915        DUCK.last_duck = DUCK.last_pkt;
916        return sizeof(duckinf_t);
917}
918
919/* Determines the amount of data available to read from the DAG card */
920static int dag_available(libtrace_t *libtrace,
921                         struct dag_per_stream_t *stream_data)
922{
923        uint32_t diff = stream_data->top - stream_data->bottom;
924
925        /* If we've processed more than 4MB of data since we last called
926         * dag_advance_stream, then we should call it again to allow the
927         * space occupied by that 4MB to be released */
928        if (diff >= dag_record_size && stream_data->processed < 4 * 1024 * 1024)
929                return diff;
930
931        /* Update the top and bottom pointers */
932        stream_data->top = dag_advance_stream(FORMAT_DATA->device->fd,
933                                              stream_data->dagstream,
934                                              &(stream_data->bottom));
935
936        if (stream_data->top == NULL) {
937                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
938                return -1;
939        }
940        stream_data->processed = 0;
941        diff = stream_data->top - stream_data->bottom;
942        return diff;
943}
944
945/* Returns a pointer to the start of the next complete ERF record */
946static dag_record_t *dag_get_record(struct dag_per_stream_t *stream_data)
947{
948        dag_record_t *erfptr = NULL;
949        uint16_t size;
950
951        erfptr = (dag_record_t *)stream_data->bottom;
952        if (!erfptr)
953                return NULL;
954
955        size = ntohs(erfptr->rlen);
956        assert( size >= dag_record_size );
957
958        /* Make certain we have the full packet available */
959        if (size > (stream_data->top - stream_data->bottom))
960                return NULL;
961
962        stream_data->bottom += size;
963        stream_data->processed += size;
964        return erfptr;
965}
966
967/* Converts a buffer containing a recently read DAG packet record into a
968 * libtrace packet */
969static int dag_prepare_packet_real(libtrace_t *libtrace,
970                                   struct dag_per_stream_t *stream_data,
971                                   libtrace_packet_t *packet,
972                                   void *buffer, libtrace_rt_types_t rt_type,
973                                   uint32_t flags)
974{
975        dag_record_t *erfptr;
976
977        /* If the packet previously owned a buffer that is not the buffer
978         * that contains the new packet data, we're going to need to free the
979         * old one to avoid memory leaks */
980        if (packet->buffer != buffer &&
981            packet->buf_control == TRACE_CTRL_PACKET) {
982                free(packet->buffer);
983        }
984
985        /* Set the buffer owner appropriately */
986        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
987                packet->buf_control = TRACE_CTRL_PACKET;
988        } else
989                packet->buf_control = TRACE_CTRL_EXTERNAL;
990
991        /* Update the packet pointers and type appropriately */
992        erfptr = (dag_record_t *)buffer;
993        packet->buffer = erfptr;
994        packet->header = erfptr;
995        packet->type = rt_type;
996
997        if (erfptr->flags.rxerror == 1) {
998                /* rxerror means the payload is corrupt - drop the payload
999                 * by tweaking rlen */
1000                packet->payload = NULL;
1001                erfptr->rlen = htons(erf_get_framing_length(packet));
1002        } else {
1003                packet->payload = (char*)packet->buffer
1004                        + erf_get_framing_length(packet);
1005        }
1006
1007        if (libtrace->format_data == NULL) {
1008                dag_init_format_data(libtrace);
1009        }
1010
1011        /* Update the dropped packets counter */
1012        /* No loss counter for DSM coloured records - have to use some
1013         * other API */
1014        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
1015                /* TODO */
1016        } else {
1017                /* Use the ERF loss counter */
1018                if (stream_data->seeninterface[erfptr->flags.iface]
1019                    == 0) {
1020                        stream_data->seeninterface[erfptr->flags.iface]
1021                                = 1;
1022                } else {
1023                        stream_data->drops += ntohs(erfptr->lctr);
1024                }
1025        }
1026
1027        packet->error = 1;
1028
1029        return 0;
1030}
1031
1032static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
1033                              void *buffer, libtrace_rt_types_t rt_type,
1034                              uint32_t flags)
1035{
1036        return dag_prepare_packet_real(libtrace, FORMAT_DATA_FIRST, packet,
1037                                       buffer, rt_type, flags);
1038}
1039
1040/*
1041 * dag_write_packet() at this stage attempts to improve tx performance
1042 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
1043 * has been met. I observed approximately 270% performance increase
1044 * through this relatively naive tweak. No optimisation of buffer sizes
1045 * was attempted.
1046 */
1047
1048/* Pushes an ERF record onto the transmit stream */
1049static int dag_dump_packet(libtrace_out_t *libtrace,
1050                           dag_record_t *erfptr, unsigned int pad,
1051                           void *buffer)
1052{
1053        int size;
1054
1055        /*
1056         * If we've got 0 bytes waiting in the txqueue, assume that we
1057         * haven't requested any space yet, and request some, storing
1058         * the pointer at FORMAT_DATA_OUT->txbuffer.
1059         *
1060         * The amount to request is slightly magical at the moment - it's
1061         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
1062         * the buffer and handle overruns.
1063         */
1064        if (FORMAT_DATA_OUT->waiting == 0) {
1065                FORMAT_DATA_OUT->txbuffer =
1066                        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
1067                                                FORMAT_DATA_OUT->dagstream,
1068                                                16908288);
1069        }
1070
1071        /*
1072         * Copy the header separately to the body, as we can't guarantee they
1073         * are in contiguous memory
1074         */
1075        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, erfptr,
1076               (dag_record_size + pad));
1077        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
1078
1079        /*
1080         * Copy our incoming packet into the outgoing buffer, and increment
1081         * our waiting count
1082         */
1083        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
1084        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, buffer,
1085               size);
1086        FORMAT_DATA_OUT->waiting += size;
1087
1088        /*
1089         * If our output buffer has more than 16 Mebibytes in it, commit those
1090         * bytes and reset the waiting count to 0.
1091         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in
1092         * case there is still data in the buffer at program exit.
1093         */
1094        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
1095                FORMAT_DATA_OUT->txbuffer =
1096                        dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
1097                                                   FORMAT_DATA_OUT->dagstream,
1098                                                   FORMAT_DATA_OUT->waiting);
1099                FORMAT_DATA_OUT->waiting = 0;
1100        }
1101
1102        return size + pad + dag_record_size;
1103}
1104
1105/* Attempts to determine a suitable ERF type for a given packet. Returns true
1106 * if one is found, false otherwise */
1107static bool find_compatible_linktype(libtrace_out_t *libtrace,
1108                                     libtrace_packet_t *packet, char *type)
1109{
1110        /* Keep trying to simplify the packet until we can find
1111         * something we can do with it */
1112
1113        do {
1114                *type = libtrace_to_erf_type(trace_get_link_type(packet));
1115
1116                /* Success */
1117                if (*type != (char)-1)
1118                        return true;
1119
1120                if (!demote_packet(packet)) {
1121                        trace_set_err_out(libtrace,
1122                                          TRACE_ERR_NO_CONVERSION,
1123                                          "No erf type for packet (%i)",
1124                                          trace_get_link_type(packet));
1125                        return false;
1126                }
1127
1128        } while(1);
1129
1130        return true;
1131}
1132
1133/* Writes a packet to the provided DAG output trace */
1134static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet)
1135{
1136        /* This is heavily borrowed from erf_write_packet(). Yes, CnP
1137         * coding sucks, sorry about that.
1138         */
1139        unsigned int pad = 0;
1140        int numbytes;
1141        void *payload = packet->payload;
1142        dag_record_t *header = (dag_record_t *)packet->header;
1143        char erf_type = 0;
1144
1145        if(!packet->header) {
1146                /* No header, probably an RT packet. Lifted from
1147                 * erf_write_packet(). */
1148                return -1;
1149        }
1150
1151        if (trace_get_link_type(packet) == TRACE_TYPE_NONDATA)
1152                return 0;
1153
1154        pad = dag_get_padding(packet);
1155
1156        /*
1157         * If the payload is null, adjust the rlen. Discussion of this is
1158         * attached to erf_write_packet()
1159         */
1160        if (payload == NULL) {
1161                header->rlen = htons(dag_record_size + pad);
1162        }
1163
1164        if (packet->type == TRACE_RT_DATA_ERF) {
1165                numbytes = dag_dump_packet(libtrace, header, pad, payload);
1166        } else {
1167                /* Build up a new packet header from the existing header */
1168
1169                /* Simplify the packet first - if we can't do this, break
1170                 * early */
1171                if (!find_compatible_linktype(libtrace,packet,&erf_type))
1172                        return -1;
1173
1174                dag_record_t erfhdr;
1175
1176                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
1177                payload=packet->payload;
1178                pad = dag_get_padding(packet);
1179
1180                /* Flags. Can't do this */
1181                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
1182                if (trace_get_direction(packet)!=(int)~0U)
1183                        erfhdr.flags.iface = trace_get_direction(packet);
1184
1185                erfhdr.type = erf_type;
1186
1187                /* Packet length (rlen includes format overhead) */
1188                assert(trace_get_capture_length(packet) > 0
1189                       && trace_get_capture_length(packet) <= 65536);
1190                assert(erf_get_framing_length(packet) > 0
1191                       && trace_get_framing_length(packet) <= 65536);
1192                assert(trace_get_capture_length(packet) +
1193                       erf_get_framing_length(packet) > 0
1194                       && trace_get_capture_length(packet) +
1195                       erf_get_framing_length(packet) <= 65536);
1196
1197                erfhdr.rlen = htons(trace_get_capture_length(packet)
1198                                    + erf_get_framing_length(packet));
1199
1200
1201                /* Loss counter. Can't do this */
1202                erfhdr.lctr = 0;
1203                /* Wire length, does not include padding! */
1204                erfhdr.wlen = htons(trace_get_wire_length(packet));
1205
1206                /* Write it out */
1207                numbytes = dag_dump_packet(libtrace, &erfhdr, pad, payload);
1208        }
1209
1210        return numbytes;
1211}
1212
1213/* Reads the next available packet from a DAG card, in a BLOCKING fashion
1214 *
1215 * If DUCK reporting is enabled, the packet returned may be a DUCK update
1216 */
1217static int dag_read_packet_real(libtrace_t *libtrace,
1218                                struct dag_per_stream_t *stream_data,
1219                                libtrace_thread_t *t, /* Optional */
1220                                libtrace_packet_t *packet)
1221{
1222        int size = 0;
1223        dag_record_t *erfptr = NULL;
1224        int numbytes = 0;
1225        uint32_t flags = 0;
1226        struct timeval maxwait, pollwait;
1227
1228        pollwait.tv_sec = 0;
1229        pollwait.tv_usec = 10000;
1230        maxwait.tv_sec = 0;
1231        maxwait.tv_usec = 250000;
1232
1233        /* Check if we're due for a DUCK report */
1234        size = dag_get_duckinfo(libtrace, packet);
1235
1236        if (size != 0)
1237                return size;
1238
1239        /* Don't let anyone try to free our DAG memory hole! */
1240        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1241
1242        /* If the packet buffer is currently owned by libtrace, free it so
1243         * that we can set the packet to point into the DAG memory hole */
1244        if (packet->buf_control == TRACE_CTRL_PACKET) {
1245                free(packet->buffer);
1246                packet->buffer = 0;
1247        }
1248
1249        if (dag_set_stream_poll(FORMAT_DATA->device->fd, stream_data->dagstream,
1250                                sizeof(dag_record_t), &maxwait,
1251                                &pollwait) == -1) {
1252                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1253                return -1;
1254        }
1255
1256        /* Grab a full ERF record */
1257        do {
1258                numbytes = dag_available(libtrace, stream_data);
1259                if (numbytes < 0)
1260                        return numbytes;
1261                if (numbytes < dag_record_size) {
1262                        /* Check the message queue if we have one to check */
1263                        if (t != NULL &&
1264                            libtrace_message_queue_count(&t->messages) > 0)
1265                                return -2;
1266
1267                        if (libtrace_halt)
1268                                return 0;
1269                        /* Block until we see a packet */
1270                        continue;
1271                }
1272                erfptr = dag_get_record(stream_data);
1273        } while (erfptr == NULL);
1274
1275        packet->trace = libtrace;
1276        /* Prepare the libtrace packet */
1277        if (dag_prepare_packet_real(libtrace, stream_data, packet, erfptr,
1278                                    TRACE_RT_DATA_ERF, flags))
1279                return -1;
1280
1281        return packet->payload ? htons(erfptr->rlen) :
1282                erf_get_framing_length(packet);
1283}
1284
1285static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
1286{
1287        return dag_read_packet_real(libtrace, FORMAT_DATA_FIRST, NULL, packet);
1288}
1289
1290static int dag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1291                             libtrace_packet_t **packets, size_t nb_packets)
1292{
1293        int ret;
1294        size_t read_packets = 0;
1295        int numbytes = 0;
1296
1297        struct dag_per_stream_t *stream_data =
1298                (struct dag_per_stream_t *)t->format_data;
1299
1300        /* Read as many packets as we can, but read atleast one packet */
1301        do {
1302                ret = dag_read_packet_real(libtrace, stream_data, t,
1303                                           packets[read_packets]);
1304                if (ret < 0)
1305                        return ret;
1306
1307                read_packets++;
1308
1309                /* Make sure we don't read too many packets..! */
1310                if (read_packets >= nb_packets)
1311                        break;
1312
1313                numbytes = dag_available(libtrace, stream_data);
1314        } while (numbytes >= dag_record_size);
1315
1316        return read_packets;
1317}
1318
1319/* Attempts to read a packet from a DAG card in a NON-BLOCKING fashion. If a
1320 * packet is available, we will return a packet event. Otherwise we will
1321 * return a SLEEP event (as we cannot select on the DAG file descriptor).
1322 */
1323static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
1324                                           libtrace_packet_t *packet)
1325{
1326        libtrace_eventobj_t event = {0,0,0.0,0};
1327        dag_record_t *erfptr = NULL;
1328        int numbytes;
1329        uint32_t flags = 0;
1330        struct timeval minwait, tv;
1331       
1332        minwait.tv_sec = 0;
1333        minwait.tv_usec = 10000;
1334
1335        /* Check if we're meant to provide a DUCK update */
1336        numbytes = dag_get_duckinfo(libtrace, packet);
1337        if (numbytes < 0) {
1338                event.type = TRACE_EVENT_TERMINATE;
1339                return event;
1340        } else if (numbytes > 0) {
1341                event.type = TRACE_EVENT_PACKET;
1342                return event;
1343        }
1344       
1345        if (dag_set_stream_poll(FORMAT_DATA->device->fd,
1346                                FORMAT_DATA_FIRST->dagstream, 0, &minwait,
1347                                &minwait) == -1) {
1348                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1349                event.type = TRACE_EVENT_TERMINATE;
1350                return event;
1351        }
1352
1353        do {
1354                erfptr = NULL;
1355                numbytes = 0;
1356
1357                /* Need to call dag_available so that the top pointer will get
1358                 * updated, otherwise we'll never see any data! */
1359                numbytes = dag_available(libtrace, FORMAT_DATA_FIRST);
1360
1361                /* May as well not bother calling dag_get_record if
1362                 * dag_available suggests that there's no data */
1363                if (numbytes != 0)
1364                        erfptr = dag_get_record(FORMAT_DATA_FIRST);
1365                if (erfptr == NULL) {
1366                        /* No packet available - sleep for a very short time */
1367                        if (libtrace_halt) {
1368                                event.type = TRACE_EVENT_TERMINATE;
1369                        } else {
1370                                event.type = TRACE_EVENT_SLEEP;
1371                                event.seconds = 0.0001;
1372                        }
1373                        break;
1374                }
1375                if (dag_prepare_packet_real(libtrace, FORMAT_DATA_FIRST, packet,
1376                                            erfptr, TRACE_RT_DATA_ERF, flags)) {
1377                        event.type = TRACE_EVENT_TERMINATE;
1378                        break;
1379                }
1380
1381
1382                event.size = trace_get_capture_length(packet) +
1383                        trace_get_framing_length(packet);
1384
1385                /* XXX trace_read_packet() normally applies the following
1386                 * config options for us, but this function is called via
1387                 * trace_event() so we have to do it ourselves */
1388
1389                if (libtrace->filter) {
1390                        int filtret = trace_apply_filter(libtrace->filter,
1391                                                         packet);
1392                        if (filtret == -1) {
1393                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
1394                                              "Bad BPF Filter");
1395                                event.type = TRACE_EVENT_TERMINATE;
1396                                break;
1397                        }
1398
1399                        if (filtret == 0) {
1400                                /* This packet isn't useful so we want to
1401                                 * immediately see if there is another suitable
1402                                 * one - we definitely DO NOT want to return
1403                                 * a sleep event in this case, like we used to
1404                                 * do! */
1405                                libtrace->filtered_packets ++;
1406                                trace_clear_cache(packet);
1407                                continue;
1408                        }
1409
1410                        event.type = TRACE_EVENT_PACKET;
1411                } else {
1412                        event.type = TRACE_EVENT_PACKET;
1413                }
1414
1415                /* Update the DUCK timer */
1416                tv = trace_get_timeval(packet);
1417                DUCK.last_pkt = tv.tv_sec;
1418               
1419                if (libtrace->snaplen > 0) {
1420                        trace_set_capture_length(packet, libtrace->snaplen);
1421                }
1422                libtrace->accepted_packets ++;
1423                break;
1424        } while(1);
1425
1426        return event;
1427}
1428
1429/* Gets the number of dropped packets */
1430static uint64_t dag_get_dropped_packets(libtrace_t *libtrace)
1431{
1432        uint64_t sum = 0;
1433        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
1434
1435        /* Sum the drop counter for all the packets */
1436        while (tmp != NULL) {
1437                sum += STREAM_DATA(tmp)->drops;
1438                tmp = tmp->next;
1439        }
1440
1441        return sum;
1442}
1443
/* Prints some semi-useful help text about the DAG format module to stdout */
static void dag_help(void) {
        /* The help text, one entry per output line */
        static const char *const help_lines[] = {
                "dag format module: $Revision: 1755 $\n",
                "Supported input URIs:\n",
                "\tdag:/dev/dagn\n",
                "\n",
                "\te.g.: dag:/dev/dag0\n",
                "\n",
                "Supported output URIs:\n",
                "\tnone\n",
                "\n",
        };
        size_t i;

        for (i = 0; i < sizeof(help_lines) / sizeof(help_lines[0]); i++) {
                fputs(help_lines[i], stdout);
        }
}
1456
1457static int dag_pconfig_input(UNUSED libtrace_t *libtrace,
1458                             trace_parallel_option_t option, UNUSED void *value)
1459{
1460        /* We don't support any of these! Normally you configure the DAG card
1461         * externally. */
1462        switch(option) {
1463        case TRACE_OPTION_SET_HASHER:
1464        case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
1465        case TRACE_OPTION_TRACETIME:
1466        case TRACE_OPTION_TICK_INTERVAL:
1467        case TRACE_OPTION_GET_CONFIG:
1468        case TRACE_OPTION_SET_CONFIG:
1469                return -1;
1470        }
1471        /* We don't provide a default option to ensure that future options will
1472         * generate a compiler warning. */
1473
1474        return -1;
1475}
1476
1477static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1478                                bool reader)
1479{
1480        struct dag_per_stream_t *stream_data;
1481
1482        if (reader) {
1483                if (t->type == THREAD_PERPKT) {
1484                        stream_data =
1485                                (struct dag_per_stream_t *)
1486                                libtrace_list_get_index(FORMAT_DATA->per_stream,
1487                                                        t->perpkt_num)->data;
1488
1489                        /* Pass the per thread data to the thread */
1490                        t->format_data = stream_data;
1491
1492                        /* Attach and start the DAG stream */
1493                        printf("t%u: starting and attaching stream #%u\n",
1494                               t->perpkt_num, stream_data->dagstream);
1495                        if (dag_start_input_stream(libtrace, stream_data) < 0)
1496                                return -1;
1497                } else {
1498                        /* TODO: Figure out why t->type != THREAD_PERPKT in
1499                         * order to figure out what this line does */
1500                        t->format_data = FORMAT_DATA_FIRST;
1501                }
1502        }
1503
1504        fprintf(stderr, "t%u: registered thread\n", t->perpkt_num);
1505
1506        return 0;
1507}
1508
/* Format descriptor for the DAG 2.5 capture format. The initialiser is
 * positional, so every entry must stay in the order of the members of
 * struct libtrace_format_t. */
static struct libtrace_format_t dag = {
        "dag",
        "$Id$",
        TRACE_FORMAT_ERF,
        dag_probe_filename,             /* probe filename */
        NULL,                           /* probe magic */
        dag_init_input,                 /* init_input */
        dag_config_input,               /* config_input */
        dag_start_input,                /* start_input */
        dag_pause_input,                /* pause_input */
        dag_init_output,                /* init_output */
        NULL,                           /* config_output */
        dag_start_output,               /* start_output */
        dag_fin_input,                  /* fin_input */
        dag_fin_output,                 /* fin_output */
        dag_read_packet,                /* read_packet */
        dag_prepare_packet,             /* prepare_packet */
        NULL,                           /* fin_packet */
        dag_write_packet,               /* write_packet */
        erf_get_link_type,              /* get_link_type */
        erf_get_direction,              /* get_direction */
        erf_set_direction,              /* set_direction */
        erf_get_erf_timestamp,          /* get_erf_timestamp */
        NULL,                           /* get_timeval */
        NULL,                           /* get_seconds */
        NULL,                           /* get_timespec */
        NULL,                           /* seek_erf */
        NULL,                           /* seek_timeval */
        NULL,                           /* seek_seconds */
        erf_get_capture_length,         /* get_capture_length */
        erf_get_wire_length,            /* get_wire_length */
        erf_get_framing_length,         /* get_framing_length */
        erf_set_capture_length,         /* set_capture_length */
        NULL,                           /* get_received_packets */
        NULL,                           /* get_filtered_packets */
        dag_get_dropped_packets,        /* get_dropped_packets */
        NULL,                           /* get_statistics */
        NULL,                           /* get_fd */
        trace_event_dag,                /* trace_event */
        dag_help,                       /* help */
        NULL,                            /* next pointer */
        {true, 0}, /* live packet capture, thread limit TBD */
        /* The remaining entries are the parallel (per-thread) callbacks.
         * NOTE(review): the member names are not visible from here -
         * confirm each slot against struct libtrace_format_t. */
        dag_pstart_input,               /* presumably the parallel start */
        dag_pread_packets,              /* presumably the parallel read */
        dag_pause_input,                /* presumably the parallel pause;
                                         * same function as pause_input */
        NULL,
        dag_pconfig_input,              /* presumably the parallel config */
        dag_pregister_thread,           /* presumably thread registration */
        NULL,
        NULL                            /* get thread stats */
};
1560
1561void dag_constructor(void)
1562{
1563        register_format(&dag);
1564}
Note: See TracBrowser for help on using the repository browser.