source: lib/format_dag25.c @ a6b44a7

Branches: 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Last change on this file since a6b44a7 was a6b44a7, checked in by Shane Alcock <salcock@…>, 6 years ago

Don't fail trace_pstart if perpkt threads > DAG streams.

Instead, just cap the number of threads to match the number of streams we
have.

Added similar behaviour in the case where the URI contains more streams
than the number of threads.

  • Property mode set to 100644
File size: 44.8 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id$
31 *
32 */
33
34#define _GNU_SOURCE
35
36#include "config.h"
37#include "common.h"
38#include "libtrace.h"
39#include "libtrace_int.h"
40#include "format_helper.h"
41#include "format_erf.h"
42
43#include <assert.h>
44#include <errno.h>
45#include <fcntl.h>
46#include <stdio.h>
47#include <string.h>
48#include <stdlib.h>
49#include <sys/stat.h>
50
51#include <sys/mman.h>
52/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
53 * Windows anyway so we'll worry about this more later :] */
54#include <pthread.h>
55
56
57#ifdef WIN32
58#  include <io.h>
59#  include <share.h>
60#  define PATH_MAX _MAX_PATH
61#  define snprintf sprintf_s
62#else
63#  include <netdb.h>
64#  ifndef PATH_MAX
65#       define PATH_MAX 4096
66#  endif
67#  include <sys/ioctl.h>
68#endif
69
70/* This format deals with DAG cards that are using drivers from the 2.5 version
71 * onwards, including 3.X.
72 *
73 * DAG is a LIVE capture format.
74 *
75 * This format does support writing, provided the DAG card that you are using
76 * has transmit (Tx) support. Additionally, packets read using this format
77 * are in the ERF format, so can easily be written as ERF traces without
78 * losing any data.
79 */
80
/* Accessors for the format-specific data attached to a trace (DATA,
 * DATA_OUT) or to a node of the per-stream list (STREAM_DATA).
 *
 * The argument is parenthesised so that any pointer-valued expression
 * (e.g. "traces + 1") can be passed, not just a plain identifier. */
#define DATA(x) ((struct dag_format_data_t *)(x)->format_data)
#define DATA_OUT(x) ((struct dag_format_data_out_t *)(x)->format_data)
#define STREAM_DATA(x) ((struct dag_per_stream_t *)(x)->data)

/* Shorthands that require a variable named 'libtrace' to be in scope */
#define FORMAT_DATA DATA(libtrace)
#define FORMAT_DATA_OUT DATA_OUT(libtrace)

/* The DUCK state of the current input trace */
#define DUCK FORMAT_DATA->duck

/* The first node of the per-stream list, and the stream data stored in it */
#define FORMAT_DATA_HEAD FORMAT_DATA->per_stream->head
#define FORMAT_DATA_FIRST ((struct dag_per_stream_t *)FORMAT_DATA_HEAD->data)
92
/* Forward declaration of this format's descriptor.
 * NOTE(review): the definition is presumably further down the file -
 * confirm. */
static struct libtrace_format_t dag;
94
/* A DAG device - a DAG device can support multiple streams (and therefore
 * multiple input traces) so each trace needs to refer to a device.
 *
 * Devices are kept in a doubly linked list headed by open_dags; a new
 * device is inserted at the front of that list by the open functions. */
struct dag_dev_t {
        char * dev_name;                /* Device name; the string handed to
                                           the open function is stored here
                                           as-is (no copy is made) */
        int fd;                         /* File descriptor returned by
                                           dag_open() */
        uint16_t ref_count;             /* Number of input / output traces
                                           that are using this device */
        struct dag_dev_t *prev;         /* Pointer to the previous device in
                                           the device list, NULL for the
                                           list head */
        struct dag_dev_t *next;         /* Pointer to the next device in the
                                           device list, NULL for the tail */
};
107
/* "Global" data that is stored for each DAG output trace */
struct dag_format_data_out_t {
        /* The DAG device being used for writing */
        struct dag_dev_t *device;
        /* The DAG stream that is being written on (defaults to stream 1
         * when the URI does not name one) */
        unsigned int dagstream;
        /* Boolean flag indicating whether the stream is currently attached */
        int stream_attached;
        /* The amount of data waiting to be transmitted, in bytes */
        uint64_t waiting;
        /* A buffer to hold the data to be transmitted */
        uint8_t *txbuffer;
};
121
/* Data that is stored against each input stream. One instance lives in
 * each node of the per_stream list of the input trace. */
struct dag_per_stream_t {
        /* DAG stream number */
        uint16_t dagstream;
        /* Pointer to the last unread byte in the DAG memory */
        uint8_t *top;
        /* Pointer to the first unread byte in the DAG memory */
        uint8_t *bottom;
        /* Amount of data processed from the bottom pointer */
        uint32_t processed;
        /* Number of packets seen by the stream */
        uint64_t pkt_count;
        /* Drop count for this particular stream */
        uint64_t drops;
        /* Boolean values to indicate if a particular interface has been seen
         * or not. This is limited to four interfaces, which is enough to
         * support all current DAG cards */
        uint8_t seeninterface[4];
};
141
/* "Global" data that is stored for each DAG input trace */
struct dag_format_data_t {
        /* DAG device (shared, reference counted via dag_dev_t.ref_count) */
        struct dag_dev_t *device;
        /* Boolean flag indicating whether the trace is currently attached */
        int stream_attached;
        /* Data stored against each DAG input stream; a list whose nodes
         * each carry a struct dag_per_stream_t */
        libtrace_list_t *per_stream;

        /* Data required for regular DUCK reporting.
         * We put this on a new cache line otherwise we have a lot of false
         * sharing caused by updating the last_pkt.
         * This should only ever be accessed by the first thread stream,
         * that includes both read and write operations.
         */
        struct {
                /* Timestamp of the last DUCK report */
                uint32_t last_duck;
                /* The number of seconds between each DUCK report */
                uint32_t duck_freq;
                /* Timestamp of the last packet read from the DAG card */
                uint32_t last_pkt;
                /* Dummy trace to ensure DUCK packets are dealt with using the
                 * DUCK format functions */
                libtrace_t *dummy_duck;
        } duck ALIGN_STRUCT(CACHE_LINE_SIZE);
};
169
/* To be thread-safe, we're going to need a mutex for operating on the list
 * of DAG devices.
 *
 * NOTE(review): no initialiser is visible here; confirm that the mutex is
 * initialised elsewhere before first use. */
pthread_mutex_t open_dag_mutex;

/* The list of DAG devices that have been opened by libtrace.
 *
 * We can only open each DAG device once, but we might want to read from
 * multiple streams. Therefore, we need to maintain a list of devices that we
 * have opened (with ref counts!) so that we don't try to open a device too
 * many times or close a device that we're still using.
 *
 * Access to this list must be guarded by open_dag_mutex. */
