source: lib/format_dag25.c @ db84bb2

Branches: 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, ndag_format, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Last change on this file since db84bb2 was db84bb2, checked in by Shane Alcock <salcock@…>, 4 years ago

Ensure packet->order is always strictly incrementing

We cannot equate timestamp with packet->order, as some timestamp
methods are not strictly monotonic (ring: and int:).

Each format is now responsible for determining packet->order
during pread, so that the format can detect and correct such
inaccuracies.

More specifically, ring: and int: will cache the last reported
timestamp per thread and if time goes backwards, the order will
be set to last+1, otherwise the timestamp will be used.

DAG and DPDK still use the timestamp for ordering, since there
have been no issues with the timestamp ordering for these formats
(thus far!).

  • Property mode set to 100644
File size: 45.2 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26#define _GNU_SOURCE
27
28#include "config.h"
29#include "common.h"
30#include "libtrace.h"
31#include "libtrace_int.h"
32#include "format_helper.h"
33#include "format_erf.h"
34
35#include <assert.h>
36#include <errno.h>
37#include <fcntl.h>
38#include <stdio.h>
39#include <string.h>
40#include <stdlib.h>
41#include <sys/stat.h>
42
43#include <sys/mman.h>
44/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
45 * Windows anyway so we'll worry about this more later :] */
46#include <pthread.h>
47
48
49#ifdef HAVE_DAG_CONFIG_API_H
50#include <dag_config_api.h>
51#endif
52
53#ifdef WIN32
54#  include <io.h>
55#  include <share.h>
56#  define PATH_MAX _MAX_PATH
57#  define snprintf sprintf_s
58#else
59#  include <netdb.h>
60#  ifndef PATH_MAX
61#       define PATH_MAX 4096
62#  endif
63#  include <sys/ioctl.h>
64#endif
65
66/* This format deals with DAG cards that are using drivers from the 2.5 version
67 * onwards, including 3.X.
68 *
69 * DAG is a LIVE capture format.
70 *
71 * This format does support writing, provided the DAG card that you are using
72 * has transmit (Tx) support. Additionally, packets read using this format
73 * are in the ERF format, so can easily be written as ERF traces without
74 * losing any data.
75 */
76
/* Accessors that cast the opaque format_data pointer of a trace (or the data
 * pointer of a list node) to the matching DAG structure. The argument is
 * parenthesised so that any pointer expression, e.g. DATA(traces + 1) or
 * DATA(&trace), expands correctly. */
#define DATA(x) ((struct dag_format_data_t *)(x)->format_data)
#define DATA_OUT(x) ((struct dag_format_data_out_t *)(x)->format_data)
#define STREAM_DATA(x) ((struct dag_per_stream_t *)(x)->data)

/* Shorthand for the format data of the variable named "libtrace" in the
 * enclosing scope */
#define FORMAT_DATA DATA(libtrace)
#define FORMAT_DATA_OUT DATA_OUT(libtrace)

/* The DUCK reporting state of the current input trace */
#define DUCK (FORMAT_DATA->duck)

/* The first node of the per-stream list and the stream data stored in it */
#define FORMAT_DATA_HEAD (FORMAT_DATA->per_stream->head)
#define FORMAT_DATA_FIRST ((struct dag_per_stream_t *)FORMAT_DATA_HEAD->data)
88
89static struct libtrace_format_t dag;
90
/* A DAG device - a DAG device can support multiple streams (and therefore
 * multiple input traces) so each trace needs to refer to a device.
 *
 * Devices are chained together in a doubly linked list and shared between
 * traces by means of the reference count. */
struct dag_dev_t {
        char * dev_name;                /* Device name, compared with strcmp()
                                           when looking for an open device */
        int fd;                         /* File descriptor returned by
                                           dag_open() */
        uint16_t ref_count;             /* Number of input / output traces
                                           that are using this device */
        struct dag_dev_t *prev;         /* Pointer to the previous device in
                                           the device list (NULL if first) */
        struct dag_dev_t *next;         /* Pointer to the next device in the
                                           device list (NULL if last) */
};
103
/* "Global" data that is stored for each DAG output trace */
struct dag_format_data_out_t {
        /* The DAG device being used for writing */
        struct dag_dev_t *device;
        /* The DAG stream that is being written on */
        unsigned int dagstream;
        /* Boolean flag indicating whether the stream is currently attached
         * (set when the output is started, cleared when it is paused) */
        int stream_attached;
        /* The amount of data waiting to be transmitted, in bytes */
        uint64_t waiting;
        /* A buffer to hold the data to be transmitted */
        uint8_t *txbuffer;
};
117
/* Data that is stored against each input stream. One instance is kept per
 * stream in the per_stream list of the input trace. */
struct dag_per_stream_t {
        /* DAG stream number */
        uint16_t dagstream;
        /* Pointer to the last unread byte in the DAG memory */
        uint8_t *top;
        /* Pointer to the first unread byte in the DAG memory */
        uint8_t *bottom;
        /* Amount of data processed from the bottom pointer */
        uint32_t processed;
        /* Number of packets seen by the stream */
        uint64_t pkt_count;
        /* Drop count for this particular stream */
        uint64_t drops;
        /* Boolean values to indicate if a particular interface has been seen
         * or not. This is limited to four interfaces, which is enough to
         * support all current DAG cards */
        uint8_t seeninterface[4];
};
137
/* "Global" data that is stored for each DAG input trace */
struct dag_format_data_t {
        /* DAG device shared (via its reference count) with any other trace
         * using the same card */
        struct dag_dev_t *device;
        /* Boolean flag indicating whether the trace is currently attached */
        int stream_attached;
        /* Data stored against each DAG input stream; a list of
         * struct dag_per_stream_t */
        libtrace_list_t *per_stream;

        /* Data required for regular DUCK reporting.
         * We put this on a new cache line otherwise we have a lot of false
         * sharing caused by updating the last_pkt.
         * This should only ever be accessed by the first thread stream,
         * that includes both read and write operations.
         */
        struct {
                /* Timestamp of the last DUCK report */
                uint32_t last_duck;
                /* The number of seconds between each DUCK report */
                uint32_t duck_freq;
                /* Timestamp of the last packet read from the DAG card */
                uint32_t last_pkt;
                /* Dummy trace to ensure DUCK packets are dealt with using the
                 * DUCK format functions */
                libtrace_t *dummy_duck;
        } duck ALIGN_STRUCT(CACHE_LINE_SIZE);
};
165
166/* To be thread-safe, we're going to need a mutex for operating on the list
167 * of DAG devices */
168pthread_mutex_t open_dag_mutex;
169
170/* The list of DAG devices that have been opened by libtrace.
171 *
172 * We can only open each DAG device once, but we might want to read from
173 * multiple streams. Therefore, we need to maintain a list of devices that we
174 * have opened (with ref counts!) so that we don't try to open a device too
175 * many times or close a device that we're still using */
176struct dag_dev_t *open_dags = NULL;
177
178/* Returns the amount of padding between the ERF header and the start of the
179 * captured packet data */
180static int dag_get_padding(const libtrace_packet_t *packet)
181{
182        /* ERF Ethernet records have a 2 byte padding before the packet itself
183         * so that the IP header is aligned on a 32 bit boundary.
184         */
185        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
186                dag_record_t *erfptr = (dag_record_t *)packet->header;
187                switch(erfptr->type) {
188                        case TYPE_ETH:
189                        case TYPE_DSM_COLOR_ETH:
190                                return 2;
191                        default:                return 0;
192                }
193        }
194        else {
195                switch(trace_get_link_type(packet)) {
196                        case TRACE_TYPE_ETH:    return 2;
197                        default:                return 0;
198                }
199        }
200}
201
/* Guesses whether the given filename refers to a DAG device.
 *
 * The only checks made are that the path can be stat()ed and that it is a
 * character device, so any character device will be accepted.
 *
 * Returns 1 if the file looks like a DAG device, 0 otherwise. */
