source: lib/format_dag25.c @ 526d9d0

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 526d9d0 was 526d9d0, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Move the accepted packet count to dispatching packets for threads.
Accounting for the doubled count when using a single threaded format.

Rework statistics logic slightly to remove duplicated code.

  • Property mode set to 100644
File size: 45.0 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id$
31 *
32 */
33
34#define _GNU_SOURCE
35
36#include "config.h"
37#include "common.h"
38#include "libtrace.h"
39#include "libtrace_int.h"
40#include "format_helper.h"
41#include "format_erf.h"
42
43#include <assert.h>
44#include <errno.h>
45#include <fcntl.h>
46#include <stdio.h>
47#include <string.h>
48#include <stdlib.h>
49#include <sys/stat.h>
50
51#include <sys/mman.h>
52/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
53 * Windows anyway so we'll worry about this more later :] */
54#include <pthread.h>
55
56
57#ifdef WIN32
58#  include <io.h>
59#  include <share.h>
60#  define PATH_MAX _MAX_PATH
61#  define snprintf sprintf_s
62#else
63#  include <netdb.h>
64#  ifndef PATH_MAX
65#       define PATH_MAX 4096
66#  endif
67#  include <sys/ioctl.h>
68#endif
69
70/* This format deals with DAG cards that are using drivers from the 2.5 version
71 * onwards, including 3.X.
72 *
73 * DAG is a LIVE capture format.
74 *
75 * This format does support writing, provided the DAG card that you are using
76 * has transmit (Tx) support. Additionally, packets read using this format
77 * are in the ERF format, so can easily be written as ERF traces without
78 * losing any data.
79 */
80
81#define DATA(x) ((struct dag_format_data_t *)x->format_data)
82#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
83#define STREAM_DATA(x) ((struct dag_per_stream_t *)x->data)
84
85#define FORMAT_DATA DATA(libtrace)
86#define FORMAT_DATA_OUT DATA_OUT(libtrace)
87
88#define DUCK FORMAT_DATA->duck
89
90#define FORMAT_DATA_HEAD FORMAT_DATA->per_stream->head
91#define FORMAT_DATA_FIRST ((struct dag_per_stream_t *)FORMAT_DATA_HEAD->data)
92
93static struct libtrace_format_t dag;
94
95/* A DAG device - a DAG device can support multiple streams (and therefore
96 * multiple input traces) so each trace needs to refer to a device */
97struct dag_dev_t {
98        char * dev_name;                /* Device name */
99        int fd;                         /* File descriptor */
100        uint16_t ref_count;             /* Number of input / output traces
101                                           that are using this device */
102        struct dag_dev_t *prev;         /* Pointer to the previous device in
103                                           the device list */
104        struct dag_dev_t *next;         /* Pointer to the next device in the
105                                           device list */
106};
107
108/* "Global" data that is stored for each DAG output trace */
109struct dag_format_data_out_t {
110        /* The DAG device being used for writing */
111        struct dag_dev_t *device;
112        /* The DAG stream that is being written on */
113        unsigned int dagstream;
114        /* Boolean flag indicating whether the stream is currently attached */
115        int stream_attached;
116        /* The amount of data waiting to be transmitted, in bytes */
117        uint64_t waiting;
118        /* A buffer to hold the data to be transmittted */
119        uint8_t *txbuffer;
120};
121
122/* Data that is stored against each input stream */
123struct dag_per_stream_t {
124        /* DAG stream number */
125        uint16_t dagstream;
126        /* Pointer to the last unread byte in the DAG memory */
127        uint8_t *top;
128        /* Pointer to the first unread byte in the DAG memory */
129        uint8_t *bottom;
130        /* Amount of data processed from the bottom pointer */
131        uint32_t processed;
132        /* Number of packets seen by the stream */
133        uint64_t pkt_count;
134        /* Drop count for this particular stream */
135        uint64_t drops;
136        /* Boolean values to indicate if a particular interface has been seen
137         * or not. This is limited to four interfaces, which is enough to
138         * support all current DAG cards */
139        uint8_t seeninterface[4];
140};
141
142/* "Global" data that is stored for each DAG input trace */
143struct dag_format_data_t {
144        /* DAG device */
145        struct dag_dev_t *device;
146        /* Boolean flag indicating whether the trace is currently attached */
147        int stream_attached;
148        /* Data stored against each DAG input stream */
149        libtrace_list_t *per_stream;
150
151        /* Data required for regular DUCK reporting.
152         * We put this on a new cache line otherwise we have a lot of false
153         * sharing caused by updating the last_pkt.
154         * This should only ever be accessed by the first thread stream,
155         * that includes both read and write operations.
156         */
157        struct {
158                /* Timestamp of the last DUCK report */
159                uint32_t last_duck;
160                /* The number of seconds between each DUCK report */
161                uint32_t duck_freq;
162                /* Timestamp of the last packet read from the DAG card */
163                uint32_t last_pkt;
164                /* Dummy trace to ensure DUCK packets are dealt with using the
165                 * DUCK format functions */
166                libtrace_t *dummy_duck;
167        } duck ALIGN_STRUCT(CACHE_LINE_SIZE);
168};
169
170/* To be thread-safe, we're going to need a mutex for operating on the list
171 * of DAG devices */
172pthread_mutex_t open_dag_mutex;
173
174/* The list of DAG devices that have been opened by libtrace.
175 *
176 * We can only open each DAG device once, but we might want to read from
177 * multiple streams. Therefore, we need to maintain a list of devices that we
178 * have opened (with ref counts!) so that we don't try to open a device too
179 * many times or close a device that we're still using */
180struct dag_dev_t *open_dags = NULL;
181
182/* Returns the amount of padding between the ERF header and the start of the
183 * captured packet data */
184static int dag_get_padding(const libtrace_packet_t *packet)
185{
186        /* ERF Ethernet records have a 2 byte padding before the packet itself
187         * so that the IP header is aligned on a 32 bit boundary.
188         */
189        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
190                dag_record_t *erfptr = (dag_record_t *)packet->header;
191                switch(erfptr->type) {
192                        case TYPE_ETH:
193                        case TYPE_DSM_COLOR_ETH:
194                                return 2;
195                        default:                return 0;
196                }
197        }
198        else {
199                switch(trace_get_link_type(packet)) {
200                        case TRACE_TYPE_ETH:    return 2;
201                        default:                return 0;
202                }
203        }
204}
205
206/* Attempts to determine if the given filename refers to a DAG device */
207static int dag_probe_filename(const char *filename)
208{
209        struct stat statbuf;
210        /* Can we stat the file? */
211        if (stat(filename, &statbuf) != 0) {
212                return 0;
213        }
214        /* Is it a character device? */
215        if (!S_ISCHR(statbuf.st_mode)) {
216                return 0;
217        }
218        /* Yeah, it's probably us. */
219        return 1;
220}
221
222/* Initialises the DAG output data structure */
223static void dag_init_format_out_data(libtrace_out_t *libtrace)
224{
225        libtrace->format_data = (struct dag_format_data_out_t *)
226                malloc(sizeof(struct dag_format_data_out_t));
227        // no DUCK on output
228        FORMAT_DATA_OUT->stream_attached = 0;
229        FORMAT_DATA_OUT->device = NULL;
230        FORMAT_DATA_OUT->dagstream = 0;
231        FORMAT_DATA_OUT->waiting = 0;
232
233}
234
235/* Initialises the DAG input data structure */
236static void dag_init_format_data(libtrace_t *libtrace)
237{
238        struct dag_per_stream_t stream_data;
239
240        libtrace->format_data = (struct dag_format_data_t *)
241                malloc(sizeof(struct dag_format_data_t));
242        DUCK.last_duck = 0;
243        DUCK.duck_freq = 0;
244        DUCK.last_pkt = 0;
245        DUCK.dummy_duck = NULL;
246
247        FORMAT_DATA->per_stream =
248                libtrace_list_init(sizeof(stream_data));
249        assert(FORMAT_DATA->per_stream != NULL);
250
251        /* We'll start with just one instance of stream_data, and we'll
252         * add more later if we need them */
253        memset(&stream_data, 0, sizeof(stream_data));
254        libtrace_list_push_back(FORMAT_DATA->per_stream, &stream_data);
255}
256
257/* Determines if there is already an entry for the given DAG device in the
258 * device list and increments the reference count for that device, if found.
259 *
260 * NOTE: This function assumes the open_dag_mutex is held by the caller */
261static struct dag_dev_t *dag_find_open_device(char *dev_name)
262{
263        struct dag_dev_t *dag_dev;
264
265        dag_dev = open_dags;
266
267        /* XXX: Not exactly zippy, but how often are we going to be dealing
268         * with multiple dag cards? */
269        while (dag_dev != NULL) {
270                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
271                        dag_dev->ref_count ++;
272                        return dag_dev;
273                }
274                dag_dev = dag_dev->next;
275        }
276        return NULL;
277}
278
279/* Closes a DAG device and removes it from the device list.
280 *
281 * Attempting to close a DAG device that has a non-zero reference count will
282 * cause an assertion failure!
283 *
284 * NOTE: This function assumes the open_dag_mutex is held by the caller */
285static void dag_close_device(struct dag_dev_t *dev)
286{
287        /* Need to remove from the device list */
288        assert(dev->ref_count == 0);
289
290        if (dev->prev == NULL) {
291                open_dags = dev->next;
292                if (dev->next)
293                        dev->next->prev = NULL;
294        } else {
295                dev->prev->next = dev->next;
296                if (dev->next)
297                        dev->next->prev = dev->prev;
298        }
299
300        dag_close(dev->fd);
301        if (dev->dev_name)
302                free(dev->dev_name);
303        free(dev);
304}
305
306
307/* Opens a new DAG device for writing and adds it to the DAG device list
308 *
309 * NOTE: this function should only be called when opening a DAG device for
310 * writing - there is little practical difference between this and the
311 * function below that covers the reading case, but we need the output trace
312 * object to report errors properly so the two functions take slightly
313 * different arguments. This is really lame and there should be a much better
314 * way of doing this.
315 *
316 * NOTE: This function assumes the open_dag_mutex is held by the caller
317 */
318static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace,
319                                                char *dev_name)
320{
321        struct stat buf;
322        int fd;
323        struct dag_dev_t *new_dev;
324
325        /* Make sure the device exists */
326        if (stat(dev_name, &buf) == -1) {
327                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
328                return NULL;
329        }
330
331        /* Make sure it is the appropriate type of device */
332        if (S_ISCHR(buf.st_mode)) {
333                /* Try opening the DAG device */
334                if((fd = dag_open(dev_name)) < 0) {
335                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
336                                        dev_name);
337                        return NULL;
338                }
339        } else {
340                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
341                                dev_name);
342                return NULL;
343        }
344
345        /* Add the device to our device list - it is just a doubly linked
346         * list with no inherent ordering; just tack the new one on the front
347         */
348        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
349        new_dev->fd = fd;
350        new_dev->dev_name = dev_name;
351        new_dev->ref_count = 1;
352
353        new_dev->prev = NULL;
354        new_dev->next = open_dags;
355        if (open_dags)
356                open_dags->prev = new_dev;
357
358        open_dags = new_dev;
359
360        return new_dev;
361}
362
363/* Opens a new DAG device for reading and adds it to the DAG device list
364 *
365 * NOTE: this function should only be called when opening a DAG device for
366 * reading - there is little practical difference between this and the
367 * function above that covers the writing case, but we need the input trace
368 * object to report errors properly so the two functions take slightly
369 * different arguments. This is really lame and there should be a much better
370 * way of doing this.
371 *
372 * NOTE: This function assumes the open_dag_mutex is held by the caller */
373static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
374        struct stat buf;
375        int fd;
376        struct dag_dev_t *new_dev;
377
378        /* Make sure the device exists */
379        if (stat(dev_name, &buf) == -1) {
380                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
381                return NULL;
382        }
383
384        /* Make sure it is the appropriate type of device */
385        if (S_ISCHR(buf.st_mode)) {
386                /* Try opening the DAG device */
387                if((fd = dag_open(dev_name)) < 0) {
388                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
389                                      dev_name);
390                        return NULL;
391                }
392        } else {
393                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
394                              dev_name);
395                return NULL;
396        }
397
398        /* Add the device to our device list - it is just a doubly linked
399         * list with no inherent ordering; just tack the new one on the front
400         */
401        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
402        new_dev->fd = fd;
403        new_dev->dev_name = dev_name;
404        new_dev->ref_count = 1;
405
406        new_dev->prev = NULL;
407        new_dev->next = open_dags;
408        if (open_dags)
409                open_dags->prev = new_dev;
410
411        open_dags = new_dev;
412
413        return new_dev;
414}
415
416/* Creates and initialises a DAG output trace */
417static int dag_init_output(libtrace_out_t *libtrace)
418{
419        /* Upon successful creation, the device name is stored against the
420         * device and free when it is free()d */
421        char *dag_dev_name = NULL;
422        char *scan = NULL;
423        struct dag_dev_t *dag_device = NULL;
424        int stream = 1;
425
426        /* XXX I don't know if this is important or not, but this function
427         * isn't present in all of the driver releases that this code is
428         * supposed to support! */
429        /*
430        unsigned long wake_time;
431        dagutil_sleep_get_wake_time(&wake_time,0);
432        */
433
434        dag_init_format_out_data(libtrace);
435        /* Grab the mutex while we're likely to be messing with the device
436         * list */
437        pthread_mutex_lock(&open_dag_mutex);
438
439        /* Specific streams are signified using a comma in the libtrace URI,
440         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
441         *
442         * If no stream is specified, we will write using stream 1 */
443        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
444                dag_dev_name = strdup(libtrace->uridata);
445        } else {
446                dag_dev_name = (char *)strndup(libtrace->uridata,
447                                (size_t)(scan - libtrace->uridata));
448                stream = atoi(++scan);
449        }
450        FORMAT_DATA_OUT->dagstream = stream;
451
452        /* See if our DAG device is already open */
453        dag_device = dag_find_open_device(dag_dev_name);
454
455        if (dag_device == NULL) {
456                /* Device not yet opened - open it ourselves */
457                dag_device = dag_open_output_device(libtrace, dag_dev_name);
458        } else {
459                /* Otherwise, just use the existing one */
460                free(dag_dev_name);
461                dag_dev_name = NULL;
462        }
463
464        /* Make sure we have successfully opened a DAG device */
465        if (dag_device == NULL) {
466                if (dag_dev_name) {
467                        free(dag_dev_name);
468                }
469                pthread_mutex_unlock(&open_dag_mutex);
470                return -1;
471        }
472
473        FORMAT_DATA_OUT->device = dag_device;
474        pthread_mutex_unlock(&open_dag_mutex);
475        return 0;
476}
477
478/* Creates and initialises a DAG input trace */
479static int dag_init_input(libtrace_t *libtrace) {
480        /* Upon successful creation, the device name is stored against the
481         * device and free when it is free()d */
482        char *dag_dev_name = NULL;
483        char *scan = NULL;
484        int stream = 0;
485        struct dag_dev_t *dag_device = NULL;
486
487        dag_init_format_data(libtrace);
488        /* Grab the mutex while we're likely to be messing with the device
489         * list */
490        pthread_mutex_lock(&open_dag_mutex);
491
492
493        /* DAG cards support multiple streams. In a single threaded capture,
494         * these are specified using a comma in the libtrace URI,
495         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
496         *
497         * If no stream is specified, we will read from stream 0 with
498         * one thread
499         */
500        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
501                dag_dev_name = strdup(libtrace->uridata);
502        } else {
503                dag_dev_name = (char *)strndup(libtrace->uridata,
504                                (size_t)(scan - libtrace->uridata));
505                stream = atoi(++scan);
506        }
507
508        FORMAT_DATA_FIRST->dagstream = stream;
509
510        /* See if our DAG device is already open */
511        dag_device = dag_find_open_device(dag_dev_name);
512
513        if (dag_device == NULL) {
514                /* Device not yet opened - open it ourselves */
515                dag_device = dag_open_device(libtrace, dag_dev_name);
516        } else {
517                /* Otherwise, just use the existing one */
518                free(dag_dev_name);
519                dag_dev_name = NULL;
520        }
521
522        /* Make sure we have successfully opened a DAG device */
523        if (dag_device == NULL) {
524                if (dag_dev_name)
525                        free(dag_dev_name);
526                dag_dev_name = NULL;
527                pthread_mutex_unlock(&open_dag_mutex);
528                return -1;
529        }
530
531        FORMAT_DATA->device = dag_device;
532
533        /* See Config_Status_API_Programming_Guide.pdf from the Endace
534           Dag Documentation */
535        /* Check kBooleanAttributeActive is true -- no point capturing
536         * on an interface that's disabled
537         *
538         * The symptom of the port being disabled is that libtrace
539         * will appear to hang. */
540        /* Check kBooleanAttributeFault is false */
541        /* Check kBooleanAttributeLocalFault is false */
542        /* Check kBooleanAttributeLock is true ? */
543        /* Check kBooleanAttributePeerLink ? */
544
545        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based
546           on libtrace promisc attribute?*/
547        /* Set kUint32AttributeSnapLength to the snaplength */
548
549        pthread_mutex_unlock(&open_dag_mutex);
550        return 0;
551}
552
553/* Configures a DAG input trace */
554static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
555                            void *data)
556{
557        char conf_str[4096];
558        switch(option) {
559        case TRACE_OPTION_META_FREQ:
560                /* This option is used to specify the frequency of DUCK
561                 * updates */
562                DUCK.duck_freq = *(int *)data;
563                return 0;
564        case TRACE_OPTION_SNAPLEN:
565                /* Tell the card our new snap length */
566                snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
567                if (dag_configure(FORMAT_DATA->device->fd,
568                                  conf_str) != 0) {
569                        trace_set_err(libtrace, errno, "Failed to configure "
570                                      "snaplen on DAG card: %s",
571                                      libtrace->uridata);
572                        return -1;
573                }
574                return 0;
575        case TRACE_OPTION_PROMISC:
576                /* DAG already operates in a promisc fashion */
577                return -1;
578        case TRACE_OPTION_FILTER:
579                /* We don't yet support pushing filters into DAG
580                 * cards */
581                return -1;
582        case TRACE_OPTION_EVENT_REALTIME:
583                /* Live capture is always going to be realtime */
584                return -1;
585        }
586        return -1;
587}
588
589/* Starts a DAG output trace */
590static int dag_start_output(libtrace_out_t *libtrace)
591{
592        struct timeval zero, nopoll;
593
594        zero.tv_sec = 0;
595        zero.tv_usec = 0;
596        nopoll = zero;
597
598        /* Attach and start the DAG stream */
599        if (dag_attach_stream(FORMAT_DATA_OUT->device->fd,
600                        FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) {
601                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
602                return -1;
603        }
604
605        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
606                        FORMAT_DATA_OUT->dagstream) < 0) {
607                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
608                return -1;
609        }
610        FORMAT_DATA_OUT->stream_attached = 1;
611
612        /* We don't want the dag card to do any sleeping */
613        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
614                        FORMAT_DATA_OUT->dagstream, 0, &zero,
615                        &nopoll);
616
617        return 0;
618}
619
620static int dag_start_input_stream(libtrace_t *libtrace,
621                                  struct dag_per_stream_t * stream) {
622        struct timeval zero, nopoll;
623        uint8_t *top, *bottom, *starttop;
624        top = bottom = NULL;
625
626        zero.tv_sec = 0;
627        zero.tv_usec = 10000;
628        nopoll = zero;
629
630        /* Attach and start the DAG stream */
631        if (dag_attach_stream(FORMAT_DATA->device->fd,
632                              stream->dagstream, 0, 0) < 0) {
633                trace_set_err(libtrace, errno, "Cannot attach DAG stream #%u",
634                              stream->dagstream);
635                return -1;
636        }
637
638        if (dag_start_stream(FORMAT_DATA->device->fd,
639                             stream->dagstream) < 0) {
640                trace_set_err(libtrace, errno, "Cannot start DAG stream #%u",
641                              stream->dagstream);
642                return -1;
643        }
644        FORMAT_DATA->stream_attached = 1;
645
646        /* We don't want the dag card to do any sleeping */
647        if (dag_set_stream_poll(FORMAT_DATA->device->fd,
648                            stream->dagstream, 0, &zero,
649                            &nopoll) < 0) {
650                trace_set_err(libtrace, errno,
651                              "dag_set_stream_poll failed!");
652                return -1;
653        }
654
655        starttop = dag_advance_stream(FORMAT_DATA->device->fd,
656                                      stream->dagstream,
657                                      &bottom);
658
659        /* Should probably flush the memory hole now */
660        top = starttop;
661        while (starttop - bottom > 0) {
662                bottom += (starttop - bottom);
663                top = dag_advance_stream(FORMAT_DATA->device->fd,
664                                         stream->dagstream,
665                                         &bottom);
666        }
667        stream->top = top;
668        stream->bottom = bottom;
669        stream->processed = 0;
670        stream->drops = 0;
671
672        return 0;
673
674}
675
676/* Starts a DAG input trace */
677static int dag_start_input(libtrace_t *libtrace)
678{
679        return dag_start_input_stream(libtrace, FORMAT_DATA_FIRST);
680}
681
682static int dag_pstart_input(libtrace_t *libtrace)
683{
684        char *scan, *tok;
685        uint16_t stream_count = 0, max_streams;
686        int iserror = 0;
687        struct dag_per_stream_t stream_data;
688
689        /* Check we aren't trying to create more threads than the DAG card can
690         * handle */
691        max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd);
692        if (libtrace->perpkt_thread_count > max_streams) {
693                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
694                              "trying to create too many threads (max is %u)",
695                              max_streams);
696                iserror = 1;
697                goto cleanup;
698        }
699
700        /* Get the stream names from the uri */
701        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
702                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
703                              "format uri doesn't specify the DAG streams");
704                iserror = 1;
705                goto cleanup;
706        }
707
708        scan++;
709
710        tok = strtok(scan, ",");
711        while (tok != NULL) {
712                /* Ensure we haven't specified too many streams */
713                if (stream_count >= libtrace->perpkt_thread_count) {
714                        trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
715                                      "format uri specifies too many streams. "
716                                      "Max is %u", max_streams);
717                        iserror = 1;
718                        goto cleanup;
719                }
720
721                /* Save the stream details */
722                if (stream_count == 0) {
723                        /* Special case where we update the existing stream
724                         * data structure */
725                        FORMAT_DATA_FIRST->dagstream = (uint16_t)atoi(tok);
726                } else {
727                        memset(&stream_data, 0, sizeof(stream_data));
728                        stream_data.dagstream = (uint16_t)atoi(tok);
729                        libtrace_list_push_back(FORMAT_DATA->per_stream,
730                                                &stream_data);
731                }
732
733                stream_count++;
734                tok = strtok(NULL, ",");
735        }
736
737        FORMAT_DATA->stream_attached = 1;
738
739 cleanup:
740        if (iserror) {
741                return -1;
742        } else {
743                return 0;
744        }
745}
746
747/* Pauses a DAG output trace */
748static int dag_pause_output(libtrace_out_t *libtrace)
749{
750        /* Stop and detach the stream */
751        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
752                            FORMAT_DATA_OUT->dagstream) < 0) {
753                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
754                return -1;
755        }
756        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
757                              FORMAT_DATA_OUT->dagstream) < 0) {
758                trace_set_err_out(libtrace, errno,
759                                  "Could not detach DAG stream");
760                return -1;
761        }
762        FORMAT_DATA_OUT->stream_attached = 0;
763        return 0;
764}
765
766/* Pauses a DAG input trace */
767static int dag_pause_input(libtrace_t *libtrace)
768{
769        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
770
771        /* Stop and detach each stream */
772        while (tmp != NULL) {
773                if (dag_stop_stream(FORMAT_DATA->device->fd,
774                                    STREAM_DATA(tmp)->dagstream) < 0) {
775                        trace_set_err(libtrace, errno,
776                                      "Could not stop DAG stream");
777                        printf("Count not stop DAG stream\n");
778                        return -1;
779                }
780                if (dag_detach_stream(FORMAT_DATA->device->fd,
781                                      STREAM_DATA(tmp)->dagstream) < 0) {
782                        trace_set_err(libtrace, errno,
783                                      "Could not detach DAG stream");
784                        printf("Count not detach DAG stream\n");
785                        return -1;
786                }
787
788                tmp = tmp->next;
789        }
790
791        FORMAT_DATA->stream_attached = 0;
792        return 0;
793}
794
795
796
797/* Closes a DAG input trace */
798static int dag_fin_input(libtrace_t *libtrace)
799{
800        /* Need the lock, since we're going to be handling the device list */
801        pthread_mutex_lock(&open_dag_mutex);
802
803        /* Detach the stream if we are not paused */
804        if (FORMAT_DATA->stream_attached)
805                dag_pause_input(libtrace);
806        FORMAT_DATA->device->ref_count--;
807
808        /* Close the DAG device if there are no more references to it */
809        if (FORMAT_DATA->device->ref_count == 0)
810                dag_close_device(FORMAT_DATA->device);
811
812        if (DUCK.dummy_duck)
813                trace_destroy_dead(DUCK.dummy_duck);
814
815        /* Clear the list */
816        libtrace_list_deinit(FORMAT_DATA->per_stream);
817        free(libtrace->format_data);
818        pthread_mutex_unlock(&open_dag_mutex);
819        return 0; /* success */
820}
821
822/* Closes a DAG output trace */
823static int dag_fin_output(libtrace_out_t *libtrace)
824{
825
826        /* Commit any outstanding traffic in the txbuffer */
827        if (FORMAT_DATA_OUT->waiting) {
828                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
829                                           FORMAT_DATA_OUT->dagstream,
830                                           FORMAT_DATA_OUT->waiting );
831        }
832
833        /* Wait until the buffer is nearly clear before exiting the program,
834         * as we will lose packets otherwise */
835        dag_tx_get_stream_space
836                (FORMAT_DATA_OUT->device->fd,
837                 FORMAT_DATA_OUT->dagstream,
838                 dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
839                                            FORMAT_DATA_OUT->dagstream) - 8);
840
841        /* Need the lock, since we're going to be handling the device list */
842        pthread_mutex_lock(&open_dag_mutex);
843
844        /* Detach the stream if we are not paused */
845        if (FORMAT_DATA_OUT->stream_attached)
846                dag_pause_output(libtrace);
847        FORMAT_DATA_OUT->device->ref_count --;
848
849        /* Close the DAG device if there are no more references to it */
850        if (FORMAT_DATA_OUT->device->ref_count == 0)
851                dag_close_device(FORMAT_DATA_OUT->device);
852        free(libtrace->format_data);
853        pthread_mutex_unlock(&open_dag_mutex);
854        return 0; /* success */
855}
856
857#ifdef DAGIOC_CARD_DUCK
858#define LIBTRACE_DUCK_IOCTL DAGIOC_CARD_DUCK
859#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_5_0
860#else
861#ifdef DAGIOCDUCK
862#define LIBTRACE_DUCK_IOCTL DAGIOCDUCK
863#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_2_5
864#else
865#warning "DAG appears to be missing DUCK support"
866#endif
867#endif
868
869/* Extracts DUCK information from the DAG card and produces a DUCK packet */
870static int dag_get_duckinfo(libtrace_t *libtrace,
871                                libtrace_packet_t *packet) {
872
873        if (DUCK.duck_freq == 0)
874                return 0;
875
876#ifndef LIBTRACE_DUCK_IOCTL
877        trace_set_err(libtrace, errno, 
878                "Requested DUCK information but unable to determine the correct ioctl for DUCK");
879        DUCK.duck_freq = 0;
880        return -1;
881#endif
882
883        if (DUCK.last_pkt - DUCK.last_duck < DUCK.duck_freq)
884                return 0;
885
886        /* Allocate memory for the DUCK data */
887        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
888            !packet->buffer) {
889                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
890                packet->buf_control = TRACE_CTRL_PACKET;
891                if (!packet->buffer) {
892                        trace_set_err(libtrace, errno,
893                                      "Cannot allocate packet buffer");
894                        return -1;
895                }
896        }
897
898        /* DUCK doesn't have a format header */
899        packet->header = 0;
900        packet->payload = packet->buffer;
901
902        /* No need to check if we can get DUCK or not - we're modern
903         * enough so just grab the DUCK info */
904        if ((ioctl(FORMAT_DATA->device->fd, LIBTRACE_DUCK_IOCTL,
905                   (duckinf_t *)packet->payload) < 0)) {
906                trace_set_err(libtrace, errno, "Error using DUCK ioctl");
907                DUCK.duck_freq = 0;
908                return -1;
909        }
910
911        packet->type = LIBTRACE_DUCK_VERSION;
912
913        /* Set the packet's trace to point at a DUCK trace, so that the
914         * DUCK format functions will be called on the packet rather than the
915         * DAG ones */
916        if (!DUCK.dummy_duck)
917                DUCK.dummy_duck = trace_create_dead("duck:dummy");
918        packet->trace = DUCK.dummy_duck;
919        DUCK.last_duck = DUCK.last_pkt;
920        packet->error = sizeof(duckinf_t);
921        return sizeof(duckinf_t);
922}
923
924/* Determines the amount of data available to read from the DAG card */
925static int dag_available(libtrace_t *libtrace,
926                         struct dag_per_stream_t *stream_data)
927{
928        uint32_t diff = stream_data->top - stream_data->bottom;
929
930        /* If we've processed more than 4MB of data since we last called
931         * dag_advance_stream, then we should call it again to allow the
932         * space occupied by that 4MB to be released */
933        if (diff >= dag_record_size && stream_data->processed < 4 * 1024 * 1024)
934                return diff;
935
936        /* Update the top and bottom pointers */
937        stream_data->top = dag_advance_stream(FORMAT_DATA->device->fd,
938                                              stream_data->dagstream,
939                                              &(stream_data->bottom));
940
941        if (stream_data->top == NULL) {
942                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
943                return -1;
944        }
945        stream_data->processed = 0;
946        diff = stream_data->top - stream_data->bottom;
947        return diff;
948}
949
950/* Returns a pointer to the start of the next complete ERF record */
951static dag_record_t *dag_get_record(struct dag_per_stream_t *stream_data)
952{
953        dag_record_t *erfptr = NULL;
954        uint16_t size;
955
956        erfptr = (dag_record_t *)stream_data->bottom;
957        if (!erfptr)
958                return NULL;
959
960        size = ntohs(erfptr->rlen);
961        assert( size >= dag_record_size );
962
963        /* Make certain we have the full packet available */
964        if (size > (stream_data->top - stream_data->bottom))
965                return NULL;
966
967        stream_data->bottom += size;
968        stream_data->processed += size;
969        return erfptr;
970}
971
972/* Converts a buffer containing a recently read DAG packet record into a
973 * libtrace packet */
974static int dag_prepare_packet_stream(libtrace_t *libtrace,
975                                     struct dag_per_stream_t *stream_data,
976                                     libtrace_packet_t *packet,
977                                     void *buffer, libtrace_rt_types_t rt_type,
978                                     uint32_t flags)
979{
980        dag_record_t *erfptr;
981
982        /* If the packet previously owned a buffer that is not the buffer
983         * that contains the new packet data, we're going to need to free the
984         * old one to avoid memory leaks */
985        if (packet->buffer != buffer &&
986            packet->buf_control == TRACE_CTRL_PACKET) {
987                free(packet->buffer);
988        }
989
990        /* Set the buffer owner appropriately */
991        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
992                packet->buf_control = TRACE_CTRL_PACKET;
993        } else
994                packet->buf_control = TRACE_CTRL_EXTERNAL;
995
996        /* Update the packet pointers and type appropriately */
997        erfptr = (dag_record_t *)buffer;
998        packet->buffer = erfptr;
999        packet->header = erfptr;
1000        packet->type = rt_type;
1001
1002        if (erfptr->flags.rxerror == 1) {
1003                /* rxerror means the payload is corrupt - drop the payload
1004                 * by tweaking rlen */
1005                packet->payload = NULL;
1006                erfptr->rlen = htons(erf_get_framing_length(packet));
1007        } else {
1008                packet->payload = (char*)packet->buffer
1009                        + erf_get_framing_length(packet);
1010        }
1011
1012        if (libtrace->format_data == NULL) {
1013                dag_init_format_data(libtrace);
1014        }
1015
1016        /* Update the dropped packets counter */
1017        /* No loss counter for DSM coloured records - have to use some
1018         * other API */
1019        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
1020                /* TODO */
1021        } else {
1022                /* Use the ERF loss counter */
1023                if (stream_data->seeninterface[erfptr->flags.iface]
1024                    == 0) {
1025                        stream_data->seeninterface[erfptr->flags.iface]
1026                                = 1;
1027                } else {
1028                        stream_data->drops += ntohs(erfptr->lctr);
1029                }
1030        }
1031
1032        return 0;
1033}
1034
1035static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
1036                              void *buffer, libtrace_rt_types_t rt_type,
1037                              uint32_t flags)
1038{
1039        return dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
1040                                       buffer, rt_type, flags);
1041}
1042
1043/*
1044 * dag_write_packet() at this stage attempts to improve tx performance
1045 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
1046 * has been met. I observed approximately 270% performance increase
1047 * through this relatively naive tweak. No optimisation of buffer sizes
1048 * was attempted.
1049 */
1050
1051/* Pushes an ERF record onto the transmit stream */
1052static int dag_dump_packet(libtrace_out_t *libtrace,
1053                           dag_record_t *erfptr, unsigned int pad,
1054                           void *buffer)
1055{
1056        int size;
1057
1058        /*
1059         * If we've got 0 bytes waiting in the txqueue, assume that we
1060         * haven't requested any space yet, and request some, storing
1061         * the pointer at FORMAT_DATA_OUT->txbuffer.
1062         *
1063         * The amount to request is slightly magical at the moment - it's
1064         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
1065         * the buffer and handle overruns.
1066         */
1067        if (FORMAT_DATA_OUT->waiting == 0) {
1068                FORMAT_DATA_OUT->txbuffer =
1069                        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
1070                                                FORMAT_DATA_OUT->dagstream,
1071                                                16908288);
1072        }
1073
1074        /*
1075         * Copy the header separately to the body, as we can't guarantee they
1076         * are in contiguous memory
1077         */
1078        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, erfptr,
1079               (dag_record_size + pad));
1080        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
1081
1082        /*
1083         * Copy our incoming packet into the outgoing buffer, and increment
1084         * our waiting count
1085         */
1086        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
1087        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, buffer,
1088               size);
1089        FORMAT_DATA_OUT->waiting += size;
1090
1091        /*
1092         * If our output buffer has more than 16 Mebibytes in it, commit those
1093         * bytes and reset the waiting count to 0.
1094         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in
1095         * case there is still data in the buffer at program exit.
1096         */
1097        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
1098                FORMAT_DATA_OUT->txbuffer =
1099                        dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
1100                                                   FORMAT_DATA_OUT->dagstream,
1101                                                   FORMAT_DATA_OUT->waiting);
1102                FORMAT_DATA_OUT->waiting = 0;
1103        }
1104
1105        return size + pad + dag_record_size;
1106}
1107
1108/* Attempts to determine a suitable ERF type for a given packet. Returns true
1109 * if one is found, false otherwise */
1110static bool find_compatible_linktype(libtrace_out_t *libtrace,
1111                                     libtrace_packet_t *packet, char *type)
1112{
1113        /* Keep trying to simplify the packet until we can find
1114         * something we can do with it */
1115
1116        do {
1117                *type = libtrace_to_erf_type(trace_get_link_type(packet));
1118
1119                /* Success */
1120                if (*type != (char)-1)
1121                        return true;
1122
1123                if (!demote_packet(packet)) {
1124                        trace_set_err_out(libtrace,
1125                                          TRACE_ERR_NO_CONVERSION,
1126                                          "No erf type for packet (%i)",
1127                                          trace_get_link_type(packet));
1128                        return false;
1129                }
1130
1131        } while(1);
1132
1133        return true;
1134}
1135
1136/* Writes a packet to the provided DAG output trace */
1137static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet)
1138{
1139        /* This is heavily borrowed from erf_write_packet(). Yes, CnP
1140         * coding sucks, sorry about that.
1141         */
1142        unsigned int pad = 0;
1143        int numbytes;
1144        void *payload = packet->payload;
1145        dag_record_t *header = (dag_record_t *)packet->header;
1146        char erf_type = 0;
1147
1148        if(!packet->header) {
1149                /* No header, probably an RT packet. Lifted from
1150                 * erf_write_packet(). */
1151                return -1;
1152        }
1153
1154        if (trace_get_link_type(packet) == TRACE_TYPE_NONDATA)
1155                return 0;
1156
1157        pad = dag_get_padding(packet);
1158
1159        /*
1160         * If the payload is null, adjust the rlen. Discussion of this is
1161         * attached to erf_write_packet()
1162         */
1163        if (payload == NULL) {
1164                header->rlen = htons(dag_record_size + pad);
1165        }
1166
1167        if (packet->type == TRACE_RT_DATA_ERF) {
1168                numbytes = dag_dump_packet(libtrace, header, pad, payload);
1169        } else {
1170                /* Build up a new packet header from the existing header */
1171
1172                /* Simplify the packet first - if we can't do this, break
1173                 * early */
1174                if (!find_compatible_linktype(libtrace,packet,&erf_type))
1175                        return -1;
1176
1177                dag_record_t erfhdr;
1178
1179                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
1180                payload=packet->payload;
1181                pad = dag_get_padding(packet);
1182
1183                /* Flags. Can't do this */
1184                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
1185                if (trace_get_direction(packet)!=(int)~0U)
1186                        erfhdr.flags.iface = trace_get_direction(packet);
1187
1188                erfhdr.type = erf_type;
1189
1190                /* Packet length (rlen includes format overhead) */
1191                assert(trace_get_capture_length(packet) > 0
1192                       && trace_get_capture_length(packet) <= 65536);
1193                assert(erf_get_framing_length(packet) > 0
1194                       && trace_get_framing_length(packet) <= 65536);
1195                assert(trace_get_capture_length(packet) +
1196                       erf_get_framing_length(packet) > 0
1197                       && trace_get_capture_length(packet) +
1198                       erf_get_framing_length(packet) <= 65536);
1199
1200                erfhdr.rlen = htons(trace_get_capture_length(packet)
1201                                    + erf_get_framing_length(packet));
1202
1203
1204                /* Loss counter. Can't do this */
1205                erfhdr.lctr = 0;
1206                /* Wire length, does not include padding! */
1207                erfhdr.wlen = htons(trace_get_wire_length(packet));
1208
1209                /* Write it out */
1210                numbytes = dag_dump_packet(libtrace, &erfhdr, pad, payload);
1211        }
1212
1213        return numbytes;
1214}
1215
1216/* Reads the next available packet from a DAG card, in a BLOCKING fashion
1217 *
1218 * If DUCK reporting is enabled, the packet returned may be a DUCK update
1219 */
1220static int dag_read_packet_stream(libtrace_t *libtrace,
1221                                struct dag_per_stream_t *stream_data,
1222                                libtrace_thread_t *t, /* Optional */
1223                                libtrace_packet_t *packet)
1224{
1225        int size = 0;
1226        dag_record_t *erfptr = NULL;
1227        struct timeval tv;
1228        int numbytes = 0;
1229        uint32_t flags = 0;
1230        struct timeval maxwait, pollwait;
1231
1232        pollwait.tv_sec = 0;
1233        pollwait.tv_usec = 10000;
1234        maxwait.tv_sec = 0;
1235        maxwait.tv_usec = 250000;
1236
1237        /* Check if we're due for a DUCK report - only report on the first thread */
1238        if (stream_data == FORMAT_DATA_FIRST) {
1239                size = dag_get_duckinfo(libtrace, packet);
1240                if (size != 0)
1241                        return size;
1242        }
1243
1244
1245        /* Don't let anyone try to free our DAG memory hole! */
1246        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1247
1248        /* If the packet buffer is currently owned by libtrace, free it so
1249         * that we can set the packet to point into the DAG memory hole */
1250        if (packet->buf_control == TRACE_CTRL_PACKET) {
1251                free(packet->buffer);
1252                packet->buffer = 0;
1253        }
1254
1255        if (dag_set_stream_poll(FORMAT_DATA->device->fd, stream_data->dagstream,
1256                                sizeof(dag_record_t), &maxwait,
1257                                &pollwait) == -1) {
1258                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1259                return -1;
1260        }
1261
1262        /* Grab a full ERF record */
1263        do {
1264                numbytes = dag_available(libtrace, stream_data);
1265                if (numbytes < 0)
1266                        return numbytes;
1267                if (numbytes < dag_record_size) {
1268                        /* Check the message queue if we have one to check */
1269                        if (t != NULL &&
1270                            libtrace_message_queue_count(&t->messages) > 0)
1271                                return -2;
1272
1273                        if (libtrace_halt)
1274                                return 0;
1275                        /* Block until we see a packet */
1276                        continue;
1277                }
1278                erfptr = dag_get_record(stream_data);
1279        } while (erfptr == NULL);
1280
1281        packet->trace = libtrace;
1282        /* Prepare the libtrace packet */
1283        if (dag_prepare_packet_stream(libtrace, stream_data, packet, erfptr,
1284                                    TRACE_RT_DATA_ERF, flags))
1285                return -1;
1286
1287        /* Update the DUCK timer - don't re-order this check (false-sharing) */
1288        if (stream_data == FORMAT_DATA_FIRST && DUCK.duck_freq != 0) {
1289                tv = trace_get_timeval(packet);
1290                DUCK.last_pkt = tv.tv_sec;
1291        }
1292
1293        packet->error = packet->payload ? htons(erfptr->rlen) :
1294                                          erf_get_framing_length(packet);
1295
1296        return packet->error;
1297}
1298
1299static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
1300{
1301        return dag_read_packet_stream(libtrace, FORMAT_DATA_FIRST, NULL, packet);
1302}
1303
1304static int dag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1305                             libtrace_packet_t **packets, size_t nb_packets)
1306{
1307        int ret;
1308        size_t read_packets = 0;
1309        int numbytes = 0;
1310
1311        struct dag_per_stream_t *stream_data =
1312                (struct dag_per_stream_t *)t->format_data;
1313
1314        /* Read as many packets as we can, but read atleast one packet */
1315        do {
1316                ret = dag_read_packet_stream(libtrace, stream_data, t,
1317                                           packets[read_packets]);
1318                if (ret < 0)
1319                        return ret;
1320
1321                read_packets++;
1322
1323                /* Make sure we don't read too many packets..! */
1324                if (read_packets >= nb_packets)
1325                        break;
1326
1327                numbytes = dag_available(libtrace, stream_data);
1328        } while (numbytes >= dag_record_size);
1329
1330        return read_packets;
1331}
1332
1333/* Attempts to read a packet from a DAG card in a NON-BLOCKING fashion. If a
1334 * packet is available, we will return a packet event. Otherwise we will
1335 * return a SLEEP event (as we cannot select on the DAG file descriptor).
1336 */
1337static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
1338                                           libtrace_packet_t *packet)
1339{
1340        libtrace_eventobj_t event = {0,0,0.0,0};
1341        dag_record_t *erfptr = NULL;
1342        int numbytes;
1343        uint32_t flags = 0;
1344        struct timeval minwait, tv;
1345       
1346        minwait.tv_sec = 0;
1347        minwait.tv_usec = 10000;
1348
1349        /* Check if we're meant to provide a DUCK update */
1350        numbytes = dag_get_duckinfo(libtrace, packet);
1351        if (numbytes < 0) {
1352                event.type = TRACE_EVENT_TERMINATE;
1353                return event;
1354        } else if (numbytes > 0) {
1355                event.type = TRACE_EVENT_PACKET;
1356                return event;
1357        }
1358       
1359        if (dag_set_stream_poll(FORMAT_DATA->device->fd,
1360                                FORMAT_DATA_FIRST->dagstream, 0, &minwait,
1361                                &minwait) == -1) {
1362                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1363                event.type = TRACE_EVENT_TERMINATE;
1364                return event;
1365        }
1366
1367        do {
1368                erfptr = NULL;
1369                numbytes = 0;
1370
1371                /* Need to call dag_available so that the top pointer will get
1372                 * updated, otherwise we'll never see any data! */
1373                numbytes = dag_available(libtrace, FORMAT_DATA_FIRST);
1374
1375                /* May as well not bother calling dag_get_record if
1376                 * dag_available suggests that there's no data */
1377                if (numbytes != 0)
1378                        erfptr = dag_get_record(FORMAT_DATA_FIRST);
1379                if (erfptr == NULL) {
1380                        /* No packet available - sleep for a very short time */
1381                        if (libtrace_halt) {
1382                                event.type = TRACE_EVENT_TERMINATE;
1383                        } else {
1384                                event.type = TRACE_EVENT_SLEEP;
1385                                event.seconds = 0.0001;
1386                        }
1387                        break;
1388                }
1389                if (dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
1390                                            erfptr, TRACE_RT_DATA_ERF, flags)) {
1391                        event.type = TRACE_EVENT_TERMINATE;
1392                        break;
1393                }
1394
1395
1396                event.size = trace_get_capture_length(packet) +
1397                        trace_get_framing_length(packet);
1398
1399                /* XXX trace_read_packet() normally applies the following
1400                 * config options for us, but this function is called via
1401                 * trace_event() so we have to do it ourselves */
1402
1403                if (libtrace->filter) {
1404                        int filtret = trace_apply_filter(libtrace->filter,
1405                                                         packet);
1406                        if (filtret == -1) {
1407                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
1408                                              "Bad BPF Filter");
1409                                event.type = TRACE_EVENT_TERMINATE;
1410                                break;
1411                        }
1412
1413                        if (filtret == 0) {
1414                                /* This packet isn't useful so we want to
1415                                 * immediately see if there is another suitable
1416                                 * one - we definitely DO NOT want to return
1417                                 * a sleep event in this case, like we used to
1418                                 * do! */
1419                                libtrace->filtered_packets ++;
1420                                trace_clear_cache(packet);
1421                                continue;
1422                        }
1423
1424                        event.type = TRACE_EVENT_PACKET;
1425                } else {
1426                        event.type = TRACE_EVENT_PACKET;
1427                }
1428
1429                /* Update the DUCK timer */
1430                tv = trace_get_timeval(packet);
1431                DUCK.last_pkt = tv.tv_sec;
1432               
1433                if (libtrace->snaplen > 0) {
1434                        trace_set_capture_length(packet, libtrace->snaplen);
1435                }
1436                libtrace->accepted_packets ++;
1437                break;
1438        } while(1);
1439
1440        return event;
1441}
1442
1443static void dag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat)
1444{
1445        libtrace_list_node_t *tmp;
1446        assert(stat && libtrace);
1447        tmp = FORMAT_DATA_HEAD;
1448
1449        /* Dropped packets */
1450        stat->dropped_valid = 1;
1451        stat->dropped = 0;
1452        while (tmp != NULL) {
1453                stat->dropped += STREAM_DATA(tmp)->drops;
1454                tmp = tmp->next;
1455        }
1456
1457}
1458
1459static void dag_get_thread_statisitics(libtrace_t *libtrace, libtrace_thread_t *t,
1460                                       libtrace_stat_t *stat) {
1461        struct dag_per_stream_t *stream_data = t->format_data;
1462        assert(stat && libtrace);
1463
1464        stat->dropped_valid = 1;
1465        stat->dropped = stream_data->drops;
1466
1467        stat->filtered_valid = 1;
1468        stat->filtered = 0;
1469}
1470
1471/* Prints some semi-useful help text about the DAG format module */
1472static void dag_help(void) {
1473        printf("dag format module: $Revision: 1755 $\n");
1474        printf("Supported input URIs:\n");
1475        printf("\tdag:/dev/dagn\n");
1476        printf("\n");
1477        printf("\te.g.: dag:/dev/dag0\n");
1478        printf("\n");
1479        printf("Supported output URIs:\n");
1480        printf("\tnone\n");
1481        printf("\n");
1482}
1483
1484static int dag_pconfig_input(UNUSED libtrace_t *libtrace,
1485                             trace_parallel_option_t option, UNUSED void *value)
1486{
1487        /* We don't support any of these! Normally you configure the DAG card
1488         * externally. */
1489        switch(option) {
1490        case TRACE_OPTION_SET_HASHER:
1491        case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
1492        case TRACE_OPTION_TRACETIME:
1493        case TRACE_OPTION_TICK_INTERVAL:
1494        case TRACE_OPTION_GET_CONFIG:
1495        case TRACE_OPTION_SET_CONFIG:
1496                return -1;
1497        }
1498        /* We don't provide a default option to ensure that future options will
1499         * generate a compiler warning. */
1500
1501        return -1;
1502}
1503
1504static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1505                                bool reader)
1506{
1507        struct dag_per_stream_t *stream_data;
1508        libtrace_list_node_t *node;
1509
1510        if (reader && t->type == THREAD_PERPKT) {
1511                fprintf(stderr, "t%u: registered reader thread", t->perpkt_num);
1512                node = libtrace_list_get_index(FORMAT_DATA->per_stream,
1513                                                t->perpkt_num);
1514                if (node == NULL) {
1515                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
1516                                      "Too few streams supplied for the"
1517                                      " number of threads lanuched");
1518                        return -1;
1519                }
1520                stream_data = node->data;
1521
1522                /* Pass the per thread data to the thread */
1523                t->format_data = stream_data;
1524
1525                /* Attach and start the DAG stream */
1526                printf("t%u: starting and attaching stream #%u\n",
1527                       t->perpkt_num, stream_data->dagstream);
1528                if (dag_start_input_stream(libtrace, stream_data) < 0)
1529                        return -1;
1530        }
1531
1532        fprintf(stderr, "t%u: registered thread\n", t->perpkt_num);
1533
1534        return 0;
1535}
1536
1537static struct libtrace_format_t dag = {
1538        "dag",
1539        "$Id$",
1540        TRACE_FORMAT_ERF,
1541        dag_probe_filename,             /* probe filename */
1542        NULL,                           /* probe magic */
1543        dag_init_input,                 /* init_input */
1544        dag_config_input,               /* config_input */
1545        dag_start_input,                /* start_input */
1546        dag_pause_input,                /* pause_input */
1547        dag_init_output,                /* init_output */
1548        NULL,                           /* config_output */
1549        dag_start_output,               /* start_output */
1550        dag_fin_input,                  /* fin_input */
1551        dag_fin_output,                 /* fin_output */
1552        dag_read_packet,                /* read_packet */
1553        dag_prepare_packet,             /* prepare_packet */
1554        NULL,                           /* fin_packet */
1555        dag_write_packet,               /* write_packet */
1556        erf_get_link_type,              /* get_link_type */
1557        erf_get_direction,              /* get_direction */
1558        erf_set_direction,              /* set_direction */
1559        erf_get_erf_timestamp,          /* get_erf_timestamp */
1560        NULL,                           /* get_timeval */
1561        NULL,                           /* get_seconds */
1562        NULL,                           /* get_timespec */
1563        NULL,                           /* seek_erf */
1564        NULL,                           /* seek_timeval */
1565        NULL,                           /* seek_seconds */
1566        erf_get_capture_length,         /* get_capture_length */
1567        erf_get_wire_length,            /* get_wire_length */
1568        erf_get_framing_length,         /* get_framing_length */
1569        erf_set_capture_length,         /* set_capture_length */
1570        NULL,                           /* get_received_packets */
1571        NULL,                           /* get_filtered_packets */
1572        NULL,                           /* get_dropped_packets */
1573        dag_get_statistics,             /* get_statistics */
1574        NULL,                           /* get_fd */
1575        trace_event_dag,                /* trace_event */
1576        dag_help,                       /* help */
1577        NULL,                            /* next pointer */
1578        {true, 0}, /* live packet capture, thread limit TBD */
1579        dag_pstart_input,
1580        dag_pread_packets,
1581        dag_pause_input,
1582        NULL,
1583        dag_pconfig_input,
1584        dag_pregister_thread,
1585        NULL,
1586        dag_get_thread_statisitics      /* get thread stats */
1587};
1588
1589void dag_constructor(void)
1590{
1591        register_format(&dag);
1592}
Note: See TracBrowser for help on using the repository browser.