struct dag_dev_t *open_dags = NULL;
181
182/* Returns the amount of padding between the ERF header and the start of the
183 * captured packet data */
184static int dag_get_padding(const libtrace_packet_t *packet)
185{
186        /* ERF Ethernet records have a 2 byte padding before the packet itself
187         * so that the IP header is aligned on a 32 bit boundary.
188         */
189        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
190                dag_record_t *erfptr = (dag_record_t *)packet->header;
191                switch(erfptr->type) {
192                        case TYPE_ETH:
193                        case TYPE_DSM_COLOR_ETH:
194                                return 2;
195                        default:                return 0;
196                }
197        }
198        else {
199                switch(trace_get_link_type(packet)) {
200                        case TRACE_TYPE_ETH:    return 2;
201                        default:                return 0;
202                }
203        }
204}
205
/* Guesses whether the given filename refers to a DAG device.
 *
 * Returns 1 if the path can be stat()ed and is a character device,
 * 0 in every other case. */
static int dag_probe_filename(const char *filename)
{
        struct stat info;

        /* It is "probably us" only if the path exists and is a char device */
        return (stat(filename, &info) == 0 && S_ISCHR(info.st_mode)) ? 1 : 0;
}
221
222/* Initialises the DAG output data structure */
223static void dag_init_format_out_data(libtrace_out_t *libtrace)
224{
225        libtrace->format_data = (struct dag_format_data_out_t *)
226                malloc(sizeof(struct dag_format_data_out_t));
227        // no DUCK on output
228        FORMAT_DATA_OUT->stream_attached = 0;
229        FORMAT_DATA_OUT->device = NULL;
230        FORMAT_DATA_OUT->dagstream = 0;
231        FORMAT_DATA_OUT->waiting = 0;
232
233}
234
235/* Initialises the DAG input data structure */
236static void dag_init_format_data(libtrace_t *libtrace)
237{
238        struct dag_per_stream_t stream_data;
239
240        libtrace->format_data = (struct dag_format_data_t *)
241                malloc(sizeof(struct dag_format_data_t));
242        DUCK.last_duck = 0;
243        DUCK.duck_freq = 0;
244        DUCK.last_pkt = 0;
245        DUCK.dummy_duck = NULL;
246
247        FORMAT_DATA->per_stream =
248                libtrace_list_init(sizeof(stream_data));
249        assert(FORMAT_DATA->per_stream != NULL);
250
251        /* We'll start with just one instance of stream_data, and we'll
252         * add more later if we need them */
253        memset(&stream_data, 0, sizeof(stream_data));
254        libtrace_list_push_back(FORMAT_DATA->per_stream, &stream_data);
255}
256
257/* Determines if there is already an entry for the given DAG device in the
258 * device list and increments the reference count for that device, if found.
259 *
260 * NOTE: This function assumes the open_dag_mutex is held by the caller */
261static struct dag_dev_t *dag_find_open_device(char *dev_name)
262{
263        struct dag_dev_t *dag_dev;
264
265        dag_dev = open_dags;
266
267        /* XXX: Not exactly zippy, but how often are we going to be dealing
268         * with multiple dag cards? */
269        while (dag_dev != NULL) {
270                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
271                        dag_dev->ref_count ++;
272                        return dag_dev;
273                }
274                dag_dev = dag_dev->next;
275        }
276        return NULL;
277}
278
279/* Closes a DAG device and removes it from the device list.
280 *
281 * Attempting to close a DAG device that has a non-zero reference count will
282 * cause an assertion failure!
283 *
284 * NOTE: This function assumes the open_dag_mutex is held by the caller */
285static void dag_close_device(struct dag_dev_t *dev)
286{
287        /* Need to remove from the device list */
288        assert(dev->ref_count == 0);
289
290        if (dev->prev == NULL) {
291                open_dags = dev->next;
292                if (dev->next)
293                        dev->next->prev = NULL;
294        } else {
295                dev->prev->next = dev->next;
296                if (dev->next)
297                        dev->next->prev = dev->prev;
298        }
299
300        dag_close(dev->fd);
301        free(dev);
302}
303
304
305/* Opens a new DAG device for writing and adds it to the DAG device list
306 *
307 * NOTE: this function should only be called when opening a DAG device for
308 * writing - there is little practical difference between this and the
309 * function below that covers the reading case, but we need the output trace
310 * object to report errors properly so the two functions take slightly
311 * different arguments. This is really lame and there should be a much better
312 * way of doing this.
313 *
314 * NOTE: This function assumes the open_dag_mutex is held by the caller
315 */
316static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace,
317                                                char *dev_name)
318{
319        struct stat buf;
320        int fd;
321        struct dag_dev_t *new_dev;
322
323        /* Make sure the device exists */
324        if (stat(dev_name, &buf) == -1) {
325                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
326                return NULL;
327        }
328
329        /* Make sure it is the appropriate type of device */
330        if (S_ISCHR(buf.st_mode)) {
331                /* Try opening the DAG device */
332                if((fd = dag_open(dev_name)) < 0) {
333                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
334                                        dev_name);
335                        return NULL;
336                }
337        } else {
338                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
339                                dev_name);
340                return NULL;
341        }
342
343        /* Add the device to our device list - it is just a doubly linked
344         * list with no inherent ordering; just tack the new one on the front
345         */
346        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
347        new_dev->fd = fd;
348        new_dev->dev_name = dev_name;
349        new_dev->ref_count = 1;
350
351        new_dev->prev = NULL;
352        new_dev->next = open_dags;
353        if (open_dags)
354                open_dags->prev = new_dev;
355
356        open_dags = new_dev;
357
358        return new_dev;
359}
360
361/* Opens a new DAG device for reading and adds it to the DAG device list
362 *
363 * NOTE: this function should only be called when opening a DAG device for
364 * reading - there is little practical difference between this and the
365 * function above that covers the writing case, but we need the input trace
366 * object to report errors properly so the two functions take slightly
367 * different arguments. This is really lame and there should be a much better
368 * way of doing this.
369 *
370 * NOTE: This function assumes the open_dag_mutex is held by the caller */
371static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
372        struct stat buf;
373        int fd;
374        struct dag_dev_t *new_dev;
375
376        /* Make sure the device exists */
377        if (stat(dev_name, &buf) == -1) {
378                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
379                return NULL;
380        }
381
382        /* Make sure it is the appropriate type of device */
383        if (S_ISCHR(buf.st_mode)) {
384                /* Try opening the DAG device */
385                if((fd = dag_open(dev_name)) < 0) {
386                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
387                                      dev_name);
388                        return NULL;
389                }
390        } else {
391                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
392                              dev_name);
393                return NULL;
394        }
395
396        /* Add the device to our device list - it is just a doubly linked
397         * list with no inherent ordering; just tack the new one on the front
398         */
399        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
400        new_dev->fd = fd;
401        new_dev->dev_name = dev_name;
402        new_dev->ref_count = 1;
403
404        new_dev->prev = NULL;
405        new_dev->next = open_dags;
406        if (open_dags)
407                open_dags->prev = new_dev;
408
409        open_dags = new_dev;
410
411        return new_dev;
412}
413
414/* Creates and initialises a DAG output trace */
415static int dag_init_output(libtrace_out_t *libtrace)
416{
417        /* Upon successful creation, the device name is stored against the
418         * device and free when it is free()d */
419        char *dag_dev_name = NULL;
420        char *scan = NULL;
421        struct dag_dev_t *dag_device = NULL;
422        int stream = 1;
423
424        /* XXX I don't know if this is important or not, but this function
425         * isn't present in all of the driver releases that this code is
426         * supposed to support! */
427        /*
428        unsigned long wake_time;
429        dagutil_sleep_get_wake_time(&wake_time,0);
430        */
431
432        dag_init_format_out_data(libtrace);
433        /* Grab the mutex while we're likely to be messing with the device
434         * list */
435        pthread_mutex_lock(&open_dag_mutex);
436
437        /* Specific streams are signified using a comma in the libtrace URI,
438         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
439         *
440         * If no stream is specified, we will write using stream 1 */
441        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
442                dag_dev_name = strdup(libtrace->uridata);
443        } else {
444                dag_dev_name = (char *)strndup(libtrace->uridata,
445                                (size_t)(scan - libtrace->uridata));
446                stream = atoi(++scan);
447        }
448        FORMAT_DATA_OUT->dagstream = stream;
449
450        /* See if our DAG device is already open */
451        dag_device = dag_find_open_device(dag_dev_name);
452
453        if (dag_device == NULL) {
454                /* Device not yet opened - open it ourselves */
455                dag_device = dag_open_output_device(libtrace, dag_dev_name);
456        } else {
457                /* Otherwise, just use the existing one */
458                free(dag_dev_name);
459                dag_dev_name = NULL;
460        }
461
462        /* Make sure we have successfully opened a DAG device */
463        if (dag_device == NULL) {
464                if (dag_dev_name) {
465                        free(dag_dev_name);
466                }
467                pthread_mutex_unlock(&open_dag_mutex);
468                return -1;
469        }
470
471        FORMAT_DATA_OUT->device = dag_device;
472        pthread_mutex_unlock(&open_dag_mutex);
473        return 0;
474}
475
476/* Creates and initialises a DAG input trace */
477static int dag_init_input(libtrace_t *libtrace) {
478        /* Upon successful creation, the device name is stored against the
479         * device and free when it is free()d */
480        char *dag_dev_name = NULL;
481        char *scan = NULL;
482        int stream = 0;
483        struct dag_dev_t *dag_device = NULL;
484
485        dag_init_format_data(libtrace);
486        /* Grab the mutex while we're likely to be messing with the device
487         * list */
488        pthread_mutex_lock(&open_dag_mutex);
489
490
491        /* DAG cards support multiple streams. In a single threaded capture,
492         * these are specified using a comma in the libtrace URI,
493         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
494         *
495         * If no stream is specified, we will read from stream 0 with
496         * one thread
497         */
498        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
499                dag_dev_name = strdup(libtrace->uridata);
500        } else {
501                dag_dev_name = (char *)strndup(libtrace->uridata,
502                                (size_t)(scan - libtrace->uridata));
503                stream = atoi(++scan);
504        }
505
506        FORMAT_DATA_FIRST->dagstream = stream;
507
508        /* See if our DAG device is already open */
509        dag_device = dag_find_open_device(dag_dev_name);
510
511        if (dag_device == NULL) {
512                /* Device not yet opened - open it ourselves */
513                dag_device = dag_open_device(libtrace, dag_dev_name);
514        } else {
515                /* Otherwise, just use the existing one */
516                free(dag_dev_name);
517                dag_dev_name = NULL;
518        }
519
520        /* Make sure we have successfully opened a DAG device */
521        if (dag_device == NULL) {
522                if (dag_dev_name)
523                        free(dag_dev_name);
524                dag_dev_name = NULL;
525                pthread_mutex_unlock(&open_dag_mutex);
526                return -1;
527        }
528
529        FORMAT_DATA->device = dag_device;
530
531        /* See Config_Status_API_Programming_Guide.pdf from the Endace
532           Dag Documentation */
533        /* Check kBooleanAttributeActive is true -- no point capturing
534         * on an interface that's disabled
535         *
536         * The symptom of the port being disabled is that libtrace
537         * will appear to hang. */
538        /* Check kBooleanAttributeFault is false */
539        /* Check kBooleanAttributeLocalFault is false */
540        /* Check kBooleanAttributeLock is true ? */
541        /* Check kBooleanAttributePeerLink ? */
542
543        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based
544           on libtrace promisc attribute?*/
545        /* Set kUint32AttributeSnapLength to the snaplength */
546
547        pthread_mutex_unlock(&open_dag_mutex);
548        return 0;
549}
550
551/* Configures a DAG input trace */
552static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
553                            void *data)
554{
555        char conf_str[4096];
556        switch(option) {
557        case TRACE_OPTION_META_FREQ:
558                /* This option is used to specify the frequency of DUCK
559                 * updates */
560                DUCK.duck_freq = *(int *)data;
561                return 0;
562        case TRACE_OPTION_SNAPLEN:
563                /* Tell the card our new snap length */
564                snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
565                if (dag_configure(FORMAT_DATA->device->fd,
566                                  conf_str) != 0) {
567                        trace_set_err(libtrace, errno, "Failed to configure "
568                                      "snaplen on DAG card: %s",
569                                      libtrace->uridata);
570                        return -1;
571                }
572                return 0;
573        case TRACE_OPTION_PROMISC:
574                /* DAG already operates in a promisc fashion */
575                return -1;
576        case TRACE_OPTION_FILTER:
577                /* We don't yet support pushing filters into DAG
578                 * cards */
579                return -1;
580        case TRACE_OPTION_EVENT_REALTIME:
581                /* Live capture is always going to be realtime */
582                return -1;
583        case TRACE_OPTION_HASHER:
584                /* Lets just say we did this, it's currently still up to
585                 * the user to configure this correctly. */
586                return 0;
587        }
588        return -1;
589}
590
591/* Starts a DAG output trace */
592static int dag_start_output(libtrace_out_t *libtrace)
593{
594        struct timeval zero, nopoll;
595
596        zero.tv_sec = 0;
597        zero.tv_usec = 0;
598        nopoll = zero;
599
600        /* Attach and start the DAG stream */
601        if (dag_attach_stream(FORMAT_DATA_OUT->device->fd,
602                        FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) {
603                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
604                return -1;
605        }
606
607        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
608                        FORMAT_DATA_OUT->dagstream) < 0) {
609                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
610                return -1;
611        }
612        FORMAT_DATA_OUT->stream_attached = 1;
613
614        /* We don't want the dag card to do any sleeping */
615        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
616                        FORMAT_DATA_OUT->dagstream, 0, &zero,
617                        &nopoll);
618
619        return 0;
620}
621
622static int dag_start_input_stream(libtrace_t *libtrace,
623                                  struct dag_per_stream_t * stream) {
624        struct timeval zero, nopoll;
625        uint8_t *top, *bottom, *starttop;
626        top = bottom = NULL;
627
628        zero.tv_sec = 0;
629        zero.tv_usec = 10000;
630        nopoll = zero;
631
632        /* Attach and start the DAG stream */
633        if (dag_attach_stream(FORMAT_DATA->device->fd,
634                              stream->dagstream, 0, 0) < 0) {
635                trace_set_err(libtrace, errno, "Cannot attach DAG stream #%u",
636                              stream->dagstream);
637                return -1;
638        }
639
640        if (dag_start_stream(FORMAT_DATA->device->fd,
641                             stream->dagstream) < 0) {
642                trace_set_err(libtrace, errno, "Cannot start DAG stream #%u",
643                              stream->dagstream);
644                return -1;
645        }
646        FORMAT_DATA->stream_attached = 1;
647
648        /* We don't want the dag card to do any sleeping */
649        if (dag_set_stream_poll(FORMAT_DATA->device->fd,
650                            stream->dagstream, 0, &zero,
651                            &nopoll) < 0) {
652                trace_set_err(libtrace, errno,
653                              "dag_set_stream_poll failed!");
654                return -1;
655        }
656
657        starttop = dag_advance_stream(FORMAT_DATA->device->fd,
658                                      stream->dagstream,
659                                      &bottom);
660
661        /* Should probably flush the memory hole now */
662        top = starttop;
663        while (starttop - bottom > 0) {
664                bottom += (starttop - bottom);
665                top = dag_advance_stream(FORMAT_DATA->device->fd,
666                                         stream->dagstream,
667                                         &bottom);
668        }
669        stream->top = top;
670        stream->bottom = bottom;
671        stream->processed = 0;
672        stream->drops = 0;
673
674        return 0;
675
676}
677
678/* Starts a DAG input trace */
679static int dag_start_input(libtrace_t *libtrace)
680{
681        return dag_start_input_stream(libtrace, FORMAT_DATA_FIRST);
682}
683
684static int dag_pstart_input(libtrace_t *libtrace)
685{
686        char *scan, *tok;
687        uint16_t stream_count = 0, max_streams;
688        int iserror = 0;
689        struct dag_per_stream_t stream_data;
690
691        /* Check we aren't trying to create more threads than the DAG card can
692         * handle */
693        max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd);
694        if (libtrace->perpkt_thread_count > max_streams) {
695                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
696                              "WARNING: DAG has only %u streams available, "
697                              "capping total number of threads at this value.",
698                              max_streams);
699                libtrace->perpkt_thread_count = max_streams;
700        }
701
702        /* Get the stream names from the uri */
703        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
704                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
705                              "Format uri doesn't specify the DAG streams");
706                iserror = 1;
707                goto cleanup;
708        }
709
710        scan++;
711
712        tok = strtok(scan, ",");
713        while (tok != NULL) {
714                /* Ensure we haven't specified too many streams */
715                if (stream_count >= libtrace->perpkt_thread_count) {
716                        trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
717                                      "WARNING: Format uri specifies too many "
718                                      "streams. Maximum is %u, so only using "
719                                      "the first %u from the uri.",
720                                      max_streams);
721                        break;
722                }
723
724                /* Save the stream details */
725                if (stream_count == 0) {
726                        /* Special case where we update the existing stream
727                         * data structure */
728                        FORMAT_DATA_FIRST->dagstream = (uint16_t)atoi(tok);
729                } else {
730                        memset(&stream_data, 0, sizeof(stream_data));
731                        stream_data.dagstream = (uint16_t)atoi(tok);
732                        libtrace_list_push_back(FORMAT_DATA->per_stream,
733                                                &stream_data);
734                }
735
736                stream_count++;
737                tok = strtok(NULL, ",");
738        }
739
740        FORMAT_DATA->stream_attached = 1;
741
742 cleanup:
743        if (iserror) {
744                return -1;
745        } else {
746                return 0;
747        }
748}
749
750/* Pauses a DAG output trace */
751static int dag_pause_output(libtrace_out_t *libtrace)
752{
753        /* Stop and detach the stream */
754        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
755                            FORMAT_DATA_OUT->dagstream) < 0) {
756                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
757                return -1;
758        }
759        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
760                              FORMAT_DATA_OUT->dagstream) < 0) {
761                trace_set_err_out(libtrace, errno,
762                                  "Could not detach DAG stream");
763                return -1;
764        }
765        FORMAT_DATA_OUT->stream_attached = 0;
766        return 0;
767}
768
769/* Pauses a DAG input trace */
770static int dag_pause_input(libtrace_t *libtrace)
771{
772        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
773
774        /* Stop and detach each stream */
775        while (tmp != NULL) {
776                if (dag_stop_stream(FORMAT_DATA->device->fd,
777                                    STREAM_DATA(tmp)->dagstream) < 0) {
778                        trace_set_err(libtrace, errno,
779                                      "Could not stop DAG stream");
780                        printf("Count not stop DAG stream\n");
781                        return -1;
782                }
783                if (dag_detach_stream(FORMAT_DATA->device->fd,
784                                      STREAM_DATA(tmp)->dagstream) < 0) {
785                        trace_set_err(libtrace, errno,
786                                      "Could not detach DAG stream");
787                        printf("Count not detach DAG stream\n");
788                        return -1;
789                }
790
791                tmp = tmp->next;
792        }
793
794        FORMAT_DATA->stream_attached = 0;
795        return 0;
796}
797
798
799
800/* Closes a DAG input trace */
801static int dag_fin_input(libtrace_t *libtrace)
802{
803        /* Need the lock, since we're going to be handling the device list */
804        pthread_mutex_lock(&open_dag_mutex);
805
806        /* Detach the stream if we are not paused */
807        if (FORMAT_DATA->stream_attached)
808                dag_pause_input(libtrace);
809        FORMAT_DATA->device->ref_count--;
810
811        /* Close the DAG device if there are no more references to it */
812        if (FORMAT_DATA->device->ref_count == 0)
813                dag_close_device(FORMAT_DATA->device);
814
815        if (DUCK.dummy_duck)
816                trace_destroy_dead(DUCK.dummy_duck);
817
818        /* Clear the list */
819        libtrace_list_deinit(FORMAT_DATA->per_stream);
820        free(libtrace->format_data);
821        pthread_mutex_unlock(&open_dag_mutex);
822        return 0; /* success */
823}
824
825/* Closes a DAG output trace */
826static int dag_fin_output(libtrace_out_t *libtrace)
827{
828
829        /* Commit any outstanding traffic in the txbuffer */
830        if (FORMAT_DATA_OUT->waiting) {
831                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
832                                           FORMAT_DATA_OUT->dagstream,
833                                           FORMAT_DATA_OUT->waiting );
834        }
835
836        /* Wait until the buffer is nearly clear before exiting the program,
837         * as we will lose packets otherwise */
838        dag_tx_get_stream_space
839                (FORMAT_DATA_OUT->device->fd,
840                 FORMAT_DATA_OUT->dagstream,
841                 dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
842                                            FORMAT_DATA_OUT->dagstream) - 8);
843
844        /* Need the lock, since we're going to be handling the device list */
845        pthread_mutex_lock(&open_dag_mutex);
846
847        /* Detach the stream if we are not paused */
848        if (FORMAT_DATA_OUT->stream_attached)
849                dag_pause_output(libtrace);
850        FORMAT_DATA_OUT->device->ref_count --;
851
852        /* Close the DAG device if there are no more references to it */
853        if (FORMAT_DATA_OUT->device->ref_count == 0)
854                dag_close_device(FORMAT_DATA_OUT->device);
855        free(libtrace->format_data);
856        pthread_mutex_unlock(&open_dag_mutex);
857        return 0; /* success */
858}
859
/* Select the DUCK ioctl request, and the matching libtrace DUCK record
 * version, from whichever request macro the DAG headers provide.
 * DAGIOC_CARD_DUCK is preferred over DAGIOCDUCK when both exist. If
 * neither exists, LIBTRACE_DUCK_IOCTL and LIBTRACE_DUCK_VERSION are left
 * undefined and a build warning is emitted. */