static int dag_probe_filename(const char *filename)
{
        struct stat info;

        return stat(filename, &info) == 0 && S_ISCHR(info.st_mode);
}
217
218/* Initialises the DAG output data structure */
219static void dag_init_format_out_data(libtrace_out_t *libtrace)
220{
221        libtrace->format_data = (struct dag_format_data_out_t *)
222                malloc(sizeof(struct dag_format_data_out_t));
223        // no DUCK on output
224        FORMAT_DATA_OUT->stream_attached = 0;
225        FORMAT_DATA_OUT->device = NULL;
226        FORMAT_DATA_OUT->dagstream = 0;
227        FORMAT_DATA_OUT->waiting = 0;
228
229}
230
231/* Initialises the DAG input data structure */
232static void dag_init_format_data(libtrace_t *libtrace)
233{
234        struct dag_per_stream_t stream_data;
235
236        libtrace->format_data = (struct dag_format_data_t *)
237                malloc(sizeof(struct dag_format_data_t));
238        DUCK.last_duck = 0;
239        DUCK.duck_freq = 0;
240        DUCK.last_pkt = 0;
241        DUCK.dummy_duck = NULL;
242
243        FORMAT_DATA->per_stream =
244                libtrace_list_init(sizeof(stream_data));
245        assert(FORMAT_DATA->per_stream != NULL);
246
247        /* We'll start with just one instance of stream_data, and we'll
248         * add more later if we need them */
249        memset(&stream_data, 0, sizeof(stream_data));
250        libtrace_list_push_back(FORMAT_DATA->per_stream, &stream_data);
251}
252
253/* Determines if there is already an entry for the given DAG device in the
254 * device list and increments the reference count for that device, if found.
255 *
256 * NOTE: This function assumes the open_dag_mutex is held by the caller */
257static struct dag_dev_t *dag_find_open_device(char *dev_name)
258{
259        struct dag_dev_t *dag_dev;
260
261        dag_dev = open_dags;
262
263        /* XXX: Not exactly zippy, but how often are we going to be dealing
264         * with multiple dag cards? */
265        while (dag_dev != NULL) {
266                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
267                        dag_dev->ref_count ++;
268                        return dag_dev;
269                }
270                dag_dev = dag_dev->next;
271        }
272        return NULL;
273}
274
275/* Closes a DAG device and removes it from the device list.
276 *
277 * Attempting to close a DAG device that has a non-zero reference count will
278 * cause an assertion failure!
279 *
280 * NOTE: This function assumes the open_dag_mutex is held by the caller */
281static void dag_close_device(struct dag_dev_t *dev)
282{
283        /* Need to remove from the device list */
284        assert(dev->ref_count == 0);
285
286        if (dev->prev == NULL) {
287                open_dags = dev->next;
288                if (dev->next)
289                        dev->next->prev = NULL;
290        } else {
291                dev->prev->next = dev->next;
292                if (dev->next)
293                        dev->next->prev = dev->prev;
294        }
295
296        dag_close(dev->fd);
297        free(dev);
298}
299
300
301/* Opens a new DAG device for writing and adds it to the DAG device list
302 *
303 * NOTE: this function should only be called when opening a DAG device for
304 * writing - there is little practical difference between this and the
305 * function below that covers the reading case, but we need the output trace
306 * object to report errors properly so the two functions take slightly
307 * different arguments. This is really lame and there should be a much better
308 * way of doing this.
309 *
310 * NOTE: This function assumes the open_dag_mutex is held by the caller
311 */
312static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace,
313                                                char *dev_name)
314{
315        struct stat buf;
316        int fd;
317        struct dag_dev_t *new_dev;
318
319        /* Make sure the device exists */
320        if (stat(dev_name, &buf) == -1) {
321                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
322                return NULL;
323        }
324
325        /* Make sure it is the appropriate type of device */
326        if (S_ISCHR(buf.st_mode)) {
327                /* Try opening the DAG device */
328                if((fd = dag_open(dev_name)) < 0) {
329                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
330                                        dev_name);
331                        return NULL;
332                }
333        } else {
334                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
335                                dev_name);
336                return NULL;
337        }
338
339        /* Add the device to our device list - it is just a doubly linked
340         * list with no inherent ordering; just tack the new one on the front
341         */
342        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
343        new_dev->fd = fd;
344        new_dev->dev_name = dev_name;
345        new_dev->ref_count = 1;
346
347        new_dev->prev = NULL;
348        new_dev->next = open_dags;
349        if (open_dags)
350                open_dags->prev = new_dev;
351
352        open_dags = new_dev;
353
354        return new_dev;
355}
356
357/* Opens a new DAG device for reading and adds it to the DAG device list
358 *
359 * NOTE: this function should only be called when opening a DAG device for
360 * reading - there is little practical difference between this and the
361 * function above that covers the writing case, but we need the input trace
362 * object to report errors properly so the two functions take slightly
363 * different arguments. This is really lame and there should be a much better
364 * way of doing this.
365 *
366 * NOTE: This function assumes the open_dag_mutex is held by the caller */
367static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
368        struct stat buf;
369        int fd;
370        struct dag_dev_t *new_dev;
371
372        /* Make sure the device exists */
373        if (stat(dev_name, &buf) == -1) {
374                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
375                return NULL;
376        }
377
378        /* Make sure it is the appropriate type of device */
379        if (S_ISCHR(buf.st_mode)) {
380                /* Try opening the DAG device */
381                if((fd = dag_open(dev_name)) < 0) {
382                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
383                                      dev_name);
384                        return NULL;
385                }
386        } else {
387                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
388                              dev_name);
389                return NULL;
390        }
391
392        /* Add the device to our device list - it is just a doubly linked
393         * list with no inherent ordering; just tack the new one on the front
394         */
395        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
396        new_dev->fd = fd;
397        new_dev->dev_name = dev_name;
398        new_dev->ref_count = 1;
399
400        new_dev->prev = NULL;
401        new_dev->next = open_dags;
402        if (open_dags)
403                open_dags->prev = new_dev;
404
405        open_dags = new_dev;
406
407        return new_dev;
408}
409
410/* Creates and initialises a DAG output trace */
411static int dag_init_output(libtrace_out_t *libtrace)
412{
413        /* Upon successful creation, the device name is stored against the
414         * device and free when it is free()d */
415        char *dag_dev_name = NULL;
416        char *scan = NULL;
417        struct dag_dev_t *dag_device = NULL;
418        int stream = 1;
419
420        /* XXX I don't know if this is important or not, but this function
421         * isn't present in all of the driver releases that this code is
422         * supposed to support! */
423        /*
424        unsigned long wake_time;
425        dagutil_sleep_get_wake_time(&wake_time,0);
426        */
427
428        dag_init_format_out_data(libtrace);
429        /* Grab the mutex while we're likely to be messing with the device
430         * list */
431        pthread_mutex_lock(&open_dag_mutex);
432
433        /* Specific streams are signified using a comma in the libtrace URI,
434         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
435         *
436         * If no stream is specified, we will write using stream 1 */
437        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
438                dag_dev_name = strdup(libtrace->uridata);
439        } else {
440                dag_dev_name = (char *)strndup(libtrace->uridata,
441                                (size_t)(scan - libtrace->uridata));
442                stream = atoi(++scan);
443        }
444        FORMAT_DATA_OUT->dagstream = stream;
445
446        /* See if our DAG device is already open */
447        dag_device = dag_find_open_device(dag_dev_name);
448
449        if (dag_device == NULL) {
450                /* Device not yet opened - open it ourselves */
451                dag_device = dag_open_output_device(libtrace, dag_dev_name);
452        } else {
453                /* Otherwise, just use the existing one */
454                free(dag_dev_name);
455                dag_dev_name = NULL;
456        }
457
458        /* Make sure we have successfully opened a DAG device */
459        if (dag_device == NULL) {
460                if (dag_dev_name) {
461                        free(dag_dev_name);
462                }
463                pthread_mutex_unlock(&open_dag_mutex);
464                return -1;
465        }
466
467        FORMAT_DATA_OUT->device = dag_device;
468        pthread_mutex_unlock(&open_dag_mutex);
469        return 0;
470}
471
472/* Creates and initialises a DAG input trace */
473static int dag_init_input(libtrace_t *libtrace) {
474        /* Upon successful creation, the device name is stored against the
475         * device and free when it is free()d */
476        char *dag_dev_name = NULL;
477        char *scan = NULL;
478        int stream = 0;
479        struct dag_dev_t *dag_device = NULL;
480
481        dag_init_format_data(libtrace);
482        /* Grab the mutex while we're likely to be messing with the device
483         * list */
484        pthread_mutex_lock(&open_dag_mutex);
485
486
487        /* DAG cards support multiple streams. In a single threaded capture,
488         * these are specified using a comma in the libtrace URI,
489         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
490         *
491         * If no stream is specified, we will read from stream 0 with
492         * one thread
493         */
494        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
495                dag_dev_name = strdup(libtrace->uridata);
496        } else {
497                dag_dev_name = (char *)strndup(libtrace->uridata,
498                                (size_t)(scan - libtrace->uridata));
499                stream = atoi(++scan);
500        }
501
502        FORMAT_DATA_FIRST->dagstream = stream;
503
504        /* See if our DAG device is already open */
505        dag_device = dag_find_open_device(dag_dev_name);
506
507        if (dag_device == NULL) {
508                /* Device not yet opened - open it ourselves */
509                dag_device = dag_open_device(libtrace, dag_dev_name);
510        } else {
511                /* Otherwise, just use the existing one */
512                free(dag_dev_name);
513                dag_dev_name = NULL;
514        }
515
516        /* Make sure we have successfully opened a DAG device */
517        if (dag_device == NULL) {
518                if (dag_dev_name)
519                        free(dag_dev_name);
520                dag_dev_name = NULL;
521                pthread_mutex_unlock(&open_dag_mutex);
522                return -1;
523        }
524
525        FORMAT_DATA->device = dag_device;
526
527        /* See Config_Status_API_Programming_Guide.pdf from the Endace
528           Dag Documentation */
529        /* Check kBooleanAttributeActive is true -- no point capturing
530         * on an interface that's disabled
531         *
532         * The symptom of the port being disabled is that libtrace
533         * will appear to hang. */
534        /* Check kBooleanAttributeFault is false */
535        /* Check kBooleanAttributeLocalFault is false */
536        /* Check kBooleanAttributeLock is true ? */
537        /* Check kBooleanAttributePeerLink ? */
538
539        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based
540           on libtrace promisc attribute?*/
541        /* Set kUint32AttributeSnapLength to the snaplength */
542
543        pthread_mutex_unlock(&open_dag_mutex);
544        return 0;
545}
546
547#ifdef HAVE_DAG_CONFIG_API_H
548static int dag_csapi_set_snaplen(libtrace_t *libtrace, int slen) {
549        dag_card_ref_t card_ref = NULL;
550        dag_component_t root = NULL;
551        attr_uuid_t uuid = 0;
552
553        if (slen < 0)
554                slen = 0; 
555
556        card_ref = dag_config_init(FORMAT_DATA->device->dev_name);
557        root = dag_config_get_root_component(card_ref);
558       
559        uuid = dag_component_get_config_attribute_uuid(root, kBooleanAttributeVarlen);
560        dag_config_set_boolean_attribute(card_ref, uuid, true);
561
562        uuid = dag_component_get_config_attribute_uuid(root, kUint32AttributeSnaplength);
563        dag_config_set_uint32_attribute(card_ref, uuid, (uint32_t)slen);
564
565        return 0;
566       
567
568} 
569#endif /* HAVE_DAG_CONFIG_API_H */
570
571/* Configures a DAG input trace */
572static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
573                            void *data)
574{
575        switch(option) {
576        case TRACE_OPTION_META_FREQ:
577                /* This option is used to specify the frequency of DUCK
578                 * updates */
579                DUCK.duck_freq = *(int *)data;
580                return 0;
581        case TRACE_OPTION_SNAPLEN:
582#ifdef HAVE_DAG_CONFIG_API_H
583                return dag_csapi_set_snaplen(libtrace, *(int *)data);
584#else
585                /* Tell the card our new snap length */
586        {
587                char conf_str[4096];
588                snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
589                if (dag_configure(FORMAT_DATA->device->fd,
590                                  conf_str) != 0) {
591                        trace_set_err(libtrace, errno, "Failed to configure "
592                                      "snaplen on DAG card: %s",
593                                      libtrace->uridata);
594                        return -1;
595                }
596        }
597#endif /* HAVE_DAG_CONFIG_API_H */
598
599                return 0;
600        case TRACE_OPTION_PROMISC:
601                /* DAG already operates in a promisc fashion */
602                return -1;
603        case TRACE_OPTION_FILTER:
604                /* We don't yet support pushing filters into DAG
605                 * cards */
606                return -1;
607        case TRACE_OPTION_EVENT_REALTIME:
608                /* Live capture is always going to be realtime */
609                return -1;
610        case TRACE_OPTION_HASHER:
611                /* Lets just say we did this, it's currently still up to
612                 * the user to configure this correctly. */
613                return 0;
614        }
615        return -1;
616}
617
618/* Starts a DAG output trace */
619static int dag_start_output(libtrace_out_t *libtrace)
620{
621        struct timeval zero, nopoll;
622
623        zero.tv_sec = 0;
624        zero.tv_usec = 0;
625        nopoll = zero;
626
627        /* Attach and start the DAG stream */
628        if (dag_attach_stream(FORMAT_DATA_OUT->device->fd,
629                        FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) {
630                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
631                return -1;
632        }
633
634        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
635                        FORMAT_DATA_OUT->dagstream) < 0) {
636                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
637                return -1;
638        }
639        FORMAT_DATA_OUT->stream_attached = 1;
640
641        /* We don't want the dag card to do any sleeping */
642        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
643                        FORMAT_DATA_OUT->dagstream, 0, &zero,
644                        &nopoll);
645
646        return 0;
647}
648
649static int dag_start_input_stream(libtrace_t *libtrace,
650                                  struct dag_per_stream_t * stream) {
651        struct timeval zero, nopoll;
652        uint8_t *top, *bottom, *starttop;
653        top = bottom = NULL;
654
655        zero.tv_sec = 0;
656        zero.tv_usec = 10000;
657        nopoll = zero;
658
659        /* Attach and start the DAG stream */
660        if (dag_attach_stream(FORMAT_DATA->device->fd,
661                              stream->dagstream, 0, 0) < 0) {
662                trace_set_err(libtrace, errno, "Cannot attach DAG stream #%u",
663                              stream->dagstream);
664                return -1;
665        }
666
667        if (dag_start_stream(FORMAT_DATA->device->fd,
668                             stream->dagstream) < 0) {
669                trace_set_err(libtrace, errno, "Cannot start DAG stream #%u",
670                              stream->dagstream);
671                return -1;
672        }
673        FORMAT_DATA->stream_attached = 1;
674
675        /* We don't want the dag card to do any sleeping */
676        if (dag_set_stream_poll(FORMAT_DATA->device->fd,
677                            stream->dagstream, 0, &zero,
678                            &nopoll) < 0) {
679                trace_set_err(libtrace, errno,
680                              "dag_set_stream_poll failed!");
681                return -1;
682        }
683
684        starttop = dag_advance_stream(FORMAT_DATA->device->fd,
685                                      stream->dagstream,
686                                      &bottom);
687
688        /* Should probably flush the memory hole now */
689        top = starttop;
690        while (starttop - bottom > 0) {
691                bottom += (starttop - bottom);
692                top = dag_advance_stream(FORMAT_DATA->device->fd,
693                                         stream->dagstream,
694                                         &bottom);
695        }
696        stream->top = top;
697        stream->bottom = bottom;
698        stream->processed = 0;
699        stream->drops = 0;
700
701        return 0;
702
703}
704
705/* Starts a DAG input trace */
706static int dag_start_input(libtrace_t *libtrace)
707{
708        return dag_start_input_stream(libtrace, FORMAT_DATA_FIRST);
709}
710
711static int dag_pstart_input(libtrace_t *libtrace)
712{
713        char *scan, *tok;
714        uint16_t stream_count = 0, max_streams;
715        int iserror = 0;
716        struct dag_per_stream_t stream_data;
717
718        /* Check we aren't trying to create more threads than the DAG card can
719         * handle */
720        max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd);
721        if (libtrace->perpkt_thread_count > max_streams) {
722                fprintf(stderr,
723                              "WARNING: DAG has only %u streams available, "
724                              "capping total number of threads at this value.",
725                              max_streams);
726                libtrace->perpkt_thread_count = max_streams;
727        }
728
729        /* Get the stream names from the uri */
730        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
731                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
732                              "Format uri doesn't specify the DAG streams");
733                iserror = 1;
734                goto cleanup;
735        }
736
737        scan++;
738
739        tok = strtok(scan, ",");
740        while (tok != NULL) {
741                /* Ensure we haven't specified too many streams */
742                if (stream_count >= libtrace->perpkt_thread_count) {
743                        fprintf(stderr,
744                                      "WARNING: Format uri specifies too many "
745                                      "streams. Maximum is %u, so only using "
746                                      "the first %u from the uri.",
747                                      libtrace->perpkt_thread_count, 
748                                      libtrace->perpkt_thread_count);
749                        break;
750                }
751
752                /* Save the stream details */
753                if (stream_count == 0) {
754                        /* Special case where we update the existing stream
755                         * data structure */
756                        FORMAT_DATA_FIRST->dagstream = (uint16_t)atoi(tok);
757                } else {
758                        memset(&stream_data, 0, sizeof(stream_data));
759                        stream_data.dagstream = (uint16_t)atoi(tok);
760                        libtrace_list_push_back(FORMAT_DATA->per_stream,
761                                                &stream_data);
762                }
763
764                stream_count++;
765                tok = strtok(NULL, ",");
766        }
767
768        if (stream_count < libtrace->perpkt_thread_count) {
769                libtrace->perpkt_thread_count = stream_count;
770        }
771       
772        FORMAT_DATA->stream_attached = 1;
773
774 cleanup:
775        if (iserror) {
776                return -1;
777        } else {
778                return 0;
779        }
780}
781
782/* Pauses a DAG output trace */
783static int dag_pause_output(libtrace_out_t *libtrace)
784{
785        /* Stop and detach the stream */
786        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
787                            FORMAT_DATA_OUT->dagstream) < 0) {
788                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
789                return -1;
790        }
791        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
792                              FORMAT_DATA_OUT->dagstream) < 0) {
793                trace_set_err_out(libtrace, errno,
794                                  "Could not detach DAG stream");
795                return -1;
796        }
797        FORMAT_DATA_OUT->stream_attached = 0;
798        return 0;
799}
800
801/* Pauses a DAG input trace */
802static int dag_pause_input(libtrace_t *libtrace)
803{
804        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
805
806        /* Stop and detach each stream */
807        while (tmp != NULL) {
808                if (dag_stop_stream(FORMAT_DATA->device->fd,
809                                    STREAM_DATA(tmp)->dagstream) < 0) {
810                        trace_set_err(libtrace, errno,
811                                      "Could not stop DAG stream");
812                        return -1;
813                }
814                if (dag_detach_stream(FORMAT_DATA->device->fd,
815                                      STREAM_DATA(tmp)->dagstream) < 0) {
816                        trace_set_err(libtrace, errno,
817                                      "Could not detach DAG stream");
818                        return -1;
819                }
820
821                tmp = tmp->next;
822        }
823
824        FORMAT_DATA->stream_attached = 0;
825        return 0;
826}
827
828
829
830/* Closes a DAG input trace */
831static int dag_fin_input(libtrace_t *libtrace)
832{
833        /* Need the lock, since we're going to be handling the device list */
834        pthread_mutex_lock(&open_dag_mutex);
835
836        /* Detach the stream if we are not paused */
837        if (FORMAT_DATA->stream_attached)
838                dag_pause_input(libtrace);
839        FORMAT_DATA->device->ref_count--;
840
841        /* Close the DAG device if there are no more references to it */
842        if (FORMAT_DATA->device->ref_count == 0)
843                dag_close_device(FORMAT_DATA->device);
844
845        if (DUCK.dummy_duck)
846                trace_destroy_dead(DUCK.dummy_duck);
847
848        /* Clear the list */
849        libtrace_list_deinit(FORMAT_DATA->per_stream);
850        free(libtrace->format_data);
851        pthread_mutex_unlock(&open_dag_mutex);
852        return 0; /* success */
853}
854
855/* Closes a DAG output trace */
856static int dag_fin_output(libtrace_out_t *libtrace)
857{
858
859        /* Commit any outstanding traffic in the txbuffer */
860        if (FORMAT_DATA_OUT->waiting) {
861                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
862                                           FORMAT_DATA_OUT->dagstream,
863                                           FORMAT_DATA_OUT->waiting );
864        }
865
866        /* Wait until the buffer is nearly clear before exiting the program,
867         * as we will lose packets otherwise */
868        dag_tx_get_stream_space
869                (FORMAT_DATA_OUT->device->fd,
870                 FORMAT_DATA_OUT->dagstream,
871                 dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
872                                            FORMAT_DATA_OUT->dagstream) - 8);
873
874        /* Need the lock, since we're going to be handling the device list */
875        pthread_mutex_lock(&open_dag_mutex);
876
877        /* Detach the stream if we are not paused */
878        if (FORMAT_DATA_OUT->stream_attached)
879                dag_pause_output(libtrace);
880        FORMAT_DATA_OUT->device->ref_count --;
881
882        /* Close the DAG device if there are no more references to it */
883        if (FORMAT_DATA_OUT->device->ref_count == 0)
884                dag_close_device(FORMAT_DATA_OUT->device);
885        free(libtrace->format_data);
886        pthread_mutex_unlock(&open_dag_mutex);
887        return 0; /* success */
888}
889
/* Select the DUCK ioctl request and the matching libtrace DUCK record type
 * from whichever ioctl macro the DAG headers provide. DAGIOC_CARD_DUCK takes
 * precedence over DAGIOCDUCK. When neither exists, both LIBTRACE_DUCK_*
 * macros stay undefined and a build warning is emitted. */
