source: lib/format_dag25.c @ 5e3f16c

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivendag_formatrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 5e3f16c was 5e3f16c, checked in by Richard Sanger <rsanger@…>, 4 years ago

Fix for issue #39 - ring and int pstop() fails on older kernels when using threads

The problem here is that on old kernels without PACKET_FANOUT support
(added in v3.1) will only include the single threaded versions of int
and ring. When used with multiple threads the libtrace API will
fallback to using read rather than pread which does not check message
queues.

To fix this issue, in any format without pread support:

  • We check for new messages with each loop around read_packet as we fill the burst
  • Within read_packet we update the halt to include the pausing state
  • Use a seperate lock to the main lock when reading a burst of packets, otherwise trace_ppause has to wait for a burst to read.

This is not 100% perfect as a single packet might still need to be received
before a generic message can be received.
A proper fix in the future would be to move all format internals purely to the
parallel API.

  • Property mode set to 100644
File size: 45.2 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26#define _GNU_SOURCE
27
28#include "config.h"
29#include "common.h"
30#include "libtrace.h"
31#include "libtrace_int.h"
32#include "format_helper.h"
33#include "format_erf.h"
34
35#include <assert.h>
36#include <errno.h>
37#include <fcntl.h>
38#include <stdio.h>
39#include <string.h>
40#include <stdlib.h>
41#include <sys/stat.h>
42
43#include <sys/mman.h>
44/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
45 * Windows anyway so we'll worry about this more later :] */
46#include <pthread.h>
47
48
49#ifdef HAVE_DAG_CONFIG_API_H
50#include <dag_config_api.h>
51#endif
52
53#ifdef WIN32
54#  include <io.h>
55#  include <share.h>
56#  define PATH_MAX _MAX_PATH
57#  define snprintf sprintf_s
58#else
59#  include <netdb.h>
60#  ifndef PATH_MAX
61#       define PATH_MAX 4096
62#  endif
63#  include <sys/ioctl.h>
64#endif
65
66/* This format deals with DAG cards that are using drivers from the 2.5 version
67 * onwards, including 3.X.
68 *
69 * DAG is a LIVE capture format.
70 *
71 * This format does support writing, provided the DAG card that you are using
72 * has transmit (Tx) support. Additionally, packets read using this format
73 * are in the ERF format, so can easily be written as ERF traces without
74 * losing any data.
75 */
76
77#define DATA(x) ((struct dag_format_data_t *)x->format_data)
78#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
79#define STREAM_DATA(x) ((struct dag_per_stream_t *)x->data)
80
81#define FORMAT_DATA DATA(libtrace)
82#define FORMAT_DATA_OUT DATA_OUT(libtrace)
83
84#define DUCK FORMAT_DATA->duck
85
86#define FORMAT_DATA_HEAD FORMAT_DATA->per_stream->head
87#define FORMAT_DATA_FIRST ((struct dag_per_stream_t *)FORMAT_DATA_HEAD->data)
88
89static struct libtrace_format_t dag;
90
91/* A DAG device - a DAG device can support multiple streams (and therefore
92 * multiple input traces) so each trace needs to refer to a device */
93struct dag_dev_t {
94        char * dev_name;                /* Device name */
95        int fd;                         /* File descriptor */
96        uint16_t ref_count;             /* Number of input / output traces
97                                           that are using this device */
98        struct dag_dev_t *prev;         /* Pointer to the previous device in
99                                           the device list */
100        struct dag_dev_t *next;         /* Pointer to the next device in the
101                                           device list */
102};
103
104/* "Global" data that is stored for each DAG output trace */
105struct dag_format_data_out_t {
106        /* The DAG device being used for writing */
107        struct dag_dev_t *device;
108        /* The DAG stream that is being written on */
109        unsigned int dagstream;
110        /* Boolean flag indicating whether the stream is currently attached */
111        int stream_attached;
112        /* The amount of data waiting to be transmitted, in bytes */
113        uint64_t waiting;
114        /* A buffer to hold the data to be transmittted */
115        uint8_t *txbuffer;
116};
117
118/* Data that is stored against each input stream */
119struct dag_per_stream_t {
120        /* DAG stream number */
121        uint16_t dagstream;
122        /* Pointer to the last unread byte in the DAG memory */
123        uint8_t *top;
124        /* Pointer to the first unread byte in the DAG memory */
125        uint8_t *bottom;
126        /* Amount of data processed from the bottom pointer */
127        uint32_t processed;
128        /* Number of packets seen by the stream */
129        uint64_t pkt_count;
130        /* Drop count for this particular stream */
131        uint64_t drops;
132        /* Boolean values to indicate if a particular interface has been seen
133         * or not. This is limited to four interfaces, which is enough to
134         * support all current DAG cards */
135        uint8_t seeninterface[4];
136};
137
138/* "Global" data that is stored for each DAG input trace */
139struct dag_format_data_t {
140        /* DAG device */
141        struct dag_dev_t *device;
142        /* Boolean flag indicating whether the trace is currently attached */
143        int stream_attached;
144        /* Data stored against each DAG input stream */
145        libtrace_list_t *per_stream;
146
147        /* Data required for regular DUCK reporting.
148         * We put this on a new cache line otherwise we have a lot of false
149         * sharing caused by updating the last_pkt.
150         * This should only ever be accessed by the first thread stream,
151         * that includes both read and write operations.
152         */
153        struct {
154                /* Timestamp of the last DUCK report */
155                uint32_t last_duck;
156                /* The number of seconds between each DUCK report */
157                uint32_t duck_freq;
158                /* Timestamp of the last packet read from the DAG card */
159                uint32_t last_pkt;
160                /* Dummy trace to ensure DUCK packets are dealt with using the
161                 * DUCK format functions */
162                libtrace_t *dummy_duck;
163        } duck ALIGN_STRUCT(CACHE_LINE_SIZE);
164};
165
166/* To be thread-safe, we're going to need a mutex for operating on the list
167 * of DAG devices */
168pthread_mutex_t open_dag_mutex;
169
170/* The list of DAG devices that have been opened by libtrace.
171 *
172 * We can only open each DAG device once, but we might want to read from
173 * multiple streams. Therefore, we need to maintain a list of devices that we
174 * have opened (with ref counts!) so that we don't try to open a device too
175 * many times or close a device that we're still using */
176struct dag_dev_t *open_dags = NULL;
177
178/* Returns the amount of padding between the ERF header and the start of the
179 * captured packet data */
180static int dag_get_padding(const libtrace_packet_t *packet)
181{
182        /* ERF Ethernet records have a 2 byte padding before the packet itself
183         * so that the IP header is aligned on a 32 bit boundary.
184         */
185        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
186                dag_record_t *erfptr = (dag_record_t *)packet->header;
187                switch(erfptr->type) {
188                        case TYPE_ETH:
189                        case TYPE_DSM_COLOR_ETH:
190                                return 2;
191                        default:                return 0;
192                }
193        }
194        else {
195                switch(trace_get_link_type(packet)) {
196                        case TRACE_TYPE_ETH:    return 2;
197                        default:                return 0;
198                }
199        }
200}
201
202/* Attempts to determine if the given filename refers to a DAG device */
203static int dag_probe_filename(const char *filename)
204{
205        struct stat statbuf;
206        /* Can we stat the file? */
207        if (stat(filename, &statbuf) != 0) {
208                return 0;
209        }
210        /* Is it a character device? */
211        if (!S_ISCHR(statbuf.st_mode)) {
212                return 0;
213        }
214        /* Yeah, it's probably us. */
215        return 1;
216}
217
218/* Initialises the DAG output data structure */
219static void dag_init_format_out_data(libtrace_out_t *libtrace)
220{
221        libtrace->format_data = (struct dag_format_data_out_t *)
222                malloc(sizeof(struct dag_format_data_out_t));
223        // no DUCK on output
224        FORMAT_DATA_OUT->stream_attached = 0;
225        FORMAT_DATA_OUT->device = NULL;
226        FORMAT_DATA_OUT->dagstream = 0;
227        FORMAT_DATA_OUT->waiting = 0;
228
229}
230
231/* Initialises the DAG input data structure */
232static void dag_init_format_data(libtrace_t *libtrace)
233{
234        struct dag_per_stream_t stream_data;
235
236        libtrace->format_data = (struct dag_format_data_t *)
237                malloc(sizeof(struct dag_format_data_t));
238        DUCK.last_duck = 0;
239        DUCK.duck_freq = 0;
240        DUCK.last_pkt = 0;
241        DUCK.dummy_duck = NULL;
242
243        FORMAT_DATA->per_stream =
244                libtrace_list_init(sizeof(stream_data));
245        assert(FORMAT_DATA->per_stream != NULL);
246
247        /* We'll start with just one instance of stream_data, and we'll
248         * add more later if we need them */
249        memset(&stream_data, 0, sizeof(stream_data));
250        libtrace_list_push_back(FORMAT_DATA->per_stream, &stream_data);
251}
252
253/* Determines if there is already an entry for the given DAG device in the
254 * device list and increments the reference count for that device, if found.
255 *
256 * NOTE: This function assumes the open_dag_mutex is held by the caller */
257static struct dag_dev_t *dag_find_open_device(char *dev_name)
258{
259        struct dag_dev_t *dag_dev;
260
261        dag_dev = open_dags;
262
263        /* XXX: Not exactly zippy, but how often are we going to be dealing
264         * with multiple dag cards? */
265        while (dag_dev != NULL) {
266                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
267                        dag_dev->ref_count ++;
268                        return dag_dev;
269                }
270                dag_dev = dag_dev->next;
271        }
272        return NULL;
273}
274
275/* Closes a DAG device and removes it from the device list.
276 *
277 * Attempting to close a DAG device that has a non-zero reference count will
278 * cause an assertion failure!
279 *
280 * NOTE: This function assumes the open_dag_mutex is held by the caller */
281static void dag_close_device(struct dag_dev_t *dev)
282{
283        /* Need to remove from the device list */
284        assert(dev->ref_count == 0);
285
286        if (dev->prev == NULL) {
287                open_dags = dev->next;
288                if (dev->next)
289                        dev->next->prev = NULL;
290        } else {
291                dev->prev->next = dev->next;
292                if (dev->next)
293                        dev->next->prev = dev->prev;
294        }
295
296        dag_close(dev->fd);
297        free(dev);
298}
299
300
301/* Opens a new DAG device for writing and adds it to the DAG device list
302 *
303 * NOTE: this function should only be called when opening a DAG device for
304 * writing - there is little practical difference between this and the
305 * function below that covers the reading case, but we need the output trace
306 * object to report errors properly so the two functions take slightly
307 * different arguments. This is really lame and there should be a much better
308 * way of doing this.
309 *
310 * NOTE: This function assumes the open_dag_mutex is held by the caller
311 */
312static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace,
313                                                char *dev_name)
314{
315        struct stat buf;
316        int fd;
317        struct dag_dev_t *new_dev;
318
319        /* Make sure the device exists */
320        if (stat(dev_name, &buf) == -1) {
321                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
322                return NULL;
323        }
324
325        /* Make sure it is the appropriate type of device */
326        if (S_ISCHR(buf.st_mode)) {
327                /* Try opening the DAG device */
328                if((fd = dag_open(dev_name)) < 0) {
329                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
330                                        dev_name);
331                        return NULL;
332                }
333        } else {
334                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
335                                dev_name);
336                return NULL;
337        }
338
339        /* Add the device to our device list - it is just a doubly linked
340         * list with no inherent ordering; just tack the new one on the front
341         */
342        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
343        new_dev->fd = fd;
344        new_dev->dev_name = dev_name;
345        new_dev->ref_count = 1;
346
347        new_dev->prev = NULL;
348        new_dev->next = open_dags;
349        if (open_dags)
350                open_dags->prev = new_dev;
351
352        open_dags = new_dev;
353
354        return new_dev;
355}
356
357/* Opens a new DAG device for reading and adds it to the DAG device list
358 *
359 * NOTE: this function should only be called when opening a DAG device for
360 * reading - there is little practical difference between this and the
361 * function above that covers the writing case, but we need the input trace
362 * object to report errors properly so the two functions take slightly
363 * different arguments. This is really lame and there should be a much better
364 * way of doing this.
365 *
366 * NOTE: This function assumes the open_dag_mutex is held by the caller */
367static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
368        struct stat buf;
369        int fd;
370        struct dag_dev_t *new_dev;
371
372        /* Make sure the device exists */
373        if (stat(dev_name, &buf) == -1) {
374                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
375                return NULL;
376        }
377
378        /* Make sure it is the appropriate type of device */
379        if (S_ISCHR(buf.st_mode)) {
380                /* Try opening the DAG device */
381                if((fd = dag_open(dev_name)) < 0) {
382                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
383                                      dev_name);
384                        return NULL;
385                }
386        } else {
387                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
388                              dev_name);
389                return NULL;
390        }
391
392        /* Add the device to our device list - it is just a doubly linked
393         * list with no inherent ordering; just tack the new one on the front
394         */
395        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
396        new_dev->fd = fd;
397        new_dev->dev_name = dev_name;
398        new_dev->ref_count = 1;
399
400        new_dev->prev = NULL;
401        new_dev->next = open_dags;
402        if (open_dags)
403                open_dags->prev = new_dev;
404
405        open_dags = new_dev;
406
407        return new_dev;
408}
409
410/* Creates and initialises a DAG output trace */
411static int dag_init_output(libtrace_out_t *libtrace)
412{
413        /* Upon successful creation, the device name is stored against the
414         * device and free when it is free()d */
415        char *dag_dev_name = NULL;
416        char *scan = NULL;
417        struct dag_dev_t *dag_device = NULL;
418        int stream = 1;
419
420        /* XXX I don't know if this is important or not, but this function
421         * isn't present in all of the driver releases that this code is
422         * supposed to support! */
423        /*
424        unsigned long wake_time;
425        dagutil_sleep_get_wake_time(&wake_time,0);
426        */
427
428        dag_init_format_out_data(libtrace);
429        /* Grab the mutex while we're likely to be messing with the device
430         * list */
431        pthread_mutex_lock(&open_dag_mutex);
432
433        /* Specific streams are signified using a comma in the libtrace URI,
434         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
435         *
436         * If no stream is specified, we will write using stream 1 */
437        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
438                dag_dev_name = strdup(libtrace->uridata);
439        } else {
440                dag_dev_name = (char *)strndup(libtrace->uridata,
441                                (size_t)(scan - libtrace->uridata));
442                stream = atoi(++scan);
443        }
444        FORMAT_DATA_OUT->dagstream = stream;
445
446        /* See if our DAG device is already open */
447        dag_device = dag_find_open_device(dag_dev_name);
448
449        if (dag_device == NULL) {
450                /* Device not yet opened - open it ourselves */
451                dag_device = dag_open_output_device(libtrace, dag_dev_name);
452        } else {
453                /* Otherwise, just use the existing one */
454                free(dag_dev_name);
455                dag_dev_name = NULL;
456        }
457
458        /* Make sure we have successfully opened a DAG device */
459        if (dag_device == NULL) {
460                if (dag_dev_name) {
461                        free(dag_dev_name);
462                }
463                pthread_mutex_unlock(&open_dag_mutex);
464                return -1;
465        }
466
467        FORMAT_DATA_OUT->device = dag_device;
468        pthread_mutex_unlock(&open_dag_mutex);
469        return 0;
470}
471
472/* Creates and initialises a DAG input trace */
473static int dag_init_input(libtrace_t *libtrace) {
474        /* Upon successful creation, the device name is stored against the
475         * device and free when it is free()d */
476        char *dag_dev_name = NULL;
477        char *scan = NULL;
478        int stream = 0;
479        struct dag_dev_t *dag_device = NULL;
480
481        dag_init_format_data(libtrace);
482        /* Grab the mutex while we're likely to be messing with the device
483         * list */
484        pthread_mutex_lock(&open_dag_mutex);
485
486
487        /* DAG cards support multiple streams. In a single threaded capture,
488         * these are specified using a comma in the libtrace URI,
489         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
490         *
491         * If no stream is specified, we will read from stream 0 with
492         * one thread
493         */
494        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
495                dag_dev_name = strdup(libtrace->uridata);
496        } else {
497                dag_dev_name = (char *)strndup(libtrace->uridata,
498                                (size_t)(scan - libtrace->uridata));
499                stream = atoi(++scan);
500        }
501
502        FORMAT_DATA_FIRST->dagstream = stream;
503
504        /* See if our DAG device is already open */
505        dag_device = dag_find_open_device(dag_dev_name);
506
507        if (dag_device == NULL) {
508                /* Device not yet opened - open it ourselves */
509                dag_device = dag_open_device(libtrace, dag_dev_name);
510        } else {
511                /* Otherwise, just use the existing one */
512                free(dag_dev_name);
513                dag_dev_name = NULL;
514        }
515
516        /* Make sure we have successfully opened a DAG device */
517        if (dag_device == NULL) {
518                if (dag_dev_name)
519                        free(dag_dev_name);
520                dag_dev_name = NULL;
521                pthread_mutex_unlock(&open_dag_mutex);
522                return -1;
523        }
524
525        FORMAT_DATA->device = dag_device;
526
527        /* See Config_Status_API_Programming_Guide.pdf from the Endace
528           Dag Documentation */
529        /* Check kBooleanAttributeActive is true -- no point capturing
530         * on an interface that's disabled
531         *
532         * The symptom of the port being disabled is that libtrace
533         * will appear to hang. */
534        /* Check kBooleanAttributeFault is false */
535        /* Check kBooleanAttributeLocalFault is false */
536        /* Check kBooleanAttributeLock is true ? */
537        /* Check kBooleanAttributePeerLink ? */
538
539        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based
540           on libtrace promisc attribute?*/
541        /* Set kUint32AttributeSnapLength to the snaplength */
542
543        pthread_mutex_unlock(&open_dag_mutex);
544        return 0;
545}
546
547#ifdef HAVE_DAG_CONFIG_API_H
548static int dag_csapi_set_snaplen(libtrace_t *libtrace, int slen) {
549        dag_card_ref_t card_ref = NULL;
550        dag_component_t root = NULL;
551        attr_uuid_t uuid = 0;
552
553        if (slen < 0)
554                slen = 0; 
555
556        card_ref = dag_config_init(FORMAT_DATA->device->dev_name);
557        root = dag_config_get_root_component(card_ref);
558       
559        uuid = dag_component_get_config_attribute_uuid(root, kBooleanAttributeVarlen);
560        dag_config_set_boolean_attribute(card_ref, uuid, true);
561
562        uuid = dag_component_get_config_attribute_uuid(root, kUint32AttributeSnaplength);
563        dag_config_set_uint32_attribute(card_ref, uuid, (uint32_t)slen);
564
565        return 0;
566       
567
568} 
569#endif /* HAVE_DAG_CONFIG_API_H */
570
571/* Configures a DAG input trace */
572static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
573                            void *data)
574{
575        switch(option) {
576        case TRACE_OPTION_META_FREQ:
577                /* This option is used to specify the frequency of DUCK
578                 * updates */
579                DUCK.duck_freq = *(int *)data;
580                return 0;
581        case TRACE_OPTION_SNAPLEN:
582#ifdef HAVE_DAG_CONFIG_API_H
583                return dag_csapi_set_snaplen(libtrace, *(int *)data);
584#else
585                /* Tell the card our new snap length */
586        {
587                char conf_str[4096];
588                snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
589                if (dag_configure(FORMAT_DATA->device->fd,
590                                  conf_str) != 0) {
591                        trace_set_err(libtrace, errno, "Failed to configure "
592                                      "snaplen on DAG card: %s",
593                                      libtrace->uridata);
594                        return -1;
595                }
596        }
597#endif /* HAVE_DAG_CONFIG_API_H */
598
599                return 0;
600        case TRACE_OPTION_PROMISC:
601                /* DAG already operates in a promisc fashion */
602                return -1;
603        case TRACE_OPTION_FILTER:
604                /* We don't yet support pushing filters into DAG
605                 * cards */
606                return -1;
607        case TRACE_OPTION_EVENT_REALTIME:
608                /* Live capture is always going to be realtime */
609                return -1;
610        case TRACE_OPTION_HASHER:
611                /* Lets just say we did this, it's currently still up to
612                 * the user to configure this correctly. */
613                return 0;
614        }
615        return -1;
616}
617
618/* Starts a DAG output trace */
619static int dag_start_output(libtrace_out_t *libtrace)
620{
621        struct timeval zero, nopoll;
622
623        zero.tv_sec = 0;
624        zero.tv_usec = 0;
625        nopoll = zero;
626
627        /* Attach and start the DAG stream */
628        if (dag_attach_stream(FORMAT_DATA_OUT->device->fd,
629                        FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) {
630                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
631                return -1;
632        }
633
634        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
635                        FORMAT_DATA_OUT->dagstream) < 0) {
636                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
637                return -1;
638        }
639        FORMAT_DATA_OUT->stream_attached = 1;
640
641        /* We don't want the dag card to do any sleeping */
642        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
643                        FORMAT_DATA_OUT->dagstream, 0, &zero,
644                        &nopoll);
645
646        return 0;
647}
648
649static int dag_start_input_stream(libtrace_t *libtrace,
650                                  struct dag_per_stream_t * stream) {
651        struct timeval zero, nopoll;
652        uint8_t *top, *bottom, *starttop;
653        top = bottom = NULL;
654
655        zero.tv_sec = 0;
656        zero.tv_usec = 10000;
657        nopoll = zero;
658
659        /* Attach and start the DAG stream */
660        if (dag_attach_stream(FORMAT_DATA->device->fd,
661                              stream->dagstream, 0, 0) < 0) {
662                trace_set_err(libtrace, errno, "Cannot attach DAG stream #%u",
663                              stream->dagstream);
664                return -1;
665        }
666
667        if (dag_start_stream(FORMAT_DATA->device->fd,
668                             stream->dagstream) < 0) {
669                trace_set_err(libtrace, errno, "Cannot start DAG stream #%u",
670                              stream->dagstream);
671                return -1;
672        }
673        FORMAT_DATA->stream_attached = 1;
674
675        /* We don't want the dag card to do any sleeping */
676        if (dag_set_stream_poll(FORMAT_DATA->device->fd,
677                            stream->dagstream, 0, &zero,
678                            &nopoll) < 0) {
679                trace_set_err(libtrace, errno,
680                              "dag_set_stream_poll failed!");
681                return -1;
682        }
683
684        starttop = dag_advance_stream(FORMAT_DATA->device->fd,
685                                      stream->dagstream,
686                                      &bottom);
687
688        /* Should probably flush the memory hole now */
689        top = starttop;
690        while (starttop - bottom > 0) {
691                bottom += (starttop - bottom);
692                top = dag_advance_stream(FORMAT_DATA->device->fd,
693                                         stream->dagstream,
694                                         &bottom);
695        }
696        stream->top = top;
697        stream->bottom = bottom;
698        stream->processed = 0;
699        stream->drops = 0;
700
701        return 0;
702
703}
704
705/* Starts a DAG input trace */
706static int dag_start_input(libtrace_t *libtrace)
707{
708        return dag_start_input_stream(libtrace, FORMAT_DATA_FIRST);
709}
710
711static int dag_pstart_input(libtrace_t *libtrace)
712{
713        char *scan, *tok;
714        uint16_t stream_count = 0, max_streams;
715        int iserror = 0;
716        struct dag_per_stream_t stream_data;
717
718        /* Check we aren't trying to create more threads than the DAG card can
719         * handle */
720        max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd);
721        if (libtrace->perpkt_thread_count > max_streams) {
722                fprintf(stderr,
723                              "WARNING: DAG has only %u streams available, "
724                              "capping total number of threads at this value.",
725                              max_streams);
726                libtrace->perpkt_thread_count = max_streams;
727        }
728
729        /* Get the stream names from the uri */
730        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
731                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
732                              "Format uri doesn't specify the DAG streams");
733                iserror = 1;
734                goto cleanup;
735        }
736
737        scan++;
738
739        tok = strtok(scan, ",");
740        while (tok != NULL) {
741                /* Ensure we haven't specified too many streams */
742                if (stream_count >= libtrace->perpkt_thread_count) {
743                        fprintf(stderr,
744                                      "WARNING: Format uri specifies too many "
745                                      "streams. Maximum is %u, so only using "
746                                      "the first %u from the uri.",
747                                      libtrace->perpkt_thread_count, 
748                                      libtrace->perpkt_thread_count);
749                        break;
750                }
751
752                /* Save the stream details */
753                if (stream_count == 0) {
754                        /* Special case where we update the existing stream
755                         * data structure */
756                        FORMAT_DATA_FIRST->dagstream = (uint16_t)atoi(tok);
757                } else {
758                        memset(&stream_data, 0, sizeof(stream_data));
759                        stream_data.dagstream = (uint16_t)atoi(tok);
760                        libtrace_list_push_back(FORMAT_DATA->per_stream,
761                                                &stream_data);
762                }
763
764                stream_count++;
765                tok = strtok(NULL, ",");
766        }
767
768        if (stream_count < libtrace->perpkt_thread_count) {
769                libtrace->perpkt_thread_count = stream_count;
770        }
771       
772        FORMAT_DATA->stream_attached = 1;
773
774 cleanup:
775        if (iserror) {
776                return -1;
777        } else {
778                return 0;
779        }
780}
781
782/* Pauses a DAG output trace */
783static int dag_pause_output(libtrace_out_t *libtrace)
784{
785        /* Stop and detach the stream */
786        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
787                            FORMAT_DATA_OUT->dagstream) < 0) {
788                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
789                return -1;
790        }
791        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
792                              FORMAT_DATA_OUT->dagstream) < 0) {
793                trace_set_err_out(libtrace, errno,
794                                  "Could not detach DAG stream");
795                return -1;
796        }
797        FORMAT_DATA_OUT->stream_attached = 0;
798        return 0;
799}
800
801/* Pauses a DAG input trace */
802static int dag_pause_input(libtrace_t *libtrace)
803{
804        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
805
806        /* Stop and detach each stream */
807        while (tmp != NULL) {
808                if (dag_stop_stream(FORMAT_DATA->device->fd,
809                                    STREAM_DATA(tmp)->dagstream) < 0) {
810                        trace_set_err(libtrace, errno,
811                                      "Could not stop DAG stream");
812                        return -1;
813                }
814                if (dag_detach_stream(FORMAT_DATA->device->fd,
815                                      STREAM_DATA(tmp)->dagstream) < 0) {
816                        trace_set_err(libtrace, errno,
817                                      "Could not detach DAG stream");
818                        return -1;
819                }
820
821                tmp = tmp->next;
822        }
823
824        FORMAT_DATA->stream_attached = 0;
825        return 0;
826}
827
828
829
830/* Closes a DAG input trace */
831static int dag_fin_input(libtrace_t *libtrace)
832{
833        /* Need the lock, since we're going to be handling the device list */
834        pthread_mutex_lock(&open_dag_mutex);
835
836        /* Detach the stream if we are not paused */
837        if (FORMAT_DATA->stream_attached)
838                dag_pause_input(libtrace);
839        FORMAT_DATA->device->ref_count--;
840
841        /* Close the DAG device if there are no more references to it */
842        if (FORMAT_DATA->device->ref_count == 0)
843                dag_close_device(FORMAT_DATA->device);
844
845        if (DUCK.dummy_duck)
846                trace_destroy_dead(DUCK.dummy_duck);
847
848        /* Clear the list */
849        libtrace_list_deinit(FORMAT_DATA->per_stream);
850        free(libtrace->format_data);
851        pthread_mutex_unlock(&open_dag_mutex);
852        return 0; /* success */
853}
854
855/* Closes a DAG output trace */
856static int dag_fin_output(libtrace_out_t *libtrace)
857{
858
859        /* Commit any outstanding traffic in the txbuffer */
860        if (FORMAT_DATA_OUT->waiting) {
861                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
862                                           FORMAT_DATA_OUT->dagstream,
863                                           FORMAT_DATA_OUT->waiting );
864        }
865
866        /* Wait until the buffer is nearly clear before exiting the program,
867         * as we will lose packets otherwise */
868        dag_tx_get_stream_space
869                (FORMAT_DATA_OUT->device->fd,
870                 FORMAT_DATA_OUT->dagstream,
871                 dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
872                                            FORMAT_DATA_OUT->dagstream) - 8);
873
874        /* Need the lock, since we're going to be handling the device list */
875        pthread_mutex_lock(&open_dag_mutex);
876
877        /* Detach the stream if we are not paused */
878        if (FORMAT_DATA_OUT->stream_attached)
879                dag_pause_output(libtrace);
880        FORMAT_DATA_OUT->device->ref_count --;
881
882        /* Close the DAG device if there are no more references to it */
883        if (FORMAT_DATA_OUT->device->ref_count == 0)
884                dag_close_device(FORMAT_DATA_OUT->device);
885        free(libtrace->format_data);
886        pthread_mutex_unlock(&open_dag_mutex);
887        return 0; /* success */
888}
889
890#ifdef DAGIOC_CARD_DUCK
891#define LIBTRACE_DUCK_IOCTL DAGIOC_CARD_DUCK
892#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_5_0
893#else
894#ifdef DAGIOCDUCK
895#define LIBTRACE_DUCK_IOCTL DAGIOCDUCK
896#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_2_5
897#else
898#warning "DAG appears to be missing DUCK support"
899#endif
900#endif
901
902/* Extracts DUCK information from the DAG card and produces a DUCK packet */
903static int dag_get_duckinfo(libtrace_t *libtrace,
904                                libtrace_packet_t *packet) {
905
906        if (DUCK.duck_freq == 0)
907                return 0;
908
909#ifndef LIBTRACE_DUCK_IOCTL
910        trace_set_err(libtrace, errno, 
911                "Requested DUCK information but unable to determine the correct ioctl for DUCK");
912        DUCK.duck_freq = 0;
913        return -1;
914#endif
915
916        if (DUCK.last_pkt - DUCK.last_duck < DUCK.duck_freq)
917                return 0;
918
919        /* Allocate memory for the DUCK data */
920        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
921            !packet->buffer) {
922                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
923                packet->buf_control = TRACE_CTRL_PACKET;
924                if (!packet->buffer) {
925                        trace_set_err(libtrace, errno,
926                                      "Cannot allocate packet buffer");
927                        return -1;
928                }
929        }
930
931        /* DUCK doesn't have a format header */
932        packet->header = 0;
933        packet->payload = packet->buffer;
934
935        /* No need to check if we can get DUCK or not - we're modern
936         * enough so just grab the DUCK info */
937        if ((ioctl(FORMAT_DATA->device->fd, LIBTRACE_DUCK_IOCTL,
938                   (duckinf_t *)packet->payload) < 0)) {
939                trace_set_err(libtrace, errno, "Error using DUCK ioctl");
940                DUCK.duck_freq = 0;
941                return -1;
942        }
943
944        packet->type = LIBTRACE_DUCK_VERSION;
945
946        /* Set the packet's trace to point at a DUCK trace, so that the
947         * DUCK format functions will be called on the packet rather than the
948         * DAG ones */
949        if (!DUCK.dummy_duck)
950                DUCK.dummy_duck = trace_create_dead("duck:dummy");
951        packet->trace = DUCK.dummy_duck;
952        DUCK.last_duck = DUCK.last_pkt;
953        packet->error = sizeof(duckinf_t);
954        return sizeof(duckinf_t);
955}
956
957/* Determines the amount of data available to read from the DAG card */
958static int dag_available(libtrace_t *libtrace,
959                         struct dag_per_stream_t *stream_data)
960{
961        uint32_t diff = stream_data->top - stream_data->bottom;
962
963        /* If we've processed more than 4MB of data since we last called
964         * dag_advance_stream, then we should call it again to allow the
965         * space occupied by that 4MB to be released */
966        if (diff >= dag_record_size && stream_data->processed < 4 * 1024 * 1024)
967                return diff;
968
969        /* Update the top and bottom pointers */
970        stream_data->top = dag_advance_stream(FORMAT_DATA->device->fd,
971                                              stream_data->dagstream,
972                                              &(stream_data->bottom));
973
974        if (stream_data->top == NULL) {
975                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
976                return -1;
977        }
978        stream_data->processed = 0;
979        diff = stream_data->top - stream_data->bottom;
980        return diff;
981}
982
983/* Returns a pointer to the start of the next complete ERF record */
984static dag_record_t *dag_get_record(struct dag_per_stream_t *stream_data)
985{
986        dag_record_t *erfptr = NULL;
987        uint16_t size;
988
989        erfptr = (dag_record_t *)stream_data->bottom;
990        if (!erfptr)
991                return NULL;
992
993        size = ntohs(erfptr->rlen);
994        assert( size >= dag_record_size );
995
996        /* Make certain we have the full packet available */
997        if (size > (stream_data->top - stream_data->bottom))
998                return NULL;
999
1000        stream_data->bottom += size;
1001        stream_data->processed += size;
1002        return erfptr;
1003}
1004
1005/* Converts a buffer containing a recently read DAG packet record into a
1006 * libtrace packet */
1007static int dag_prepare_packet_stream(libtrace_t *libtrace,
1008                                     struct dag_per_stream_t *stream_data,
1009                                     libtrace_packet_t *packet,
1010                                     void *buffer, libtrace_rt_types_t rt_type,
1011                                     uint32_t flags)
1012{
1013        dag_record_t *erfptr;
1014
1015        /* If the packet previously owned a buffer that is not the buffer
1016         * that contains the new packet data, we're going to need to free the
1017         * old one to avoid memory leaks */
1018        if (packet->buffer != buffer &&
1019            packet->buf_control == TRACE_CTRL_PACKET) {
1020                free(packet->buffer);
1021        }
1022
1023        /* Set the buffer owner appropriately */
1024        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
1025                packet->buf_control = TRACE_CTRL_PACKET;
1026        } else
1027                packet->buf_control = TRACE_CTRL_EXTERNAL;
1028
1029        /* Update the packet pointers and type appropriately */
1030        erfptr = (dag_record_t *)buffer;
1031        packet->buffer = erfptr;
1032        packet->header = erfptr;
1033        packet->type = rt_type;
1034
1035        if (erfptr->flags.rxerror == 1) {
1036                /* rxerror means the payload is corrupt - drop the payload
1037                 * by tweaking rlen */
1038                packet->payload = NULL;
1039                erfptr->rlen = htons(erf_get_framing_length(packet));
1040        } else {
1041                packet->payload = (char*)packet->buffer
1042                        + erf_get_framing_length(packet);
1043        }
1044
1045        if (libtrace->format_data == NULL) {
1046                dag_init_format_data(libtrace);
1047        }
1048
1049        /* Update the dropped packets counter */
1050        /* No loss counter for DSM coloured records - have to use some
1051         * other API */
1052        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
1053                /* TODO */
1054        } else {
1055                /* Use the ERF loss counter */
1056                if (stream_data->seeninterface[erfptr->flags.iface]
1057                    == 0) {
1058                        stream_data->seeninterface[erfptr->flags.iface]
1059                                = 1;
1060                } else {
1061                        stream_data->drops += ntohs(erfptr->lctr);
1062                }
1063        }
1064
1065        return 0;
1066}
1067
1068static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
1069                              void *buffer, libtrace_rt_types_t rt_type,
1070                              uint32_t flags)
1071{
1072        return dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
1073                                       buffer, rt_type, flags);
1074}
1075
1076/*
1077 * dag_write_packet() at this stage attempts to improve tx performance
1078 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
1079 * has been met. I observed approximately 270% performance increase
1080 * through this relatively naive tweak. No optimisation of buffer sizes
1081 * was attempted.
1082 */
1083
1084/* Pushes an ERF record onto the transmit stream */
1085static int dag_dump_packet(libtrace_out_t *libtrace,
1086                           dag_record_t *erfptr, unsigned int pad,
1087                           void *buffer)
1088{
1089        int size;
1090
1091        /*
1092         * If we've got 0 bytes waiting in the txqueue, assume that we
1093         * haven't requested any space yet, and request some, storing
1094         * the pointer at FORMAT_DATA_OUT->txbuffer.
1095         *
1096         * The amount to request is slightly magical at the moment - it's
1097         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
1098         * the buffer and handle overruns.
1099         */
1100        if (FORMAT_DATA_OUT->waiting == 0) {
1101                FORMAT_DATA_OUT->txbuffer =
1102                        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
1103                                                FORMAT_DATA_OUT->dagstream,
1104                                                16908288);
1105        }
1106
1107        /*
1108         * Copy the header separately to the body, as we can't guarantee they
1109         * are in contiguous memory
1110         */
1111        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, erfptr,
1112               (dag_record_size + pad));
1113        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
1114
1115        /*
1116         * Copy our incoming packet into the outgoing buffer, and increment
1117         * our waiting count
1118         */
1119        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
1120        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, buffer,
1121               size);
1122        FORMAT_DATA_OUT->waiting += size;
1123
1124        /*
1125         * If our output buffer has more than 16 Mebibytes in it, commit those
1126         * bytes and reset the waiting count to 0.
1127         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in
1128         * case there is still data in the buffer at program exit.
1129         */
1130        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
1131                FORMAT_DATA_OUT->txbuffer =
1132                        dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
1133                                                   FORMAT_DATA_OUT->dagstream,
1134                                                   FORMAT_DATA_OUT->waiting);
1135                FORMAT_DATA_OUT->waiting = 0;
1136        }
1137
1138        return size + pad + dag_record_size;
1139}
1140
1141/* Attempts to determine a suitable ERF type for a given packet. Returns true
1142 * if one is found, false otherwise */
1143static bool find_compatible_linktype(libtrace_out_t *libtrace,
1144                                     libtrace_packet_t *packet, char *type)
1145{
1146        /* Keep trying to simplify the packet until we can find
1147         * something we can do with it */
1148
1149        do {
1150                *type = libtrace_to_erf_type(trace_get_link_type(packet));
1151
1152                /* Success */
1153                if (*type != (char)-1)
1154                        return true;
1155
1156                if (!demote_packet(packet)) {
1157                        trace_set_err_out(libtrace,
1158                                          TRACE_ERR_NO_CONVERSION,
1159                                          "No erf type for packet (%i)",
1160                                          trace_get_link_type(packet));
1161                        return false;
1162                }
1163
1164        } while(1);
1165
1166        return true;
1167}
1168
1169/* Writes a packet to the provided DAG output trace */
1170static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet)
1171{
1172        /* This is heavily borrowed from erf_write_packet(). Yes, CnP
1173         * coding sucks, sorry about that.
1174         */
1175        unsigned int pad = 0;
1176        int numbytes;
1177        void *payload = packet->payload;
1178        dag_record_t *header = (dag_record_t *)packet->header;
1179        char erf_type = 0;
1180
1181        if(!packet->header) {
1182                /* No header, probably an RT packet. Lifted from
1183                 * erf_write_packet(). */
1184                return -1;
1185        }
1186
1187        if (trace_get_link_type(packet) == TRACE_TYPE_NONDATA)
1188                return 0;
1189
1190        pad = dag_get_padding(packet);
1191
1192        /*
1193         * If the payload is null, adjust the rlen. Discussion of this is
1194         * attached to erf_write_packet()
1195         */
1196        if (payload == NULL) {
1197                header->rlen = htons(dag_record_size + pad);
1198        }
1199
1200        if (packet->type == TRACE_RT_DATA_ERF) {
1201                numbytes = dag_dump_packet(libtrace, header, pad, payload);
1202        } else {
1203                /* Build up a new packet header from the existing header */
1204
1205                /* Simplify the packet first - if we can't do this, break
1206                 * early */
1207                if (!find_compatible_linktype(libtrace,packet,&erf_type))
1208                        return -1;
1209
1210                dag_record_t erfhdr;
1211
1212                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
1213                payload=packet->payload;
1214                pad = dag_get_padding(packet);
1215
1216                /* Flags. Can't do this */
1217                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
1218                if (trace_get_direction(packet)!=(int)~0U)
1219                        erfhdr.flags.iface = trace_get_direction(packet);
1220
1221                erfhdr.type = erf_type;
1222
1223                /* Packet length (rlen includes format overhead) */
1224                assert(trace_get_capture_length(packet) > 0
1225                       && trace_get_capture_length(packet) <= 65536);
1226                assert(erf_get_framing_length(packet) > 0
1227                       && trace_get_framing_length(packet) <= 65536);
1228                assert(trace_get_capture_length(packet) +
1229                       erf_get_framing_length(packet) > 0
1230                       && trace_get_capture_length(packet) +
1231                       erf_get_framing_length(packet) <= 65536);
1232
1233                erfhdr.rlen = htons(trace_get_capture_length(packet)
1234                                    + erf_get_framing_length(packet));
1235
1236
1237                /* Loss counter. Can't do this */
1238                erfhdr.lctr = 0;
1239                /* Wire length, does not include padding! */
1240                erfhdr.wlen = htons(trace_get_wire_length(packet));
1241
1242                /* Write it out */
1243                numbytes = dag_dump_packet(libtrace, &erfhdr, pad, payload);
1244        }
1245
1246        return numbytes;
1247}
1248
1249/* Reads the next available packet from a DAG card, in a BLOCKING fashion
1250 *
1251 * If DUCK reporting is enabled, the packet returned may be a DUCK update
1252 */
1253static int dag_read_packet_stream(libtrace_t *libtrace,
1254                                struct dag_per_stream_t *stream_data,
1255                                libtrace_thread_t *t, /* Optional */
1256                                libtrace_packet_t *packet)
1257{
1258        int size = 0;
1259        dag_record_t *erfptr = NULL;
1260        struct timeval tv;
1261        int numbytes = 0;
1262        uint32_t flags = 0;
1263        struct timeval maxwait, pollwait;
1264
1265        pollwait.tv_sec = 0;
1266        pollwait.tv_usec = 10000;
1267        maxwait.tv_sec = 0;
1268        maxwait.tv_usec = 250000;
1269
1270        /* Check if we're due for a DUCK report - only report on the first thread */
1271        if (stream_data == FORMAT_DATA_FIRST) {
1272                size = dag_get_duckinfo(libtrace, packet);
1273                if (size != 0)
1274                        return size;
1275        }
1276
1277
1278        /* Don't let anyone try to free our DAG memory hole! */
1279        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1280
1281        /* If the packet buffer is currently owned by libtrace, free it so
1282         * that we can set the packet to point into the DAG memory hole */
1283        if (packet->buf_control == TRACE_CTRL_PACKET) {
1284                free(packet->buffer);
1285                packet->buffer = 0;
1286        }
1287
1288        if (dag_set_stream_poll(FORMAT_DATA->device->fd, stream_data->dagstream,
1289                                sizeof(dag_record_t), &maxwait,
1290                                &pollwait) == -1) {
1291                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1292                return -1;
1293        }
1294
1295        /* Grab a full ERF record */
1296        do {
1297                numbytes = dag_available(libtrace, stream_data);
1298                if (numbytes < 0)
1299                        return numbytes;
1300                if (numbytes < dag_record_size) {
1301                        /* Check the message queue if we have one to check */
1302                        if (t != NULL &&
1303                            libtrace_message_queue_count(&t->messages) > 0)
1304                                return -2;
1305
1306                        if ((numbytes=is_halted(libtrace)) != -1)
1307                                return numbytes;
1308                        /* Block until we see a packet */
1309                        continue;
1310                }
1311                erfptr = dag_get_record(stream_data);
1312        } while (erfptr == NULL);
1313
1314        packet->trace = libtrace;
1315        /* Prepare the libtrace packet */
1316        if (dag_prepare_packet_stream(libtrace, stream_data, packet, erfptr,
1317                                    TRACE_RT_DATA_ERF, flags))
1318                return -1;
1319
1320        /* Update the DUCK timer - don't re-order this check (false-sharing) */
1321        if (stream_data == FORMAT_DATA_FIRST && DUCK.duck_freq != 0) {
1322                tv = trace_get_timeval(packet);
1323                DUCK.last_pkt = tv.tv_sec;
1324        }
1325
1326        packet->error = packet->payload ? htons(erfptr->rlen) :
1327                                          erf_get_framing_length(packet);
1328
1329        return packet->error;
1330}
1331
1332static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
1333{
1334        return dag_read_packet_stream(libtrace, FORMAT_DATA_FIRST, NULL, packet);
1335}
1336
1337static int dag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1338                             libtrace_packet_t **packets, size_t nb_packets)
1339{
1340        int ret;
1341        size_t read_packets = 0;
1342        int numbytes = 0;
1343
1344        struct dag_per_stream_t *stream_data =
1345                (struct dag_per_stream_t *)t->format_data;
1346
1347        /* Read as many packets as we can, but read atleast one packet */
1348        do {
1349                ret = dag_read_packet_stream(libtrace, stream_data, t,
1350                                           packets[read_packets]);
1351                if (ret < 0)
1352                        return ret;
1353
1354                read_packets++;
1355
1356                /* Make sure we don't read too many packets..! */
1357                if (read_packets >= nb_packets)
1358                        break;
1359
1360                numbytes = dag_available(libtrace, stream_data);
1361        } while (numbytes >= dag_record_size);
1362
1363        return read_packets;
1364}
1365
1366/* Attempts to read a packet from a DAG card in a NON-BLOCKING fashion. If a
1367 * packet is available, we will return a packet event. Otherwise we will
1368 * return a SLEEP event (as we cannot select on the DAG file descriptor).
1369 */
1370static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
1371                                           libtrace_packet_t *packet)
1372{
1373        libtrace_eventobj_t event = {0,0,0.0,0};
1374        dag_record_t *erfptr = NULL;
1375        int numbytes;
1376        uint32_t flags = 0;
1377        struct timeval minwait, tv;
1378       
1379        minwait.tv_sec = 0;
1380        minwait.tv_usec = 10000;
1381
1382        /* Check if we're meant to provide a DUCK update */
1383        numbytes = dag_get_duckinfo(libtrace, packet);
1384        if (numbytes < 0) {
1385                event.type = TRACE_EVENT_TERMINATE;
1386                return event;
1387        } else if (numbytes > 0) {
1388                event.type = TRACE_EVENT_PACKET;
1389                return event;
1390        }
1391       
1392        if (dag_set_stream_poll(FORMAT_DATA->device->fd,
1393                                FORMAT_DATA_FIRST->dagstream, 0, &minwait,
1394                                &minwait) == -1) {
1395                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1396                event.type = TRACE_EVENT_TERMINATE;
1397                return event;
1398        }
1399
1400        do {
1401                erfptr = NULL;
1402                numbytes = 0;
1403
1404                /* Need to call dag_available so that the top pointer will get
1405                 * updated, otherwise we'll never see any data! */
1406                numbytes = dag_available(libtrace, FORMAT_DATA_FIRST);
1407
1408                /* May as well not bother calling dag_get_record if
1409                 * dag_available suggests that there's no data */
1410                if (numbytes != 0)
1411                        erfptr = dag_get_record(FORMAT_DATA_FIRST);
1412                if (erfptr == NULL) {
1413                        /* No packet available - sleep for a very short time */
1414                        if (libtrace_halt) {
1415                                event.type = TRACE_EVENT_TERMINATE;
1416                        } else {
1417                                event.type = TRACE_EVENT_SLEEP;
1418                                event.seconds = 0.0001;
1419                        }
1420                        break;
1421                }
1422                if (dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
1423                                            erfptr, TRACE_RT_DATA_ERF, flags)) {
1424                        event.type = TRACE_EVENT_TERMINATE;
1425                        break;
1426                }
1427
1428
1429                event.size = trace_get_capture_length(packet) +
1430                        trace_get_framing_length(packet);
1431
1432                /* XXX trace_read_packet() normally applies the following
1433                 * config options for us, but this function is called via
1434                 * trace_event() so we have to do it ourselves */
1435
1436                if (libtrace->filter) {
1437                        int filtret = trace_apply_filter(libtrace->filter,
1438                                                         packet);
1439                        if (filtret == -1) {
1440                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
1441                                              "Bad BPF Filter");
1442                                event.type = TRACE_EVENT_TERMINATE;
1443                                break;
1444                        }
1445
1446                        if (filtret == 0) {
1447                                /* This packet isn't useful so we want to
1448                                 * immediately see if there is another suitable
1449                                 * one - we definitely DO NOT want to return
1450                                 * a sleep event in this case, like we used to
1451                                 * do! */
1452                                libtrace->filtered_packets ++;
1453                                trace_clear_cache(packet);
1454                                continue;
1455                        }
1456
1457                        event.type = TRACE_EVENT_PACKET;
1458                } else {
1459                        event.type = TRACE_EVENT_PACKET;
1460                }
1461
1462                /* Update the DUCK timer */
1463                tv = trace_get_timeval(packet);
1464                DUCK.last_pkt = tv.tv_sec;
1465               
1466                if (libtrace->snaplen > 0) {
1467                        trace_set_capture_length(packet, libtrace->snaplen);
1468                }
1469                libtrace->accepted_packets ++;
1470                break;
1471        } while(1);
1472
1473        return event;
1474}
1475
1476static void dag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat)
1477{
1478        libtrace_list_node_t *tmp;
1479        assert(stat && libtrace);
1480        tmp = FORMAT_DATA_HEAD;
1481
1482        /* Dropped packets */
1483        stat->dropped_valid = 1;
1484        stat->dropped = 0;
1485        while (tmp != NULL) {
1486                stat->dropped += STREAM_DATA(tmp)->drops;
1487                tmp = tmp->next;
1488        }
1489
1490}
1491
1492static void dag_get_thread_statistics(libtrace_t *libtrace, libtrace_thread_t *t,
1493                                       libtrace_stat_t *stat) {
1494        struct dag_per_stream_t *stream_data = t->format_data;
1495        assert(stat && libtrace);
1496
1497        stat->dropped_valid = 1;
1498        stat->dropped = stream_data->drops;
1499
1500        stat->filtered_valid = 1;
1501        stat->filtered = 0;
1502}
1503
1504/* Prints some semi-useful help text about the DAG format module */
1505static void dag_help(void) {
1506        printf("dag format module: $Revision: 1755 $\n");
1507        printf("Supported input URIs:\n");
1508        printf("\tdag:/dev/dagn\n");
1509        printf("\n");
1510        printf("\te.g.: dag:/dev/dag0\n");
1511        printf("\n");
1512        printf("Supported output URIs:\n");
1513        printf("\tnone\n");
1514        printf("\n");
1515}
1516
1517static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1518                                bool reader)
1519{
1520        struct dag_per_stream_t *stream_data;
1521        libtrace_list_node_t *node;
1522
1523        if (reader && t->type == THREAD_PERPKT) {
1524                node = libtrace_list_get_index(FORMAT_DATA->per_stream,
1525                                                t->perpkt_num);
1526                if (node == NULL) {
1527                        return -1;
1528                }
1529                stream_data = node->data;
1530
1531                /* Pass the per thread data to the thread */
1532                t->format_data = stream_data;
1533
1534                /* Attach and start the DAG stream */
1535                if (dag_start_input_stream(libtrace, stream_data) < 0)
1536                        return -1;
1537        }
1538
1539        return 0;
1540}
1541
1542static struct libtrace_format_t dag = {
1543        "dag",
1544        "$Id$",
1545        TRACE_FORMAT_ERF,
1546        dag_probe_filename,             /* probe filename */
1547        NULL,                           /* probe magic */
1548        dag_init_input,                 /* init_input */
1549        dag_config_input,               /* config_input */
1550        dag_start_input,                /* start_input */
1551        dag_pause_input,                /* pause_input */
1552        dag_init_output,                /* init_output */
1553        NULL,                           /* config_output */
1554        dag_start_output,               /* start_output */
1555        dag_fin_input,                  /* fin_input */
1556        dag_fin_output,                 /* fin_output */
1557        dag_read_packet,                /* read_packet */
1558        dag_prepare_packet,             /* prepare_packet */
1559        NULL,                           /* fin_packet */
1560        dag_write_packet,               /* write_packet */
1561        erf_get_link_type,              /* get_link_type */
1562        erf_get_direction,              /* get_direction */
1563        erf_set_direction,              /* set_direction */
1564        erf_get_erf_timestamp,          /* get_erf_timestamp */
1565        NULL,                           /* get_timeval */
1566        NULL,                           /* get_seconds */
1567        NULL,                           /* get_timespec */
1568        NULL,                           /* seek_erf */
1569        NULL,                           /* seek_timeval */
1570        NULL,                           /* seek_seconds */
1571        erf_get_capture_length,         /* get_capture_length */
1572        erf_get_wire_length,            /* get_wire_length */
1573        erf_get_framing_length,         /* get_framing_length */
1574        erf_set_capture_length,         /* set_capture_length */
1575        NULL,                           /* get_received_packets */
1576        NULL,                           /* get_filtered_packets */
1577        NULL,                           /* get_dropped_packets */
1578        dag_get_statistics,             /* get_statistics */
1579        NULL,                           /* get_fd */
1580        trace_event_dag,                /* trace_event */
1581        dag_help,                       /* help */
1582        NULL,                            /* next pointer */
1583        {true, 0}, /* live packet capture, thread limit TBD */
1584        dag_pstart_input,
1585        dag_pread_packets,
1586        dag_pause_input,
1587        NULL,
1588        dag_pregister_thread,
1589        NULL,
1590        dag_get_thread_statistics       /* get thread stats */
1591};
1592
1593void dag_constructor(void)
1594{
1595        register_format(&dag);
1596}
Note: See TracBrowser for help on using the repository browser.