#if defined(DAGIOC_CARD_DUCK)
#define LIBTRACE_DUCK_IOCTL DAGIOC_CARD_DUCK
#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_5_0
#elif defined(DAGIOCDUCK)
#define LIBTRACE_DUCK_IOCTL DAGIOCDUCK
#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_2_5
#else
#warning "DAG appears to be missing DUCK support"
#endif
871
872/* Extracts DUCK information from the DAG card and produces a DUCK packet */
873static int dag_get_duckinfo(libtrace_t *libtrace,
874                                libtrace_packet_t *packet) {
875
876        if (DUCK.duck_freq == 0)
877                return 0;
878
879#ifndef LIBTRACE_DUCK_IOCTL
880        trace_set_err(libtrace, errno, 
881                "Requested DUCK information but unable to determine the correct ioctl for DUCK");
882        DUCK.duck_freq = 0;
883        return -1;
884#endif
885
886        if (DUCK.last_pkt - DUCK.last_duck < DUCK.duck_freq)
887                return 0;
888
889        /* Allocate memory for the DUCK data */
890        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
891            !packet->buffer) {
892                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
893                packet->buf_control = TRACE_CTRL_PACKET;
894                if (!packet->buffer) {
895                        trace_set_err(libtrace, errno,
896                                      "Cannot allocate packet buffer");
897                        return -1;
898                }
899        }
900
901        /* DUCK doesn't have a format header */
902        packet->header = 0;
903        packet->payload = packet->buffer;
904
905        /* No need to check if we can get DUCK or not - we're modern
906         * enough so just grab the DUCK info */
907        if ((ioctl(FORMAT_DATA->device->fd, LIBTRACE_DUCK_IOCTL,
908                   (duckinf_t *)packet->payload) < 0)) {
909                trace_set_err(libtrace, errno, "Error using DUCK ioctl");
910                DUCK.duck_freq = 0;
911                return -1;
912        }
913
914        packet->type = LIBTRACE_DUCK_VERSION;
915
916        /* Set the packet's trace to point at a DUCK trace, so that the
917         * DUCK format functions will be called on the packet rather than the
918         * DAG ones */
919        if (!DUCK.dummy_duck)
920                DUCK.dummy_duck = trace_create_dead("duck:dummy");
921        packet->trace = DUCK.dummy_duck;
922        DUCK.last_duck = DUCK.last_pkt;
923        packet->error = sizeof(duckinf_t);
924        return sizeof(duckinf_t);
925}
926
927/* Determines the amount of data available to read from the DAG card */
928static int dag_available(libtrace_t *libtrace,
929                         struct dag_per_stream_t *stream_data)
930{
931        uint32_t diff = stream_data->top - stream_data->bottom;
932
933        /* If we've processed more than 4MB of data since we last called
934         * dag_advance_stream, then we should call it again to allow the
935         * space occupied by that 4MB to be released */
936        if (diff >= dag_record_size && stream_data->processed < 4 * 1024 * 1024)
937                return diff;
938
939        /* Update the top and bottom pointers */
940        stream_data->top = dag_advance_stream(FORMAT_DATA->device->fd,
941                                              stream_data->dagstream,
942                                              &(stream_data->bottom));
943
944        if (stream_data->top == NULL) {
945                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
946                return -1;
947        }
948        stream_data->processed = 0;
949        diff = stream_data->top - stream_data->bottom;
950        return diff;
951}
952
953/* Returns a pointer to the start of the next complete ERF record */
954static dag_record_t *dag_get_record(struct dag_per_stream_t *stream_data)
955{
956        dag_record_t *erfptr = NULL;
957        uint16_t size;
958
959        erfptr = (dag_record_t *)stream_data->bottom;
960        if (!erfptr)
961                return NULL;
962
963        size = ntohs(erfptr->rlen);
964        assert( size >= dag_record_size );
965
966        /* Make certain we have the full packet available */
967        if (size > (stream_data->top - stream_data->bottom))
968                return NULL;
969
970        stream_data->bottom += size;
971        stream_data->processed += size;
972        return erfptr;
973}
974
975/* Converts a buffer containing a recently read DAG packet record into a
976 * libtrace packet */
977static int dag_prepare_packet_stream(libtrace_t *libtrace,
978                                     struct dag_per_stream_t *stream_data,
979                                     libtrace_packet_t *packet,
980                                     void *buffer, libtrace_rt_types_t rt_type,
981                                     uint32_t flags)
982{
983        dag_record_t *erfptr;
984
985        /* If the packet previously owned a buffer that is not the buffer
986         * that contains the new packet data, we're going to need to free the
987         * old one to avoid memory leaks */
988        if (packet->buffer != buffer &&
989            packet->buf_control == TRACE_CTRL_PACKET) {
990                free(packet->buffer);
991        }
992
993        /* Set the buffer owner appropriately */
994        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
995                packet->buf_control = TRACE_CTRL_PACKET;
996        } else
997                packet->buf_control = TRACE_CTRL_EXTERNAL;
998
999        /* Update the packet pointers and type appropriately */
1000        erfptr = (dag_record_t *)buffer;
1001        packet->buffer = erfptr;
1002        packet->header = erfptr;
1003        packet->type = rt_type;
1004
1005        if (erfptr->flags.rxerror == 1) {
1006                /* rxerror means the payload is corrupt - drop the payload
1007                 * by tweaking rlen */
1008                packet->payload = NULL;
1009                erfptr->rlen = htons(erf_get_framing_length(packet));
1010        } else {
1011                packet->payload = (char*)packet->buffer
1012                        + erf_get_framing_length(packet);
1013        }
1014
1015        if (libtrace->format_data == NULL) {
1016                dag_init_format_data(libtrace);
1017        }
1018
1019        /* Update the dropped packets counter */
1020        /* No loss counter for DSM coloured records - have to use some
1021         * other API */
1022        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
1023                /* TODO */
1024        } else {
1025                /* Use the ERF loss counter */
1026                if (stream_data->seeninterface[erfptr->flags.iface]
1027                    == 0) {
1028                        stream_data->seeninterface[erfptr->flags.iface]
1029                                = 1;
1030                } else {
1031                        stream_data->drops += ntohs(erfptr->lctr);
1032                }
1033        }
1034
1035        return 0;
1036}
1037
1038static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
1039                              void *buffer, libtrace_rt_types_t rt_type,
1040                              uint32_t flags)
1041{
1042        return dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
1043                                       buffer, rt_type, flags);
1044}
1045
1046/*
1047 * dag_write_packet() at this stage attempts to improve tx performance
1048 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
1049 * has been met. I observed approximately 270% performance increase
1050 * through this relatively naive tweak. No optimisation of buffer sizes
1051 * was attempted.
1052 */
1053
1054/* Pushes an ERF record onto the transmit stream */
1055static int dag_dump_packet(libtrace_out_t *libtrace,
1056                           dag_record_t *erfptr, unsigned int pad,
1057                           void *buffer)
1058{
1059        int size;
1060
1061        /*
1062         * If we've got 0 bytes waiting in the txqueue, assume that we
1063         * haven't requested any space yet, and request some, storing
1064         * the pointer at FORMAT_DATA_OUT->txbuffer.
1065         *
1066         * The amount to request is slightly magical at the moment - it's
1067         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
1068         * the buffer and handle overruns.
1069         */
1070        if (FORMAT_DATA_OUT->waiting == 0) {
1071                FORMAT_DATA_OUT->txbuffer =
1072                        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
1073                                                FORMAT_DATA_OUT->dagstream,
1074                                                16908288);
1075        }
1076
1077        /*
1078         * Copy the header separately to the body, as we can't guarantee they
1079         * are in contiguous memory
1080         */
1081        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, erfptr,
1082               (dag_record_size + pad));
1083        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
1084
1085        /*
1086         * Copy our incoming packet into the outgoing buffer, and increment
1087         * our waiting count
1088         */
1089        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
1090        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, buffer,
1091               size);
1092        FORMAT_DATA_OUT->waiting += size;
1093
1094        /*
1095         * If our output buffer has more than 16 Mebibytes in it, commit those
1096         * bytes and reset the waiting count to 0.
1097         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in
1098         * case there is still data in the buffer at program exit.
1099         */
1100        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
1101                FORMAT_DATA_OUT->txbuffer =
1102                        dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
1103                                                   FORMAT_DATA_OUT->dagstream,
1104                                                   FORMAT_DATA_OUT->waiting);
1105                FORMAT_DATA_OUT->waiting = 0;
1106        }
1107
1108        return size + pad + dag_record_size;
1109}
1110
1111/* Attempts to determine a suitable ERF type for a given packet. Returns true
1112 * if one is found, false otherwise */
1113static bool find_compatible_linktype(libtrace_out_t *libtrace,
1114                                     libtrace_packet_t *packet, char *type)
1115{
1116        /* Keep trying to simplify the packet until we can find
1117         * something we can do with it */
1118
1119        do {
1120                *type = libtrace_to_erf_type(trace_get_link_type(packet));
1121
1122                /* Success */
1123                if (*type != (char)-1)
1124                        return true;
1125
1126                if (!demote_packet(packet)) {
1127                        trace_set_err_out(libtrace,
1128                                          TRACE_ERR_NO_CONVERSION,
1129                                          "No erf type for packet (%i)",
1130                                          trace_get_link_type(packet));
1131                        return false;
1132                }
1133
1134        } while(1);
1135
1136        return true;
1137}
1138
1139/* Writes a packet to the provided DAG output trace */
1140static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet)
1141{
1142        /* This is heavily borrowed from erf_write_packet(). Yes, CnP
1143         * coding sucks, sorry about that.
1144         */
1145        unsigned int pad = 0;
1146        int numbytes;
1147        void *payload = packet->payload;
1148        dag_record_t *header = (dag_record_t *)packet->header;
1149        char erf_type = 0;
1150
1151        if(!packet->header) {
1152                /* No header, probably an RT packet. Lifted from
1153                 * erf_write_packet(). */
1154                return -1;
1155        }
1156
1157        if (trace_get_link_type(packet) == TRACE_TYPE_NONDATA)
1158                return 0;
1159
1160        pad = dag_get_padding(packet);
1161
1162        /*
1163         * If the payload is null, adjust the rlen. Discussion of this is
1164         * attached to erf_write_packet()
1165         */
1166        if (payload == NULL) {
1167                header->rlen = htons(dag_record_size + pad);
1168        }
1169
1170        if (packet->type == TRACE_RT_DATA_ERF) {
1171                numbytes = dag_dump_packet(libtrace, header, pad, payload);
1172        } else {
1173                /* Build up a new packet header from the existing header */
1174
1175                /* Simplify the packet first - if we can't do this, break
1176                 * early */
1177                if (!find_compatible_linktype(libtrace,packet,&erf_type))
1178                        return -1;
1179
1180                dag_record_t erfhdr;
1181
1182                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
1183                payload=packet->payload;
1184                pad = dag_get_padding(packet);
1185
1186                /* Flags. Can't do this */
1187                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
1188                if (trace_get_direction(packet)!=(int)~0U)
1189                        erfhdr.flags.iface = trace_get_direction(packet);
1190
1191                erfhdr.type = erf_type;
1192
1193                /* Packet length (rlen includes format overhead) */
1194                assert(trace_get_capture_length(packet) > 0
1195                       && trace_get_capture_length(packet) <= 65536);
1196                assert(erf_get_framing_length(packet) > 0
1197                       && trace_get_framing_length(packet) <= 65536);
1198                assert(trace_get_capture_length(packet) +
1199                       erf_get_framing_length(packet) > 0
1200                       && trace_get_capture_length(packet) +
1201                       erf_get_framing_length(packet) <= 65536);
1202
1203                erfhdr.rlen = htons(trace_get_capture_length(packet)
1204                                    + erf_get_framing_length(packet));
1205
1206
1207                /* Loss counter. Can't do this */
1208                erfhdr.lctr = 0;
1209                /* Wire length, does not include padding! */
1210                erfhdr.wlen = htons(trace_get_wire_length(packet));
1211
1212                /* Write it out */
1213                numbytes = dag_dump_packet(libtrace, &erfhdr, pad, payload);
1214        }
1215
1216        return numbytes;
1217}
1218
1219/* Reads the next available packet from a DAG card, in a BLOCKING fashion
1220 *
1221 * If DUCK reporting is enabled, the packet returned may be a DUCK update
1222 */
1223static int dag_read_packet_stream(libtrace_t *libtrace,
1224                                struct dag_per_stream_t *stream_data,
1225                                libtrace_thread_t *t, /* Optional */
1226                                libtrace_packet_t *packet)
1227{
1228        int size = 0;
1229        dag_record_t *erfptr = NULL;
1230        struct timeval tv;
1231        int numbytes = 0;
1232        uint32_t flags = 0;
1233        struct timeval maxwait, pollwait;
1234
1235        pollwait.tv_sec = 0;
1236        pollwait.tv_usec = 10000;
1237        maxwait.tv_sec = 0;
1238        maxwait.tv_usec = 250000;
1239
1240        /* Check if we're due for a DUCK report - only report on the first thread */
1241        if (stream_data == FORMAT_DATA_FIRST) {
1242                size = dag_get_duckinfo(libtrace, packet);
1243                if (size != 0)
1244                        return size;
1245        }
1246
1247
1248        /* Don't let anyone try to free our DAG memory hole! */
1249        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1250
1251        /* If the packet buffer is currently owned by libtrace, free it so
1252         * that we can set the packet to point into the DAG memory hole */
1253        if (packet->buf_control == TRACE_CTRL_PACKET) {
1254                free(packet->buffer);
1255                packet->buffer = 0;
1256        }
1257
1258        if (dag_set_stream_poll(FORMAT_DATA->device->fd, stream_data->dagstream,
1259                                sizeof(dag_record_t), &maxwait,
1260                                &pollwait) == -1) {
1261                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1262                return -1;
1263        }
1264
1265        /* Grab a full ERF record */
1266        do {
1267                numbytes = dag_available(libtrace, stream_data);
1268                if (numbytes < 0)
1269                        return numbytes;
1270                if (numbytes < dag_record_size) {
1271                        /* Check the message queue if we have one to check */
1272                        if (t != NULL &&
1273                            libtrace_message_queue_count(&t->messages) > 0)
1274                                return -2;
1275
1276                        if (libtrace_halt)
1277                                return 0;
1278                        /* Block until we see a packet */
1279                        continue;
1280                }
1281                erfptr = dag_get_record(stream_data);
1282        } while (erfptr == NULL);
1283
1284        packet->trace = libtrace;
1285        /* Prepare the libtrace packet */
1286        if (dag_prepare_packet_stream(libtrace, stream_data, packet, erfptr,
1287                                    TRACE_RT_DATA_ERF, flags))
1288                return -1;
1289
1290        /* Update the DUCK timer - don't re-order this check (false-sharing) */
1291        if (stream_data == FORMAT_DATA_FIRST && DUCK.duck_freq != 0) {
1292                tv = trace_get_timeval(packet);
1293                DUCK.last_pkt = tv.tv_sec;
1294        }
1295
1296        packet->error = packet->payload ? htons(erfptr->rlen) :
1297                                          erf_get_framing_length(packet);
1298
1299        return packet->error;
1300}
1301
1302static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
1303{
1304        return dag_read_packet_stream(libtrace, FORMAT_DATA_FIRST, NULL, packet);
1305}
1306
1307static int dag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1308                             libtrace_packet_t **packets, size_t nb_packets)
1309{
1310        int ret;
1311        size_t read_packets = 0;
1312        int numbytes = 0;
1313
1314        struct dag_per_stream_t *stream_data =
1315                (struct dag_per_stream_t *)t->format_data;
1316
1317        /* Read as many packets as we can, but read atleast one packet */
1318        do {
1319                ret = dag_read_packet_stream(libtrace, stream_data, t,
1320                                           packets[read_packets]);
1321                if (ret < 0)
1322                        return ret;
1323
1324                read_packets++;
1325
1326                /* Make sure we don't read too many packets..! */
1327                if (read_packets >= nb_packets)
1328                        break;
1329
1330                numbytes = dag_available(libtrace, stream_data);
1331        } while (numbytes >= dag_record_size);
1332
1333        return read_packets;
1334}
1335
1336/* Attempts to read a packet from a DAG card in a NON-BLOCKING fashion. If a
1337 * packet is available, we will return a packet event. Otherwise we will
1338 * return a SLEEP event (as we cannot select on the DAG file descriptor).
1339 */
1340static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
1341                                           libtrace_packet_t *packet)
1342{
1343        libtrace_eventobj_t event = {0,0,0.0,0};
1344        dag_record_t *erfptr = NULL;
1345        int numbytes;
1346        uint32_t flags = 0;
1347        struct timeval minwait, tv;
1348       
1349        minwait.tv_sec = 0;
1350        minwait.tv_usec = 10000;
1351
1352        /* Check if we're meant to provide a DUCK update */
1353        numbytes = dag_get_duckinfo(libtrace, packet);
1354        if (numbytes < 0) {
1355                event.type = TRACE_EVENT_TERMINATE;
1356                return event;
1357        } else if (numbytes > 0) {
1358                event.type = TRACE_EVENT_PACKET;
1359                return event;
1360        }
1361       
1362        if (dag_set_stream_poll(FORMAT_DATA->device->fd,
1363                                FORMAT_DATA_FIRST->dagstream, 0, &minwait,
1364                                &minwait) == -1) {
1365                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1366                event.type = TRACE_EVENT_TERMINATE;
1367                return event;
1368        }
1369
1370        do {
1371                erfptr = NULL;
1372                numbytes = 0;
1373
1374                /* Need to call dag_available so that the top pointer will get
1375                 * updated, otherwise we'll never see any data! */
1376                numbytes = dag_available(libtrace, FORMAT_DATA_FIRST);
1377
1378                /* May as well not bother calling dag_get_record if
1379                 * dag_available suggests that there's no data */
1380                if (numbytes != 0)
1381                        erfptr = dag_get_record(FORMAT_DATA_FIRST);
1382                if (erfptr == NULL) {
1383                        /* No packet available - sleep for a very short time */
1384                        if (libtrace_halt) {
1385                                event.type = TRACE_EVENT_TERMINATE;
1386                        } else {
1387                                event.type = TRACE_EVENT_SLEEP;
1388                                event.seconds = 0.0001;
1389                        }
1390                        break;
1391                }
1392                if (dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
1393                                            erfptr, TRACE_RT_DATA_ERF, flags)) {
1394                        event.type = TRACE_EVENT_TERMINATE;
1395                        break;
1396                }
1397
1398
1399                event.size = trace_get_capture_length(packet) +
1400                        trace_get_framing_length(packet);
1401
1402                /* XXX trace_read_packet() normally applies the following
1403                 * config options for us, but this function is called via
1404                 * trace_event() so we have to do it ourselves */
1405
1406                if (libtrace->filter) {
1407                        int filtret = trace_apply_filter(libtrace->filter,
1408                                                         packet);
1409                        if (filtret == -1) {
1410                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
1411                                              "Bad BPF Filter");
1412                                event.type = TRACE_EVENT_TERMINATE;
1413                                break;
1414                        }
1415
1416                        if (filtret == 0) {
1417                                /* This packet isn't useful so we want to
1418                                 * immediately see if there is another suitable
1419                                 * one - we definitely DO NOT want to return
1420                                 * a sleep event in this case, like we used to
1421                                 * do! */
1422                                libtrace->filtered_packets ++;
1423                                trace_clear_cache(packet);
1424                                continue;
1425                        }
1426
1427                        event.type = TRACE_EVENT_PACKET;
1428                } else {
1429                        event.type = TRACE_EVENT_PACKET;
1430                }
1431
1432                /* Update the DUCK timer */
1433                tv = trace_get_timeval(packet);
1434                DUCK.last_pkt = tv.tv_sec;
1435               
1436                if (libtrace->snaplen > 0) {
1437                        trace_set_capture_length(packet, libtrace->snaplen);
1438                }
1439                libtrace->accepted_packets ++;
1440                break;
1441        } while(1);
1442
1443        return event;
1444}
1445
1446static void dag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat)
1447{
1448        libtrace_list_node_t *tmp;
1449        assert(stat && libtrace);
1450        tmp = FORMAT_DATA_HEAD;
1451
1452        /* Dropped packets */
1453        stat->dropped_valid = 1;
1454        stat->dropped = 0;
1455        while (tmp != NULL) {
1456                stat->dropped += STREAM_DATA(tmp)->drops;
1457                tmp = tmp->next;
1458        }
1459
1460}
1461
1462static void dag_get_thread_statisitics(libtrace_t *libtrace, libtrace_thread_t *t,
1463                                       libtrace_stat_t *stat) {
1464        struct dag_per_stream_t *stream_data = t->format_data;
1465        assert(stat && libtrace);
1466
1467        stat->dropped_valid = 1;
1468        stat->dropped = stream_data->drops;
1469
1470        stat->filtered_valid = 1;
1471        stat->filtered = 0;
1472}
1473
/* Prints some semi-useful help text about the DAG format module to
 * stdout.
 *
 * NOTE(review): the text claims there are no supported output URIs, yet
 * this format registers output callbacks - confirm and update the text. */