#if defined(DAGIOC_CARD_DUCK)
#define LIBTRACE_DUCK_IOCTL DAGIOC_CARD_DUCK
#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_5_0
#elif defined(DAGIOCDUCK)
#define LIBTRACE_DUCK_IOCTL DAGIOCDUCK
#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_2_5
#else
#warning "DAG appears to be missing DUCK support"
#endif
901
902/* Extracts DUCK information from the DAG card and produces a DUCK packet */
903static int dag_get_duckinfo(libtrace_t *libtrace,
904                                libtrace_packet_t *packet) {
905
906        if (DUCK.duck_freq == 0)
907                return 0;
908
909#ifndef LIBTRACE_DUCK_IOCTL
910        trace_set_err(libtrace, errno, 
911                "Requested DUCK information but unable to determine the correct ioctl for DUCK");
912        DUCK.duck_freq = 0;
913        return -1;
914#endif
915
916        if (DUCK.last_pkt - DUCK.last_duck < DUCK.duck_freq)
917                return 0;
918
919        /* Allocate memory for the DUCK data */
920        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
921            !packet->buffer) {
922                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
923                packet->buf_control = TRACE_CTRL_PACKET;
924                if (!packet->buffer) {
925                        trace_set_err(libtrace, errno,
926                                      "Cannot allocate packet buffer");
927                        return -1;
928                }
929        }
930
931        /* DUCK doesn't have a format header */
932        packet->header = 0;
933        packet->payload = packet->buffer;
934
935        /* No need to check if we can get DUCK or not - we're modern
936         * enough so just grab the DUCK info */
937        if ((ioctl(FORMAT_DATA->device->fd, LIBTRACE_DUCK_IOCTL,
938                   (duckinf_t *)packet->payload) < 0)) {
939                trace_set_err(libtrace, errno, "Error using DUCK ioctl");
940                DUCK.duck_freq = 0;
941                return -1;
942        }
943
944        packet->type = LIBTRACE_DUCK_VERSION;
945
946        /* Set the packet's trace to point at a DUCK trace, so that the
947         * DUCK format functions will be called on the packet rather than the
948         * DAG ones */
949        if (!DUCK.dummy_duck)
950                DUCK.dummy_duck = trace_create_dead("duck:dummy");
951        packet->trace = DUCK.dummy_duck;
952        DUCK.last_duck = DUCK.last_pkt;
953        packet->error = sizeof(duckinf_t);
954        return sizeof(duckinf_t);
955}
956
957/* Determines the amount of data available to read from the DAG card */
958static int dag_available(libtrace_t *libtrace,
959                         struct dag_per_stream_t *stream_data)
960{
961        uint32_t diff = stream_data->top - stream_data->bottom;
962
963        /* If we've processed more than 4MB of data since we last called
964         * dag_advance_stream, then we should call it again to allow the
965         * space occupied by that 4MB to be released */
966        if (diff >= dag_record_size && stream_data->processed < 4 * 1024 * 1024)
967                return diff;
968
969        /* Update the top and bottom pointers */
970        stream_data->top = dag_advance_stream(FORMAT_DATA->device->fd,
971                                              stream_data->dagstream,
972                                              &(stream_data->bottom));
973
974        if (stream_data->top == NULL) {
975                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
976                return -1;
977        }
978        stream_data->processed = 0;
979        diff = stream_data->top - stream_data->bottom;
980        return diff;
981}
982
983/* Returns a pointer to the start of the next complete ERF record */
984static dag_record_t *dag_get_record(struct dag_per_stream_t *stream_data)
985{
986        dag_record_t *erfptr = NULL;
987        uint16_t size;
988
989        erfptr = (dag_record_t *)stream_data->bottom;
990        if (!erfptr)
991                return NULL;
992
993        size = ntohs(erfptr->rlen);
994        assert( size >= dag_record_size );
995
996        /* Make certain we have the full packet available */
997        if (size > (stream_data->top - stream_data->bottom))
998                return NULL;
999
1000        stream_data->bottom += size;
1001        stream_data->processed += size;
1002        return erfptr;
1003}
1004
1005/* Converts a buffer containing a recently read DAG packet record into a
1006 * libtrace packet */
1007static int dag_prepare_packet_stream(libtrace_t *libtrace,
1008                                     struct dag_per_stream_t *stream_data,
1009                                     libtrace_packet_t *packet,
1010                                     void *buffer, libtrace_rt_types_t rt_type,
1011                                     uint32_t flags)
1012{
1013        dag_record_t *erfptr;
1014
1015        /* If the packet previously owned a buffer that is not the buffer
1016         * that contains the new packet data, we're going to need to free the
1017         * old one to avoid memory leaks */
1018        if (packet->buffer != buffer &&
1019            packet->buf_control == TRACE_CTRL_PACKET) {
1020                free(packet->buffer);
1021        }
1022
1023        /* Set the buffer owner appropriately */
1024        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
1025                packet->buf_control = TRACE_CTRL_PACKET;
1026        } else
1027                packet->buf_control = TRACE_CTRL_EXTERNAL;
1028
1029        /* Update the packet pointers and type appropriately */
1030        erfptr = (dag_record_t *)buffer;
1031        packet->buffer = erfptr;
1032        packet->header = erfptr;
1033        packet->type = rt_type;
1034
1035        if (erfptr->flags.rxerror == 1) {
1036                /* rxerror means the payload is corrupt - drop the payload
1037                 * by tweaking rlen */
1038                packet->payload = NULL;
1039                erfptr->rlen = htons(erf_get_framing_length(packet));
1040        } else {
1041                packet->payload = (char*)packet->buffer
1042                        + erf_get_framing_length(packet);
1043        }
1044
1045        if (libtrace->format_data == NULL) {
1046                dag_init_format_data(libtrace);
1047        }
1048
1049        /* Update the dropped packets counter */
1050        /* No loss counter for DSM coloured records - have to use some
1051         * other API */
1052        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
1053                /* TODO */
1054        } else {
1055                /* Use the ERF loss counter */
1056                if (stream_data->seeninterface[erfptr->flags.iface]
1057                    == 0) {
1058                        stream_data->seeninterface[erfptr->flags.iface]
1059                                = 1;
1060                } else {
1061                        stream_data->drops += ntohs(erfptr->lctr);
1062                }
1063        }
1064
1065        return 0;
1066}
1067
1068static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
1069                              void *buffer, libtrace_rt_types_t rt_type,
1070                              uint32_t flags)
1071{
1072        return dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
1073                                       buffer, rt_type, flags);
1074}
1075
1076/*
1077 * dag_write_packet() at this stage attempts to improve tx performance
1078 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
1079 * has been met. I observed approximately 270% performance increase
1080 * through this relatively naive tweak. No optimisation of buffer sizes
1081 * was attempted.
1082 */
1083
1084/* Pushes an ERF record onto the transmit stream */
1085static int dag_dump_packet(libtrace_out_t *libtrace,
1086                           dag_record_t *erfptr, unsigned int pad,
1087                           void *buffer)
1088{
1089        int size;
1090
1091        /*
1092         * If we've got 0 bytes waiting in the txqueue, assume that we
1093         * haven't requested any space yet, and request some, storing
1094         * the pointer at FORMAT_DATA_OUT->txbuffer.
1095         *
1096         * The amount to request is slightly magical at the moment - it's
1097         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
1098         * the buffer and handle overruns.
1099         */
1100        if (FORMAT_DATA_OUT->waiting == 0) {
1101                FORMAT_DATA_OUT->txbuffer =
1102                        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
1103                                                FORMAT_DATA_OUT->dagstream,
1104                                                16908288);
1105        }
1106
1107        /*
1108         * Copy the header separately to the body, as we can't guarantee they
1109         * are in contiguous memory
1110         */
1111        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, erfptr,
1112               (dag_record_size + pad));
1113        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
1114
1115        /*
1116         * Copy our incoming packet into the outgoing buffer, and increment
1117         * our waiting count
1118         */
1119        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
1120        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, buffer,
1121               size);
1122        FORMAT_DATA_OUT->waiting += size;
1123
1124        /*
1125         * If our output buffer has more than 16 Mebibytes in it, commit those
1126         * bytes and reset the waiting count to 0.
1127         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in
1128         * case there is still data in the buffer at program exit.
1129         */
1130        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
1131                FORMAT_DATA_OUT->txbuffer =
1132                        dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
1133                                                   FORMAT_DATA_OUT->dagstream,
1134                                                   FORMAT_DATA_OUT->waiting);
1135                FORMAT_DATA_OUT->waiting = 0;
1136        }
1137
1138        return size + pad + dag_record_size;
1139}
1140
1141/* Attempts to determine a suitable ERF type for a given packet. Returns true
1142 * if one is found, false otherwise */
1143static bool find_compatible_linktype(libtrace_out_t *libtrace,
1144                                     libtrace_packet_t *packet, char *type)
1145{
1146        /* Keep trying to simplify the packet until we can find
1147         * something we can do with it */
1148
1149        do {
1150                *type = libtrace_to_erf_type(trace_get_link_type(packet));
1151
1152                /* Success */
1153                if (*type != (char)-1)
1154                        return true;
1155
1156                if (!demote_packet(packet)) {
1157                        trace_set_err_out(libtrace,
1158                                          TRACE_ERR_NO_CONVERSION,
1159                                          "No erf type for packet (%i)",
1160                                          trace_get_link_type(packet));
1161                        return false;
1162                }
1163
1164        } while(1);
1165
1166        return true;
1167}
1168
1169/* Writes a packet to the provided DAG output trace */
1170static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet)
1171{
1172        /* This is heavily borrowed from erf_write_packet(). Yes, CnP
1173         * coding sucks, sorry about that.
1174         */
1175        unsigned int pad = 0;
1176        int numbytes;
1177        void *payload = packet->payload;
1178        dag_record_t *header = (dag_record_t *)packet->header;
1179        char erf_type = 0;
1180
1181        if(!packet->header) {
1182                /* No header, probably an RT packet. Lifted from
1183                 * erf_write_packet(). */
1184                return -1;
1185        }
1186
1187        if (trace_get_link_type(packet) == TRACE_TYPE_NONDATA)
1188                return 0;
1189
1190        pad = dag_get_padding(packet);
1191
1192        /*
1193         * If the payload is null, adjust the rlen. Discussion of this is
1194         * attached to erf_write_packet()
1195         */
1196        if (payload == NULL) {
1197                header->rlen = htons(dag_record_size + pad);
1198        }
1199
1200        if (packet->type == TRACE_RT_DATA_ERF) {
1201                numbytes = dag_dump_packet(libtrace, header, pad, payload);
1202        } else {
1203                /* Build up a new packet header from the existing header */
1204
1205                /* Simplify the packet first - if we can't do this, break
1206                 * early */
1207                if (!find_compatible_linktype(libtrace,packet,&erf_type))
1208                        return -1;
1209
1210                dag_record_t erfhdr;
1211
1212                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
1213                payload=packet->payload;
1214                pad = dag_get_padding(packet);
1215
1216                /* Flags. Can't do this */
1217                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
1218                if (trace_get_direction(packet)!=(int)~0U)
1219                        erfhdr.flags.iface = trace_get_direction(packet);
1220
1221                erfhdr.type = erf_type;
1222
1223                /* Packet length (rlen includes format overhead) */
1224                assert(trace_get_capture_length(packet) > 0
1225                       && trace_get_capture_length(packet) <= 65536);
1226                assert(erf_get_framing_length(packet) > 0
1227                       && trace_get_framing_length(packet) <= 65536);
1228                assert(trace_get_capture_length(packet) +
1229                       erf_get_framing_length(packet) > 0
1230                       && trace_get_capture_length(packet) +
1231                       erf_get_framing_length(packet) <= 65536);
1232
1233                erfhdr.rlen = htons(trace_get_capture_length(packet)
1234                                    + erf_get_framing_length(packet));
1235
1236
1237                /* Loss counter. Can't do this */
1238                erfhdr.lctr = 0;
1239                /* Wire length, does not include padding! */
1240                erfhdr.wlen = htons(trace_get_wire_length(packet));
1241
1242                /* Write it out */
1243                numbytes = dag_dump_packet(libtrace, &erfhdr, pad, payload);
1244        }
1245
1246        return numbytes;
1247}
1248
1249/* Reads the next available packet from a DAG card, in a BLOCKING fashion
1250 *
1251 * If DUCK reporting is enabled, the packet returned may be a DUCK update
1252 */
1253static int dag_read_packet_stream(libtrace_t *libtrace,
1254                                struct dag_per_stream_t *stream_data,
1255                                libtrace_thread_t *t, /* Optional */
1256                                libtrace_packet_t *packet)
1257{
1258        int size = 0;
1259        dag_record_t *erfptr = NULL;
1260        struct timeval tv;
1261        int numbytes = 0;
1262        uint32_t flags = 0;
1263        struct timeval maxwait, pollwait;
1264
1265        pollwait.tv_sec = 0;
1266        pollwait.tv_usec = 10000;
1267        maxwait.tv_sec = 0;
1268        maxwait.tv_usec = 250000;
1269
1270        /* Check if we're due for a DUCK report - only report on the first thread */
1271        if (stream_data == FORMAT_DATA_FIRST) {
1272                size = dag_get_duckinfo(libtrace, packet);
1273                if (size != 0)
1274                        return size;
1275        }
1276
1277
1278        /* Don't let anyone try to free our DAG memory hole! */
1279        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1280
1281        /* If the packet buffer is currently owned by libtrace, free it so
1282         * that we can set the packet to point into the DAG memory hole */
1283        if (packet->buf_control == TRACE_CTRL_PACKET) {
1284                free(packet->buffer);
1285                packet->buffer = 0;
1286        }
1287
1288        if (dag_set_stream_poll(FORMAT_DATA->device->fd, stream_data->dagstream,
1289                                sizeof(dag_record_t), &maxwait,
1290                                &pollwait) == -1) {
1291                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1292                return -1;
1293        }
1294
1295        /* Grab a full ERF record */
1296        do {
1297                numbytes = dag_available(libtrace, stream_data);
1298                if (numbytes < 0)
1299                        return numbytes;
1300                if (numbytes < dag_record_size) {
1301                        /* Check the message queue if we have one to check */
1302                        if (t != NULL &&
1303                            libtrace_message_queue_count(&t->messages) > 0)
1304                                return -2;
1305
1306                        if ((numbytes=is_halted(libtrace)) != -1)
1307                                return numbytes;
1308                        /* Block until we see a packet */
1309                        continue;
1310                }
1311                erfptr = dag_get_record(stream_data);
1312        } while (erfptr == NULL);
1313
1314        packet->trace = libtrace;
1315
1316        /* Prepare the libtrace packet */
1317        if (dag_prepare_packet_stream(libtrace, stream_data, packet, erfptr,
1318                                    TRACE_RT_DATA_ERF, flags))
1319                return -1;
1320
1321        /* Update the DUCK timer - don't re-order this check (false-sharing) */
1322        if (stream_data == FORMAT_DATA_FIRST && DUCK.duck_freq != 0) {
1323                tv = trace_get_timeval(packet);
1324                DUCK.last_pkt = tv.tv_sec;
1325        }
1326
1327        packet->order = erf_get_erf_timestamp(packet);
1328        packet->error = packet->payload ? htons(erfptr->rlen) :
1329                                          erf_get_framing_length(packet);
1330
1331        return packet->error;
1332}
1333
1334static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
1335{
1336        return dag_read_packet_stream(libtrace, FORMAT_DATA_FIRST, NULL, packet);
1337}
1338
1339static int dag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1340                             libtrace_packet_t **packets, size_t nb_packets)
1341{
1342        int ret;
1343        size_t read_packets = 0;
1344        int numbytes = 0;
1345
1346        struct dag_per_stream_t *stream_data =
1347                (struct dag_per_stream_t *)t->format_data;
1348
1349        /* Read as many packets as we can, but read atleast one packet */
1350        do {
1351                ret = dag_read_packet_stream(libtrace, stream_data, t,
1352                                           packets[read_packets]);
1353                if (ret < 0)
1354                        return ret;
1355
1356                read_packets++;
1357
1358                /* Make sure we don't read too many packets..! */
1359                if (read_packets >= nb_packets)
1360                        break;
1361
1362                numbytes = dag_available(libtrace, stream_data);
1363        } while (numbytes >= dag_record_size);
1364
1365        return read_packets;
1366}
1367
1368/* Attempts to read a packet from a DAG card in a NON-BLOCKING fashion. If a
1369 * packet is available, we will return a packet event. Otherwise we will
1370 * return a SLEEP event (as we cannot select on the DAG file descriptor).
1371 */
1372static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
1373                                           libtrace_packet_t *packet)
1374{
1375        libtrace_eventobj_t event = {0,0,0.0,0};
1376        dag_record_t *erfptr = NULL;
1377        int numbytes;
1378        uint32_t flags = 0;
1379        struct timeval minwait, tv;
1380       
1381        minwait.tv_sec = 0;
1382        minwait.tv_usec = 10000;
1383
1384        /* Check if we're meant to provide a DUCK update */
1385        numbytes = dag_get_duckinfo(libtrace, packet);
1386        if (numbytes < 0) {
1387                event.type = TRACE_EVENT_TERMINATE;
1388                return event;
1389        } else if (numbytes > 0) {
1390                event.type = TRACE_EVENT_PACKET;
1391                return event;
1392        }
1393       
1394        if (dag_set_stream_poll(FORMAT_DATA->device->fd,
1395                                FORMAT_DATA_FIRST->dagstream, 0, &minwait,
1396                                &minwait) == -1) {
1397                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1398                event.type = TRACE_EVENT_TERMINATE;
1399                return event;
1400        }
1401
1402        do {
1403                erfptr = NULL;
1404                numbytes = 0;
1405
1406                /* Need to call dag_available so that the top pointer will get
1407                 * updated, otherwise we'll never see any data! */
1408                numbytes = dag_available(libtrace, FORMAT_DATA_FIRST);
1409
1410                /* May as well not bother calling dag_get_record if
1411                 * dag_available suggests that there's no data */
1412                if (numbytes != 0)
1413                        erfptr = dag_get_record(FORMAT_DATA_FIRST);
1414                if (erfptr == NULL) {
1415                        /* No packet available - sleep for a very short time */
1416                        if (libtrace_halt) {
1417                                event.type = TRACE_EVENT_TERMINATE;
1418                        } else {
1419                                event.type = TRACE_EVENT_SLEEP;
1420                                event.seconds = 0.0001;
1421                        }
1422                        break;
1423                }
1424                if (dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
1425                                            erfptr, TRACE_RT_DATA_ERF, flags)) {
1426                        event.type = TRACE_EVENT_TERMINATE;
1427                        break;
1428                }
1429
1430
1431                event.size = trace_get_capture_length(packet) +
1432                        trace_get_framing_length(packet);
1433
1434                /* XXX trace_read_packet() normally applies the following
1435                 * config options for us, but this function is called via
1436                 * trace_event() so we have to do it ourselves */
1437
1438                if (libtrace->filter) {
1439                        int filtret = trace_apply_filter(libtrace->filter,
1440                                                         packet);
1441                        if (filtret == -1) {
1442                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
1443                                              "Bad BPF Filter");
1444                                event.type = TRACE_EVENT_TERMINATE;
1445                                break;
1446                        }
1447
1448                        if (filtret == 0) {
1449                                /* This packet isn't useful so we want to
1450                                 * immediately see if there is another suitable
1451                                 * one - we definitely DO NOT want to return
1452                                 * a sleep event in this case, like we used to
1453                                 * do! */
1454                                libtrace->filtered_packets ++;
1455                                trace_clear_cache(packet);
1456                                continue;
1457                        }
1458
1459                        event.type = TRACE_EVENT_PACKET;
1460                } else {
1461                        event.type = TRACE_EVENT_PACKET;
1462                }
1463
1464                /* Update the DUCK timer */
1465                tv = trace_get_timeval(packet);
1466                DUCK.last_pkt = tv.tv_sec;
1467               
1468                if (libtrace->snaplen > 0) {
1469                        trace_set_capture_length(packet, libtrace->snaplen);
1470                }
1471                libtrace->accepted_packets ++;
1472                break;
1473        } while(1);
1474
1475        return event;
1476}
1477
1478static void dag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat)
1479{
1480        libtrace_list_node_t *tmp;
1481        assert(stat && libtrace);
1482        tmp = FORMAT_DATA_HEAD;
1483
1484        /* Dropped packets */
1485        stat->dropped_valid = 1;
1486        stat->dropped = 0;
1487        while (tmp != NULL) {
1488                stat->dropped += STREAM_DATA(tmp)->drops;
1489                tmp = tmp->next;
1490        }
1491
1492}
1493
1494static void dag_get_thread_statistics(libtrace_t *libtrace, libtrace_thread_t *t,
1495                                       libtrace_stat_t *stat) {
1496        struct dag_per_stream_t *stream_data = t->format_data;
1497        assert(stat && libtrace);
1498
1499        stat->dropped_valid = 1;
1500        stat->dropped = stream_data->drops;
1501
1502        stat->filtered_valid = 1;
1503        stat->filtered = 0;
1504}
1505
/* Prints some semi-useful help text about the DAG format module.
 *
 * The output section used to claim that no output URIs are supported, yet
 * this module registers output handlers (init, start, write and fin) in its
 * format table, so the text now lists an output URI as well.
 * NOTE(review): the output URI syntax is assumed to mirror the input one -
 * confirm against dag_init_output. */