static void dag_help(void) {
        static const char *const help_lines[] = {
                "dag format module: $Revision: 1755 $\n",
                "Supported input URIs:\n",
                "\tdag:/dev/dagn\n",
                "\n",
                "\te.g.: dag:/dev/dag0\n",
                "\n",
                "Supported output URIs:\n",
                "\tnone\n",
                "\n",
        };
        size_t i;

        for (i = 0; i < sizeof(help_lines) / sizeof(help_lines[0]); i++) {
                fputs(help_lines[i], stdout);
        }
}
1486
1487static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1488                                bool reader)
1489{
1490        struct dag_per_stream_t *stream_data;
1491        libtrace_list_node_t *node;
1492
1493        if (reader && t->type == THREAD_PERPKT) {
1494                fprintf(stderr, "t%u: registered reader thread", t->perpkt_num);
1495                node = libtrace_list_get_index(FORMAT_DATA->per_stream,
1496                                                t->perpkt_num);
1497                if (node == NULL) {
1498                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
1499                                      "Too few streams supplied for the"
1500                                      " number of threads lanuched");
1501                        return -1;
1502                }
1503                stream_data = node->data;
1504
1505                /* Pass the per thread data to the thread */
1506                t->format_data = stream_data;
1507
1508                /* Attach and start the DAG stream */
1509                printf("t%u: starting and attaching stream #%u\n",
1510                       t->perpkt_num, stream_data->dagstream);
1511                if (dag_start_input_stream(libtrace, stream_data) < 0)
1512                        return -1;
1513        }
1514
1515        fprintf(stderr, "t%u: registered thread\n", t->perpkt_num);
1516
1517        return 0;
1518}
1519
/* Format descriptor for the DAG capture module. The initialiser is
 * positional, so every entry must stay in the order in which the members
 * of struct libtrace_format_t are declared; a NULL entry means this module
 * does not provide that operation. */
static struct libtrace_format_t dag = {
        "dag",
        "$Id$",
        TRACE_FORMAT_ERF,
        dag_probe_filename,             /* probe filename */
        NULL,                           /* probe magic */
        dag_init_input,                 /* init_input */
        dag_config_input,               /* config_input */
        dag_start_input,                /* start_input */
        dag_pause_input,                /* pause_input */
        dag_init_output,                /* init_output */
        NULL,                           /* config_output */
        dag_start_output,               /* start_output */
        dag_fin_input,                  /* fin_input */
        dag_fin_output,                 /* fin_output */
        dag_read_packet,                /* read_packet */
        dag_prepare_packet,             /* prepare_packet */
        NULL,                           /* fin_packet */
        dag_write_packet,               /* write_packet */
        erf_get_link_type,              /* get_link_type */
        erf_get_direction,              /* get_direction */
        erf_set_direction,              /* set_direction */
        erf_get_erf_timestamp,          /* get_erf_timestamp */
        NULL,                           /* get_timeval */
        NULL,                           /* get_seconds */
        NULL,                           /* get_timespec */
        NULL,                           /* seek_erf */
        NULL,                           /* seek_timeval */
        NULL,                           /* seek_seconds */
        erf_get_capture_length,         /* get_capture_length */
        erf_get_wire_length,            /* get_wire_length */
        erf_get_framing_length,         /* get_framing_length */
        erf_set_capture_length,         /* set_capture_length */
        NULL,                           /* get_received_packets */
        NULL,                           /* get_filtered_packets */
        NULL,                           /* get_dropped_packets */
        dag_get_statistics,             /* get_statistics */
        NULL,                           /* get_fd */
        trace_event_dag,                /* trace_event */
        dag_help,                       /* help */
        NULL,                            /* next pointer */
        {true, 0}, /* live packet capture, thread limit TBD */
        /* The remaining entries are the parallel (per-thread) hooks.
         * NOTE(review): the slot names below are inferred from the
         * functions assigned to them - confirm against the declaration of
         * struct libtrace_format_t. */
        dag_pstart_input,               /* presumably pstart_input */
        dag_pread_packets,              /* presumably pread_packets */
        dag_pause_input,                /* presumably ppause_input */
        NULL,                           /* presumably pfin_input */
        dag_pregister_thread,           /* presumably pregister_thread */
        NULL,                           /* presumably punregister_thread */
        dag_get_thread_statisitics      /* get thread stats */
};
1570
/* Registers the DAG format descriptor with libtrace by passing it to
 * register_format(). */
void dag_constructor(void)
{
        register_format(&dag);
}
Note: See TracBrowser for help on using the repository browser.