static void dag_help(void) {
        printf("dag format module: $Revision: 1755 $\n");
        printf("Supported input URIs:\n");
        printf("\tdag:/dev/dagn\n");
        printf("\n");
        printf("\te.g.: dag:/dev/dag0\n");
        printf("\n");
        printf("Supported output URIs:\n");
        printf("\tdag:/dev/dagn\n");
        printf("\n");
        printf("\te.g.: dag:/dev/dag0\n");
        printf("\n");
}
1518
1519static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1520                                bool reader)
1521{
1522        struct dag_per_stream_t *stream_data;
1523        libtrace_list_node_t *node;
1524
1525        if (reader && t->type == THREAD_PERPKT) {
1526                node = libtrace_list_get_index(FORMAT_DATA->per_stream,
1527                                                t->perpkt_num);
1528                if (node == NULL) {
1529                        return -1;
1530                }
1531                stream_data = node->data;
1532
1533                /* Pass the per thread data to the thread */
1534                t->format_data = stream_data;
1535
1536                /* Attach and start the DAG stream */
1537                if (dag_start_input_stream(libtrace, stream_data) < 0)
1538                        return -1;
1539        }
1540
1541        return 0;
1542}
1543
1544static struct libtrace_format_t dag = {
1545        "dag",
1546        "$Id$",
1547        TRACE_FORMAT_ERF,
1548        dag_probe_filename,             /* probe filename */
1549        NULL,                           /* probe magic */
1550        dag_init_input,                 /* init_input */
1551        dag_config_input,               /* config_input */
1552        dag_start_input,                /* start_input */
1553        dag_pause_input,                /* pause_input */
1554        dag_init_output,                /* init_output */
1555        NULL,                           /* config_output */
1556        dag_start_output,               /* start_output */
1557        dag_fin_input,                  /* fin_input */
1558        dag_fin_output,                 /* fin_output */
1559        dag_read_packet,                /* read_packet */
1560        dag_prepare_packet,             /* prepare_packet */
1561        NULL,                           /* fin_packet */
1562        dag_write_packet,               /* write_packet */
1563        erf_get_link_type,              /* get_link_type */
1564        erf_get_direction,              /* get_direction */
1565        erf_set_direction,              /* set_direction */
1566        erf_get_erf_timestamp,          /* get_erf_timestamp */
1567        NULL,                           /* get_timeval */
1568        NULL,                           /* get_seconds */
1569        NULL,                           /* get_timespec */
1570        NULL,                           /* seek_erf */
1571        NULL,                           /* seek_timeval */
1572        NULL,                           /* seek_seconds */
1573        erf_get_capture_length,         /* get_capture_length */
1574        erf_get_wire_length,            /* get_wire_length */
1575        erf_get_framing_length,         /* get_framing_length */
1576        erf_set_capture_length,         /* set_capture_length */
1577        NULL,                           /* get_received_packets */
1578        NULL,                           /* get_filtered_packets */
1579        NULL,                           /* get_dropped_packets */
1580        dag_get_statistics,             /* get_statistics */
1581        NULL,                           /* get_fd */
1582        trace_event_dag,                /* trace_event */
1583        dag_help,                       /* help */
1584        NULL,                            /* next pointer */
1585        {true, 0}, /* live packet capture, thread limit TBD */
1586        dag_pstart_input,
1587        dag_pread_packets,
1588        dag_pause_input,
1589        NULL,
1590        dag_pregister_thread,
1591        NULL,
1592        dag_get_thread_statistics       /* get thread stats */
1593};
1594
1595void dag_constructor(void)
1596{
1597        register_format(&dag);
1598}
Note: See TracBrowser for help on using the repository browser.