source: lib/format_dag25.c @ 32ee9b2

Branches: cachetimestamps, develop, rc-4.0.4, ringdecrementfix, ringperformance
Last change on this file since 32ee9b2 was 32ee9b2, checked in by Shane Alcock <salcock@…>, 2 years ago

Add new trace_flush_output() to public API

Can be used to force a libtrace output to dump any buffered output
to disk immediately.

Note that if the file is compressed or the output trace format
requires a trailer, the flushed file will still not be properly
readable afterwards as this will not result in any trailers
being written. You'll still have to close the file for that.

Mainly this is useful for ensuring that output file sizes grow
over time in situations where the amount of output is relatively
small, rather than staying stuck at 0 bytes until we either reach
1MB of output or the file is closed. For instance, you could have
a timer that calls trace_flush_output() every 30 seconds so that
the output file size will grow if any packets were written in the
last 30 seconds.

  • Property mode set to 100644
File size: 45.4 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26#define _GNU_SOURCE
27
28#include "config.h"
29#include "common.h"
30#include "libtrace.h"
31#include "libtrace_int.h"
32#include "format_helper.h"
33#include "format_erf.h"
34
35#include <assert.h>
36#include <errno.h>
37#include <fcntl.h>
38#include <stdio.h>
39#include <string.h>
40#include <stdlib.h>
41#include <sys/stat.h>
42
43#include <sys/mman.h>
44/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
45 * Windows anyway so we'll worry about this more later :] */
46#include <pthread.h>
47
48
49#ifdef HAVE_DAG_CONFIG_API_H
50#include <dag_config_api.h>
51#endif
52
53#ifdef WIN32
54#  include <io.h>
55#  include <share.h>
56#  define PATH_MAX _MAX_PATH
57#  define snprintf sprintf_s
58#else
59#  include <netdb.h>
60#  ifndef PATH_MAX
61#       define PATH_MAX 4096
62#  endif
63#  include <sys/ioctl.h>
64#endif
65
66/* This format deals with DAG cards that are using drivers from the 2.5 version
67 * onwards, including 3.X.
68 *
69 * DAG is a LIVE capture format.
70 *
71 * This format does support writing, provided the DAG card that you are using
72 * has transmit (Tx) support. Additionally, packets read using this format
73 * are in the ERF format, so can easily be written as ERF traces without
74 * losing any data.
75 */
76
/* Accessors for the format-specific data attached to an input trace, an
 * output trace, or a per-stream list node.
 *
 * The macro argument is parenthesised so that any pointer expression
 * (e.g. `traces + 1`) can be passed, not just a plain identifier. */
#define DATA(x) ((struct dag_format_data_t *)(x)->format_data)
#define DATA_OUT(x) ((struct dag_format_data_out_t *)(x)->format_data)
#define STREAM_DATA(x) ((struct dag_per_stream_t *)(x)->data)

/* Shorthand forms: these expect a variable named 'libtrace' to be in scope
 * at the point of use */
#define FORMAT_DATA DATA(libtrace)
#define FORMAT_DATA_OUT DATA_OUT(libtrace)

/* The DUCK reporting state of the current input trace */
#define DUCK (FORMAT_DATA->duck)

/* The first node of the per-stream list and the stream data stored in it */
#define FORMAT_DATA_HEAD (FORMAT_DATA->per_stream->head)
#define FORMAT_DATA_FIRST ((struct dag_per_stream_t *)FORMAT_DATA_HEAD->data)
88
89static struct libtrace_format_t dag;
90
/* One entry in the list of opened DAG devices.
 *
 * A single device may carry several streams, so several traces can share
 * one entry; ref_count tracks how many of them currently do. */
struct dag_dev_t {
        /* Name of the device */
        char * dev_name;
        /* File descriptor for the device */
        int fd;
        /* How many input / output traces are using this device */
        uint16_t ref_count;
        /* Previous entry in the device list */
        struct dag_dev_t *prev;
        /* Next entry in the device list */
        struct dag_dev_t *next;
};
103
/* Per-trace state kept for every DAG output trace */
struct dag_format_data_out_t {
        /* Device that this trace writes to */
        struct dag_dev_t *device;
        /* Number of the DAG stream used for writing */
        unsigned int dagstream;
        /* Non-zero while the stream is attached */
        int stream_attached;
        /* Number of bytes queued but not yet transmitted */
        uint64_t waiting;
        /* Buffer holding the data that is to be transmitted */
        uint8_t *txbuffer;
};
117
/* State tracked separately for each DAG input stream */
struct dag_per_stream_t {
        /* Number of the DAG stream */
        uint16_t dagstream;
        /* Last unread byte in the DAG memory */
        uint8_t *top;
        /* First unread byte in the DAG memory */
        uint8_t *bottom;
        /* How much data, counted from bottom, has been processed so far */
        uint32_t processed;
        /* Packets seen on this stream */
        uint64_t pkt_count;
        /* Packets dropped on this stream */
        uint64_t drops;
        /* One flag per interface recording whether it has been seen yet.
         * Only four interfaces are tracked, which covers all current DAG
         * cards */
        uint8_t seeninterface[4];
};
137
138/* "Global" data that is stored for each DAG input trace */
139struct dag_format_data_t {
140        /* DAG device */
141        struct dag_dev_t *device;
142        /* Boolean flag indicating whether the trace is currently attached */
143        int stream_attached;
144        /* Data stored against each DAG input stream */
145        libtrace_list_t *per_stream;
146
147        /* Data required for regular DUCK reporting.
148         * We put this on a new cache line otherwise we have a lot of false
149         * sharing caused by updating the last_pkt.
150         * This should only ever be accessed by the first thread stream,
151         * that includes both read and write operations.
152         */
153        struct {
154                /* Timestamp of the last DUCK report */
155                uint32_t last_duck;
156                /* The number of seconds between each DUCK report */
157                uint32_t duck_freq;
158                /* Timestamp of the last packet read from the DAG card */
159                uint32_t last_pkt;
160                /* Dummy trace to ensure DUCK packets are dealt with using the
161                 * DUCK format functions */
162                libtrace_t *dummy_duck;
163        } duck ALIGN_STRUCT(CACHE_LINE_SIZE);
164};
165
/* Guards every operation on the list of DAG devices so that the list can be
 * used from multiple threads.
 *
 * NOTE(review): no initialiser is visible at this point in the file -
 * confirm the mutex is initialised before its first use. */
pthread_mutex_t open_dag_mutex;

/* Head of the list of DAG devices that libtrace currently has open.
 *
 * A DAG device can only be opened once, yet several streams on it may be
 * read at the same time. Opened devices are therefore kept in this
 * reference-counted list, so a device is neither opened a second time nor
 * closed while something is still using it. */
struct dag_dev_t *open_dags = NULL;
177
178/* Returns the amount of padding between the ERF header and the start of the
179 * captured packet data */
180static int dag_get_padding(const libtrace_packet_t *packet)
181{
182        /* ERF Ethernet records have a 2 byte padding before the packet itself
183         * so that the IP header is aligned on a 32 bit boundary.
184         */
185        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
186                dag_record_t *erfptr = (dag_record_t *)packet->header;
187                switch(erfptr->type) {
188                        case TYPE_ETH:
189                        case TYPE_COLOR_ETH:
190                        case TYPE_DSM_COLOR_ETH:
191                        case TYPE_COLOR_HASH_ETH:
192                                return 2;
193                        default:                return 0;
194                }
195        }
196        else {
197                switch(trace_get_link_type(packet)) {
198                        case TRACE_TYPE_ETH:    return 2;
199                        default:                return 0;
200                }
201        }
202}
203
/* Attempts to determine if the given filename refers to a DAG device.
 *
 * The check is deliberately cheap: anything that can be stat()ed and is a
 * character device is assumed to be a DAG device.
 *
 * Returns 1 if the filename looks like a DAG device, 0 otherwise
 * (including when filename is NULL or cannot be stat()ed). */
static int dag_probe_filename(const char *filename)
{
        struct stat statbuf;

        /* Nothing to probe - avoid handing a NULL path to stat() */
        if (filename == NULL) {
                return 0;
        }
        /* Can we stat the file? */
        if (stat(filename, &statbuf) != 0) {
                return 0;
        }
        /* Is it a character device? If so, it's probably us. */
        return S_ISCHR(statbuf.st_mode) ? 1 : 0;
}
219
220/* Initialises the DAG output data structure */
221static void dag_init_format_out_data(libtrace_out_t *libtrace)
222{
223        libtrace->format_data = (struct dag_format_data_out_t *)
224                malloc(sizeof(struct dag_format_data_out_t));
225        // no DUCK on output
226        FORMAT_DATA_OUT->stream_attached = 0;
227        FORMAT_DATA_OUT->device = NULL;
228        FORMAT_DATA_OUT->dagstream = 0;
229        FORMAT_DATA_OUT->waiting = 0;
230
231}
232
233/* Initialises the DAG input data structure */
234static void dag_init_format_data(libtrace_t *libtrace)
235{
236        struct dag_per_stream_t stream_data;
237
238        libtrace->format_data = (struct dag_format_data_t *)
239                malloc(sizeof(struct dag_format_data_t));
240        DUCK.last_duck = 0;
241        DUCK.duck_freq = 0;
242        DUCK.last_pkt = 0;
243        DUCK.dummy_duck = NULL;
244
245        FORMAT_DATA->per_stream =
246                libtrace_list_init(sizeof(stream_data));
247        assert(FORMAT_DATA->per_stream != NULL);
248
249        /* We'll start with just one instance of stream_data, and we'll
250         * add more later if we need them */
251        memset(&stream_data, 0, sizeof(stream_data));
252        libtrace_list_push_back(FORMAT_DATA->per_stream, &stream_data);
253}
254
255/* Determines if there is already an entry for the given DAG device in the
256 * device list and increments the reference count for that device, if found.
257 *
258 * NOTE: This function assumes the open_dag_mutex is held by the caller */
259static struct dag_dev_t *dag_find_open_device(char *dev_name)
260{
261        struct dag_dev_t *dag_dev;
262
263        dag_dev = open_dags;
264
265        /* XXX: Not exactly zippy, but how often are we going to be dealing
266         * with multiple dag cards? */
267        while (dag_dev != NULL) {
268                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
269                        dag_dev->ref_count ++;
270                        return dag_dev;
271                }
272                dag_dev = dag_dev->next;
273        }
274        return NULL;
275}
276
277/* Closes a DAG device and removes it from the device list.
278 *
279 * Attempting to close a DAG device that has a non-zero reference count will
280 * cause an assertion failure!
281 *
282 * NOTE: This function assumes the open_dag_mutex is held by the caller */
283static void dag_close_device(struct dag_dev_t *dev)
284{
285        /* Need to remove from the device list */
286        assert(dev->ref_count == 0);
287
288        if (dev->prev == NULL) {
289                open_dags = dev->next;
290                if (dev->next)
291                        dev->next->prev = NULL;
292        } else {
293                dev->prev->next = dev->next;
294                if (dev->next)
295                        dev->next->prev = dev->prev;
296        }
297
298        dag_close(dev->fd);
299        free(dev);
300}
301
302
303/* Opens a new DAG device for writing and adds it to the DAG device list
304 *
305 * NOTE: this function should only be called when opening a DAG device for
306 * writing - there is little practical difference between this and the
307 * function below that covers the reading case, but we need the output trace
308 * object to report errors properly so the two functions take slightly
309 * different arguments. This is really lame and there should be a much better
310 * way of doing this.
311 *
312 * NOTE: This function assumes the open_dag_mutex is held by the caller
313 */
314static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace,
315                                                char *dev_name)
316{
317        struct stat buf;
318        int fd;
319        struct dag_dev_t *new_dev;
320
321        /* Make sure the device exists */
322        if (stat(dev_name, &buf) == -1) {
323                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
324                return NULL;
325        }
326
327        /* Make sure it is the appropriate type of device */
328        if (S_ISCHR(buf.st_mode)) {
329                /* Try opening the DAG device */
330                if((fd = dag_open(dev_name)) < 0) {
331                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
332                                        dev_name);
333                        return NULL;
334                }
335        } else {
336                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
337                                dev_name);
338                return NULL;
339        }
340
341        /* Add the device to our device list - it is just a doubly linked
342         * list with no inherent ordering; just tack the new one on the front
343         */
344        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
345        new_dev->fd = fd;
346        new_dev->dev_name = dev_name;
347        new_dev->ref_count = 1;
348
349        new_dev->prev = NULL;
350        new_dev->next = open_dags;
351        if (open_dags)
352                open_dags->prev = new_dev;
353
354        open_dags = new_dev;
355
356        return new_dev;
357}
358
359/* Opens a new DAG device for reading and adds it to the DAG device list
360 *
361 * NOTE: this function should only be called when opening a DAG device for
362 * reading - there is little practical difference between this and the
363 * function above that covers the writing case, but we need the input trace
364 * object to report errors properly so the two functions take slightly
365 * different arguments. This is really lame and there should be a much better
366 * way of doing this.
367 *
368 * NOTE: This function assumes the open_dag_mutex is held by the caller */
369static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
370        struct stat buf;
371        int fd;
372        struct dag_dev_t *new_dev;
373
374        /* Make sure the device exists */
375        if (stat(dev_name, &buf) == -1) {
376                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
377                return NULL;
378        }
379
380        /* Make sure it is the appropriate type of device */
381        if (S_ISCHR(buf.st_mode)) {
382                /* Try opening the DAG device */
383                if((fd = dag_open(dev_name)) < 0) {
384                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
385                                      dev_name);
386                        return NULL;
387                }
388        } else {
389                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
390                              dev_name);
391                return NULL;
392        }
393
394        /* Add the device to our device list - it is just a doubly linked
395         * list with no inherent ordering; just tack the new one on the front
396         */
397        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
398        new_dev->fd = fd;
399        new_dev->dev_name = dev_name;
400        new_dev->ref_count = 1;
401
402        new_dev->prev = NULL;
403        new_dev->next = open_dags;
404        if (open_dags)
405                open_dags->prev = new_dev;
406
407        open_dags = new_dev;
408
409        return new_dev;
410}
411
412/* Creates and initialises a DAG output trace */
413static int dag_init_output(libtrace_out_t *libtrace)
414{
415        /* Upon successful creation, the device name is stored against the
416         * device and free when it is free()d */
417        char *dag_dev_name = NULL;
418        char *scan = NULL;
419        struct dag_dev_t *dag_device = NULL;
420        int stream = 1;
421
422        /* XXX I don't know if this is important or not, but this function
423         * isn't present in all of the driver releases that this code is
424         * supposed to support! */
425        /*
426        unsigned long wake_time;
427        dagutil_sleep_get_wake_time(&wake_time,0);
428        */
429
430        dag_init_format_out_data(libtrace);
431        /* Grab the mutex while we're likely to be messing with the device
432         * list */
433        pthread_mutex_lock(&open_dag_mutex);
434
435        /* Specific streams are signified using a comma in the libtrace URI,
436         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
437         *
438         * If no stream is specified, we will write using stream 1 */
439        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
440                dag_dev_name = strdup(libtrace->uridata);
441        } else {
442                dag_dev_name = (char *)strndup(libtrace->uridata,
443                                (size_t)(scan - libtrace->uridata));
444                stream = atoi(++scan);
445        }
446        FORMAT_DATA_OUT->dagstream = stream;
447
448        /* See if our DAG device is already open */
449        dag_device = dag_find_open_device(dag_dev_name);
450
451        if (dag_device == NULL) {
452                /* Device not yet opened - open it ourselves */
453                dag_device = dag_open_output_device(libtrace, dag_dev_name);
454        } else {
455                /* Otherwise, just use the existing one */
456                free(dag_dev_name);
457                dag_dev_name = NULL;
458        }
459
460        /* Make sure we have successfully opened a DAG device */
461        if (dag_device == NULL) {
462                if (dag_dev_name) {
463                        free(dag_dev_name);
464                }
465                pthread_mutex_unlock(&open_dag_mutex);
466                return -1;
467        }
468
469        FORMAT_DATA_OUT->device = dag_device;
470        pthread_mutex_unlock(&open_dag_mutex);
471        return 0;
472}
473
474/* Creates and initialises a DAG input trace */
475static int dag_init_input(libtrace_t *libtrace) {
476        /* Upon successful creation, the device name is stored against the
477         * device and free when it is free()d */
478        char *dag_dev_name = NULL;
479        char *scan = NULL;
480        int stream = 0;
481        struct dag_dev_t *dag_device = NULL;
482
483        dag_init_format_data(libtrace);
484        /* Grab the mutex while we're likely to be messing with the device
485         * list */
486        pthread_mutex_lock(&open_dag_mutex);
487
488
489        /* DAG cards support multiple streams. In a single threaded capture,
490         * these are specified using a comma in the libtrace URI,
491         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
492         *
493         * If no stream is specified, we will read from stream 0 with
494         * one thread
495         */
496        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
497                dag_dev_name = strdup(libtrace->uridata);
498        } else {
499                dag_dev_name = (char *)strndup(libtrace->uridata,
500                                (size_t)(scan - libtrace->uridata));
501                stream = atoi(++scan);
502        }
503
504        FORMAT_DATA_FIRST->dagstream = stream;
505
506        /* See if our DAG device is already open */
507        dag_device = dag_find_open_device(dag_dev_name);
508
509        if (dag_device == NULL) {
510                /* Device not yet opened - open it ourselves */
511                dag_device = dag_open_device(libtrace, dag_dev_name);
512        } else {
513                /* Otherwise, just use the existing one */
514                free(dag_dev_name);
515                dag_dev_name = NULL;
516        }
517
518        /* Make sure we have successfully opened a DAG device */
519        if (dag_device == NULL) {
520                if (dag_dev_name)
521                        free(dag_dev_name);
522                dag_dev_name = NULL;
523                pthread_mutex_unlock(&open_dag_mutex);
524                return -1;
525        }
526
527        FORMAT_DATA->device = dag_device;
528
529        /* See Config_Status_API_Programming_Guide.pdf from the Endace
530           Dag Documentation */
531        /* Check kBooleanAttributeActive is true -- no point capturing
532         * on an interface that's disabled
533         *
534         * The symptom of the port being disabled is that libtrace
535         * will appear to hang. */
536        /* Check kBooleanAttributeFault is false */
537        /* Check kBooleanAttributeLocalFault is false */
538        /* Check kBooleanAttributeLock is true ? */
539        /* Check kBooleanAttributePeerLink ? */
540
541        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based
542           on libtrace promisc attribute?*/
543        /* Set kUint32AttributeSnapLength to the snaplength */
544
545        pthread_mutex_unlock(&open_dag_mutex);
546        return 0;
547}
548
#ifdef HAVE_DAG_CONFIG_API_H
/* Sets the snap length on the DAG card through the DAG config API.
 *
 * Variable length capture is switched on first, then the snap length
 * attribute is written. A negative slen is treated as 0.
 *
 * Always returns 0; the results of the config API calls are not checked. */
static int dag_csapi_set_snaplen(libtrace_t *libtrace, int slen) {
        dag_card_ref_t card = NULL;
        dag_component_t root_component = NULL;
        attr_uuid_t attribute = 0;
        uint32_t snaplen = (slen < 0) ? 0 : (uint32_t)slen;

        card = dag_config_init(FORMAT_DATA->device->dev_name);
        root_component = dag_config_get_root_component(card);

        /* Enable variable length records */
        attribute = dag_component_get_config_attribute_uuid(root_component,
                        kBooleanAttributeVarlen);
        dag_config_set_boolean_attribute(card, attribute, true);

        /* Apply the requested snap length */
        attribute = dag_component_get_config_attribute_uuid(root_component,
                        kUint32AttributeSnaplength);
        dag_config_set_uint32_attribute(card, attribute, snaplen);

        return 0;
}
#endif /* HAVE_DAG_CONFIG_API_H */
572
573/* Configures a DAG input trace */
574static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
575                            void *data)
576{
577        switch(option) {
578        case TRACE_OPTION_META_FREQ:
579                /* This option is used to specify the frequency of DUCK
580                 * updates */
581                DUCK.duck_freq = *(int *)data;
582                return 0;
583        case TRACE_OPTION_SNAPLEN:
584#ifdef HAVE_DAG_CONFIG_API_H
585                return dag_csapi_set_snaplen(libtrace, *(int *)data);
586#else
587                /* Tell the card our new snap length */
588        {
589                char conf_str[4096];
590                snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
591                if (dag_configure(FORMAT_DATA->device->fd,
592                                  conf_str) != 0) {
593                        trace_set_err(libtrace, errno, "Failed to configure "
594                                      "snaplen on DAG card: %s",
595                                      libtrace->uridata);
596                        return -1;
597                }
598        }
599#endif /* HAVE_DAG_CONFIG_API_H */
600
601                return 0;
602        case TRACE_OPTION_PROMISC:
603                /* DAG already operates in a promisc fashion */
604                return -1;
605        case TRACE_OPTION_FILTER:
606                /* We don't yet support pushing filters into DAG
607                 * cards */
608                return -1;
609        case TRACE_OPTION_EVENT_REALTIME:
610        case TRACE_OPTION_REPLAY_SPEEDUP:
611                /* Live capture is always going to be realtime */
612                return -1;
613        case TRACE_OPTION_HASHER:
614                /* Lets just say we did this, it's currently still up to
615                 * the user to configure this correctly. */
616                return 0;
617        }
618        return -1;
619}
620
621/* Starts a DAG output trace */
622static int dag_start_output(libtrace_out_t *libtrace)
623{
624        struct timeval zero, nopoll;
625
626        zero.tv_sec = 0;
627        zero.tv_usec = 0;
628        nopoll = zero;
629
630        /* Attach and start the DAG stream */
631        if (dag_attach_stream64(FORMAT_DATA_OUT->device->fd,
632                        FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) {
633                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
634                return -1;
635        }
636
637        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
638                        FORMAT_DATA_OUT->dagstream) < 0) {
639                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
640                return -1;
641        }
642        FORMAT_DATA_OUT->stream_attached = 1;
643
644        /* We don't want the dag card to do any sleeping */
645        dag_set_stream_poll64(FORMAT_DATA_OUT->device->fd,
646                        FORMAT_DATA_OUT->dagstream, 0, &zero,
647                        &nopoll);
648
649        return 0;
650}
651
652static int dag_start_input_stream(libtrace_t *libtrace,
653                                  struct dag_per_stream_t * stream) {
654        struct timeval zero, nopoll;
655        uint8_t *top, *bottom, *starttop;
656        top = bottom = NULL;
657
658        zero.tv_sec = 0;
659        zero.tv_usec = 10000;
660        nopoll = zero;
661
662        /* Attach and start the DAG stream */
663        if (dag_attach_stream64(FORMAT_DATA->device->fd,
664                              stream->dagstream, 0, 0) < 0) {
665                trace_set_err(libtrace, errno, "Cannot attach DAG stream #%u",
666                              stream->dagstream);
667                return -1;
668        }
669
670        if (dag_start_stream(FORMAT_DATA->device->fd,
671                             stream->dagstream) < 0) {
672                trace_set_err(libtrace, errno, "Cannot start DAG stream #%u",
673                              stream->dagstream);
674                return -1;
675        }
676        FORMAT_DATA->stream_attached = 1;
677
678        /* We don't want the dag card to do any sleeping */
679        if (dag_set_stream_poll64(FORMAT_DATA->device->fd,
680                            stream->dagstream, 0, &zero,
681                            &nopoll) < 0) {
682                trace_set_err(libtrace, errno,
683                              "dag_set_stream_poll failed!");
684                return -1;
685        }
686
687        starttop = dag_advance_stream(FORMAT_DATA->device->fd,
688                                      stream->dagstream,
689                                      &bottom);
690
691        /* Should probably flush the memory hole now */
692        top = starttop;
693        while (starttop - bottom > 0) {
694                bottom += (starttop - bottom);
695                top = dag_advance_stream(FORMAT_DATA->device->fd,
696                                         stream->dagstream,
697                                         &bottom);
698        }
699        stream->top = top;
700        stream->bottom = bottom;
701        stream->processed = 0;
702        stream->drops = 0;
703
704        return 0;
705
706}
707
708/* Starts a DAG input trace */
709static int dag_start_input(libtrace_t *libtrace)
710{
711        return dag_start_input_stream(libtrace, FORMAT_DATA_FIRST);
712}
713
714static int dag_pstart_input(libtrace_t *libtrace)
715{
716        char *scan, *tok;
717        uint16_t stream_count = 0, max_streams;
718        int iserror = 0;
719        struct dag_per_stream_t stream_data;
720
721        /* Check we aren't trying to create more threads than the DAG card can
722         * handle */
723        max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd);
724        if (libtrace->perpkt_thread_count > max_streams) {
725                fprintf(stderr,
726                              "WARNING: DAG has only %u streams available, "
727                              "capping total number of threads at this value.",
728                              max_streams);
729                libtrace->perpkt_thread_count = max_streams;
730        }
731
732        /* Get the stream names from the uri */
733        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
734                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
735                              "Format uri doesn't specify the DAG streams");
736                iserror = 1;
737                goto cleanup;
738        }
739
740        scan++;
741
742        tok = strtok(scan, ",");
743        while (tok != NULL) {
744                /* Ensure we haven't specified too many streams */
745                if (stream_count >= libtrace->perpkt_thread_count) {
746                        fprintf(stderr,
747                                      "WARNING: Format uri specifies too many "
748                                      "streams. Maximum is %u, so only using "
749                                      "the first %u from the uri.",
750                                      libtrace->perpkt_thread_count, 
751                                      libtrace->perpkt_thread_count);
752                        break;
753                }
754
755                /* Save the stream details */
756                if (stream_count == 0) {
757                        /* Special case where we update the existing stream
758                         * data structure */
759                        FORMAT_DATA_FIRST->dagstream = (uint16_t)atoi(tok);
760                } else {
761                        memset(&stream_data, 0, sizeof(stream_data));
762                        stream_data.dagstream = (uint16_t)atoi(tok);
763                        libtrace_list_push_back(FORMAT_DATA->per_stream,
764                                                &stream_data);
765                }
766
767                stream_count++;
768                tok = strtok(NULL, ",");
769        }
770
771        if (stream_count < libtrace->perpkt_thread_count) {
772                libtrace->perpkt_thread_count = stream_count;
773        }
774       
775        FORMAT_DATA->stream_attached = 1;
776
777 cleanup:
778        if (iserror) {
779                return -1;
780        } else {
781                return 0;
782        }
783}
784
785/* Pauses a DAG output trace */
786static int dag_pause_output(libtrace_out_t *libtrace)
787{
788        /* Stop and detach the stream */
789        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
790                            FORMAT_DATA_OUT->dagstream) < 0) {
791                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
792                return -1;
793        }
794        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
795                              FORMAT_DATA_OUT->dagstream) < 0) {
796                trace_set_err_out(libtrace, errno,
797                                  "Could not detach DAG stream");
798                return -1;
799        }
800        FORMAT_DATA_OUT->stream_attached = 0;
801        return 0;
802}
803
804/* Pauses a DAG input trace */
805static int dag_pause_input(libtrace_t *libtrace)
806{
807        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
808
809        /* Stop and detach each stream */
810        while (tmp != NULL) {
811                if (dag_stop_stream(FORMAT_DATA->device->fd,
812                                    STREAM_DATA(tmp)->dagstream) < 0) {
813                        trace_set_err(libtrace, errno,
814                                      "Could not stop DAG stream");
815                        return -1;
816                }
817                if (dag_detach_stream(FORMAT_DATA->device->fd,
818                                      STREAM_DATA(tmp)->dagstream) < 0) {
819                        trace_set_err(libtrace, errno,
820                                      "Could not detach DAG stream");
821                        return -1;
822                }
823
824                tmp = tmp->next;
825        }
826
827        FORMAT_DATA->stream_attached = 0;
828        return 0;
829}
830
831
832
833/* Closes a DAG input trace */
834static int dag_fin_input(libtrace_t *libtrace)
835{
836        /* Need the lock, since we're going to be handling the device list */
837        pthread_mutex_lock(&open_dag_mutex);
838
839        /* Detach the stream if we are not paused */
840        if (FORMAT_DATA->stream_attached)
841                dag_pause_input(libtrace);
842        FORMAT_DATA->device->ref_count--;
843
844        /* Close the DAG device if there are no more references to it */
845        if (FORMAT_DATA->device->ref_count == 0)
846                dag_close_device(FORMAT_DATA->device);
847
848        if (DUCK.dummy_duck)
849                trace_destroy_dead(DUCK.dummy_duck);
850
851        /* Clear the list */
852        libtrace_list_deinit(FORMAT_DATA->per_stream);
853        free(libtrace->format_data);
854        pthread_mutex_unlock(&open_dag_mutex);
855        return 0; /* success */
856}
857
858/* Closes a DAG output trace */
859static int dag_fin_output(libtrace_out_t *libtrace)
860{
861
862        /* Commit any outstanding traffic in the txbuffer */
863        if (FORMAT_DATA_OUT->waiting) {
864                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
865                                           FORMAT_DATA_OUT->dagstream,
866                                           FORMAT_DATA_OUT->waiting );
867        }
868
869        /* Wait until the buffer is nearly clear before exiting the program,
870         * as we will lose packets otherwise */
871        dag_tx_get_stream_space64
872                (FORMAT_DATA_OUT->device->fd,
873                 FORMAT_DATA_OUT->dagstream,
874                 dag_get_stream_buffer_size64(FORMAT_DATA_OUT->device->fd,
875                                            FORMAT_DATA_OUT->dagstream) - 8);
876
877        /* Need the lock, since we're going to be handling the device list */
878        pthread_mutex_lock(&open_dag_mutex);
879
880        /* Detach the stream if we are not paused */
881        if (FORMAT_DATA_OUT->stream_attached)
882                dag_pause_output(libtrace);
883        FORMAT_DATA_OUT->device->ref_count --;
884
885        /* Close the DAG device if there are no more references to it */
886        if (FORMAT_DATA_OUT->device->ref_count == 0)
887                dag_close_device(FORMAT_DATA_OUT->device);
888        free(libtrace->format_data);
889        pthread_mutex_unlock(&open_dag_mutex);
890        return 0; /* success */
891}
892
/* Pick the DUCK ioctl request and the matching libtrace DUCK record type
 * from whichever ioctl macro the installed DAG headers provide.
 * DAGIOC_CARD_DUCK is preferred over DAGIOCDUCK when both exist. If neither
 * is defined, LIBTRACE_DUCK_IOCTL and LIBTRACE_DUCK_VERSION are left
 * undefined and a compile-time warning is emitted. */
#if defined(DAGIOC_CARD_DUCK)
#define LIBTRACE_DUCK_IOCTL DAGIOC_CARD_DUCK
#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_5_0
#elif defined(DAGIOCDUCK)
#define LIBTRACE_DUCK_IOCTL DAGIOCDUCK
#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_2_5
#else
#warning "DAG appears to be missing DUCK support"
#endif
904
905/* Extracts DUCK information from the DAG card and produces a DUCK packet */
906static int dag_get_duckinfo(libtrace_t *libtrace,
907                                libtrace_packet_t *packet) {
908
909        if (DUCK.duck_freq == 0)
910                return 0;
911
912#ifndef LIBTRACE_DUCK_IOCTL
913        trace_set_err(libtrace, errno, 
914                "Requested DUCK information but unable to determine the correct ioctl for DUCK");
915        DUCK.duck_freq = 0;
916        return -1;
917#endif
918
919        if (DUCK.last_pkt - DUCK.last_duck < DUCK.duck_freq)
920                return 0;
921
922        /* Allocate memory for the DUCK data */
923        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
924            !packet->buffer) {
925                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
926                packet->buf_control = TRACE_CTRL_PACKET;
927                if (!packet->buffer) {
928                        trace_set_err(libtrace, errno,
929                                      "Cannot allocate packet buffer");
930                        return -1;
931                }
932        }
933
934        /* DUCK doesn't have a format header */
935        packet->header = 0;
936        packet->payload = packet->buffer;
937
938        /* No need to check if we can get DUCK or not - we're modern
939         * enough so just grab the DUCK info */
940        if ((ioctl(FORMAT_DATA->device->fd, LIBTRACE_DUCK_IOCTL,
941                   (duckinf_t *)packet->payload) < 0)) {
942                trace_set_err(libtrace, errno, "Error using DUCK ioctl");
943                DUCK.duck_freq = 0;
944                return -1;
945        }
946
947        packet->type = LIBTRACE_DUCK_VERSION;
948
949        /* Set the packet's trace to point at a DUCK trace, so that the
950         * DUCK format functions will be called on the packet rather than the
951         * DAG ones */
952        if (!DUCK.dummy_duck)
953                DUCK.dummy_duck = trace_create_dead("duck:dummy");
954        packet->trace = DUCK.dummy_duck;
955        DUCK.last_duck = DUCK.last_pkt;
956        packet->error = sizeof(duckinf_t);
957        return sizeof(duckinf_t);
958}
959
960/* Determines the amount of data available to read from the DAG card */
961static int dag_available(libtrace_t *libtrace,
962                         struct dag_per_stream_t *stream_data)
963{
964        uint32_t diff = stream_data->top - stream_data->bottom;
965
966        /* If we've processed more than 4MB of data since we last called
967         * dag_advance_stream, then we should call it again to allow the
968         * space occupied by that 4MB to be released */
969        if (diff >= dag_record_size && stream_data->processed < 4 * 1024 * 1024)
970                return diff;
971
972        /* Update the top and bottom pointers */
973        stream_data->top = dag_advance_stream(FORMAT_DATA->device->fd,
974                                              stream_data->dagstream,
975                                              &(stream_data->bottom));
976
977        if (stream_data->top == NULL) {
978                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
979                return -1;
980        }
981        stream_data->processed = 0;
982        diff = stream_data->top - stream_data->bottom;
983        return diff;
984}
985
986/* Returns a pointer to the start of the next complete ERF record */
987static dag_record_t *dag_get_record(struct dag_per_stream_t *stream_data)
988{
989        dag_record_t *erfptr = NULL;
990        uint16_t size;
991
992        erfptr = (dag_record_t *)stream_data->bottom;
993        if (!erfptr)
994                return NULL;
995
996        size = ntohs(erfptr->rlen);
997        assert( size >= dag_record_size );
998
999        /* Make certain we have the full packet available */
1000        if (size > (stream_data->top - stream_data->bottom))
1001                return NULL;
1002
1003        stream_data->bottom += size;
1004        stream_data->processed += size;
1005        return erfptr;
1006}
1007
1008/* Converts a buffer containing a recently read DAG packet record into a
1009 * libtrace packet */
1010static int dag_prepare_packet_stream(libtrace_t *libtrace,
1011                                     struct dag_per_stream_t *stream_data,
1012                                     libtrace_packet_t *packet,
1013                                     void *buffer, libtrace_rt_types_t rt_type,
1014                                     uint32_t flags)
1015{
1016        dag_record_t *erfptr;
1017
1018        /* If the packet previously owned a buffer that is not the buffer
1019         * that contains the new packet data, we're going to need to free the
1020         * old one to avoid memory leaks */
1021        if (packet->buffer != buffer &&
1022            packet->buf_control == TRACE_CTRL_PACKET) {
1023                free(packet->buffer);
1024        }
1025
1026        /* Set the buffer owner appropriately */
1027        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
1028                packet->buf_control = TRACE_CTRL_PACKET;
1029        } else
1030                packet->buf_control = TRACE_CTRL_EXTERNAL;
1031
1032        /* Update the packet pointers and type appropriately */
1033        erfptr = (dag_record_t *)buffer;
1034        packet->buffer = erfptr;
1035        packet->header = erfptr;
1036        packet->type = rt_type;
1037
1038        if (erfptr->flags.rxerror == 1) {
1039                /* rxerror means the payload is corrupt - drop the payload
1040                 * by tweaking rlen */
1041                packet->payload = NULL;
1042                erfptr->rlen = htons(erf_get_framing_length(packet));
1043        } else {
1044                packet->payload = (char*)packet->buffer
1045                        + erf_get_framing_length(packet);
1046        }
1047
1048        if (libtrace->format_data == NULL) {
1049                dag_init_format_data(libtrace);
1050        }
1051
1052        /* Update the dropped packets counter */
1053        /* No loss counter for DSM coloured records - have to use some
1054         * other API */
1055        if (erf_is_color_type(erfptr->type)) {
1056                /* TODO */
1057        } else {
1058                /* Use the ERF loss counter */
1059                if (stream_data->seeninterface[erfptr->flags.iface]
1060                    == 0) {
1061                        stream_data->seeninterface[erfptr->flags.iface]
1062                                = 1;
1063                } else {
1064                        stream_data->drops += ntohs(erfptr->lctr);
1065                }
1066        }
1067
1068        return 0;
1069}
1070
1071static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
1072                              void *buffer, libtrace_rt_types_t rt_type,
1073                              uint32_t flags)
1074{
1075        return dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
1076                                       buffer, rt_type, flags);
1077}
1078
1079/*
1080 * dag_write_packet() at this stage attempts to improve tx performance
1081 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
1082 * has been met. I observed approximately 270% performance increase
1083 * through this relatively naive tweak. No optimisation of buffer sizes
1084 * was attempted.
1085 */
1086
1087/* Pushes an ERF record onto the transmit stream */
1088static int dag_dump_packet(libtrace_out_t *libtrace,
1089                           dag_record_t *erfptr, unsigned int pad,
1090                           void *buffer)
1091{
1092        int size;
1093
1094        /*
1095         * If we've got 0 bytes waiting in the txqueue, assume that we
1096         * haven't requested any space yet, and request some, storing
1097         * the pointer at FORMAT_DATA_OUT->txbuffer.
1098         *
1099         * The amount to request is slightly magical at the moment - it's
1100         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
1101         * the buffer and handle overruns.
1102         */
1103        if (FORMAT_DATA_OUT->waiting == 0) {
1104                FORMAT_DATA_OUT->txbuffer =
1105                        dag_tx_get_stream_space64(FORMAT_DATA_OUT->device->fd,
1106                                                FORMAT_DATA_OUT->dagstream,
1107                                                16908288);
1108        }
1109
1110        /*
1111         * Copy the header separately to the body, as we can't guarantee they
1112         * are in contiguous memory
1113         */
1114        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, erfptr,
1115               (dag_record_size + pad));
1116        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
1117
1118        /*
1119         * Copy our incoming packet into the outgoing buffer, and increment
1120         * our waiting count
1121         */
1122        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
1123        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, buffer,
1124               size);
1125        FORMAT_DATA_OUT->waiting += size;
1126
1127        /*
1128         * If our output buffer has more than 16 Mebibytes in it, commit those
1129         * bytes and reset the waiting count to 0.
1130         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in
1131         * case there is still data in the buffer at program exit.
1132         */
1133        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
1134                FORMAT_DATA_OUT->txbuffer =
1135                        dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
1136                                                   FORMAT_DATA_OUT->dagstream,
1137                                                   FORMAT_DATA_OUT->waiting);
1138                FORMAT_DATA_OUT->waiting = 0;
1139        }
1140
1141        return size + pad + dag_record_size;
1142}
1143
1144/* Attempts to determine a suitable ERF type for a given packet. Returns true
1145 * if one is found, false otherwise */
1146static bool find_compatible_linktype(libtrace_out_t *libtrace,
1147                                     libtrace_packet_t *packet, char *type)
1148{
1149        /* Keep trying to simplify the packet until we can find
1150         * something we can do with it */
1151
1152        do {
1153                *type = libtrace_to_erf_type(trace_get_link_type(packet));
1154
1155                /* Success */
1156                if (*type != (char)-1)
1157                        return true;
1158
1159                if (!demote_packet(packet)) {
1160                        trace_set_err_out(libtrace,
1161                                          TRACE_ERR_NO_CONVERSION,
1162                                          "No erf type for packet (%i)",
1163                                          trace_get_link_type(packet));
1164                        return false;
1165                }
1166
1167        } while(1);
1168
1169        return true;
1170}
1171
1172/* Writes a packet to the provided DAG output trace */
1173static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet)
1174{
1175        /* This is heavily borrowed from erf_write_packet(). Yes, CnP
1176         * coding sucks, sorry about that.
1177         */
1178        unsigned int pad = 0;
1179        int numbytes;
1180        void *payload = packet->payload;
1181        dag_record_t *header = (dag_record_t *)packet->header;
1182        char erf_type = 0;
1183
1184        if(!packet->header) {
1185                /* No header, probably an RT packet. Lifted from
1186                 * erf_write_packet(). */
1187                return -1;
1188        }
1189
1190        if (trace_get_link_type(packet) == TRACE_TYPE_NONDATA)
1191                return 0;
1192
1193        pad = dag_get_padding(packet);
1194
1195        /*
1196         * If the payload is null, adjust the rlen. Discussion of this is
1197         * attached to erf_write_packet()
1198         */
1199        if (payload == NULL) {
1200                header->rlen = htons(dag_record_size + pad);
1201        }
1202
1203        if (packet->type == TRACE_RT_DATA_ERF) {
1204                numbytes = dag_dump_packet(libtrace, header, pad, payload);
1205        } else {
1206                /* Build up a new packet header from the existing header */
1207
1208                /* Simplify the packet first - if we can't do this, break
1209                 * early */
1210                if (!find_compatible_linktype(libtrace,packet,&erf_type))
1211                        return -1;
1212
1213                dag_record_t erfhdr;
1214
1215                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
1216                payload=packet->payload;
1217                pad = dag_get_padding(packet);
1218
1219                /* Flags. Can't do this */
1220                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
1221                if (trace_get_direction(packet)!=(int)~0U)
1222                        erfhdr.flags.iface = trace_get_direction(packet);
1223
1224                erfhdr.type = erf_type;
1225
1226                /* Packet length (rlen includes format overhead) */
1227                assert(trace_get_capture_length(packet) > 0
1228                       && trace_get_capture_length(packet) <= 65536);
1229                assert(erf_get_framing_length(packet) > 0
1230                       && trace_get_framing_length(packet) <= 65536);
1231                assert(trace_get_capture_length(packet) +
1232                       erf_get_framing_length(packet) > 0
1233                       && trace_get_capture_length(packet) +
1234                       erf_get_framing_length(packet) <= 65536);
1235
1236                erfhdr.rlen = htons(trace_get_capture_length(packet)
1237                                    + erf_get_framing_length(packet));
1238
1239
1240                /* Loss counter. Can't do this */
1241                erfhdr.lctr = 0;
1242                /* Wire length, does not include padding! */
1243                erfhdr.wlen = htons(trace_get_wire_length(packet));
1244
1245                /* Write it out */
1246                numbytes = dag_dump_packet(libtrace, &erfhdr, pad, payload);
1247        }
1248
1249        return numbytes;
1250}
1251
1252/* Reads the next available packet from a DAG card, in a BLOCKING fashion
1253 *
1254 * If DUCK reporting is enabled, the packet returned may be a DUCK update
1255 */
1256static int dag_read_packet_stream(libtrace_t *libtrace,
1257                                struct dag_per_stream_t *stream_data,
1258                                libtrace_thread_t *t, /* Optional */
1259                                libtrace_packet_t *packet)
1260{
1261        int size = 0;
1262        dag_record_t *erfptr = NULL;
1263        struct timeval tv;
1264        int numbytes = 0;
1265        uint32_t flags = 0;
1266        struct timeval maxwait, pollwait;
1267
1268        pollwait.tv_sec = 0;
1269        pollwait.tv_usec = 10000;
1270        maxwait.tv_sec = 0;
1271        maxwait.tv_usec = 250000;
1272
1273        /* Check if we're due for a DUCK report - only report on the first thread */
1274        if (stream_data == FORMAT_DATA_FIRST) {
1275                size = dag_get_duckinfo(libtrace, packet);
1276                if (size != 0)
1277                        return size;
1278        }
1279
1280
1281        /* Don't let anyone try to free our DAG memory hole! */
1282        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1283
1284        /* If the packet buffer is currently owned by libtrace, free it so
1285         * that we can set the packet to point into the DAG memory hole */
1286        if (packet->buf_control == TRACE_CTRL_PACKET) {
1287                free(packet->buffer);
1288                packet->buffer = 0;
1289        }
1290
1291        if (dag_set_stream_poll64(FORMAT_DATA->device->fd, stream_data->dagstream,
1292                                sizeof(dag_record_t), &maxwait,
1293                                &pollwait) == -1) {
1294                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1295                return -1;
1296        }
1297
1298        /* Grab a full ERF record */
1299        do {
1300                numbytes = dag_available(libtrace, stream_data);
1301                if (numbytes < 0)
1302                        return numbytes;
1303                if (numbytes < dag_record_size) {
1304                        /* Check the message queue if we have one to check */
1305                        if (t != NULL &&
1306                            libtrace_message_queue_count(&t->messages) > 0)
1307                                return -2;
1308
1309                        if ((numbytes=is_halted(libtrace)) != -1)
1310                                return numbytes;
1311                        /* Block until we see a packet */
1312                        continue;
1313                }
1314                erfptr = dag_get_record(stream_data);
1315        } while (erfptr == NULL);
1316
1317        packet->trace = libtrace;
1318
1319        /* Prepare the libtrace packet */
1320        if (dag_prepare_packet_stream(libtrace, stream_data, packet, erfptr,
1321                                    TRACE_RT_DATA_ERF, flags))
1322                return -1;
1323
1324        /* Update the DUCK timer - don't re-order this check (false-sharing) */
1325        if (stream_data == FORMAT_DATA_FIRST && DUCK.duck_freq != 0) {
1326                tv = trace_get_timeval(packet);
1327                DUCK.last_pkt = tv.tv_sec;
1328        }
1329
1330        packet->order = erf_get_erf_timestamp(packet);
1331        packet->error = packet->payload ? htons(erfptr->rlen) :
1332                                          erf_get_framing_length(packet);
1333
1334        return packet->error;
1335}
1336
/* Reads the next packet from a DAG input trace.
 *
 * Thin wrapper around dag_read_packet_stream() that always reads from the
 * first stream (FORMAT_DATA_FIRST) and passes NULL for the thread, so no
 * thread message queue is checked while waiting. Returns whatever
 * dag_read_packet_stream() returns. */
static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
{
        return dag_read_packet_stream(libtrace, FORMAT_DATA_FIRST, NULL, packet);
}
1341
1342static int dag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1343                             libtrace_packet_t **packets, size_t nb_packets)
1344{
1345        int ret;
1346        size_t read_packets = 0;
1347        int numbytes = 0;
1348
1349        struct dag_per_stream_t *stream_data =
1350                (struct dag_per_stream_t *)t->format_data;
1351
1352        /* Read as many packets as we can, but read atleast one packet */
1353        do {
1354                ret = dag_read_packet_stream(libtrace, stream_data, t,
1355                                           packets[read_packets]);
1356                if (ret < 0)
1357                        return ret;
1358
1359                read_packets++;
1360
1361                /* Make sure we don't read too many packets..! */
1362                if (read_packets >= nb_packets)
1363                        break;
1364
1365                numbytes = dag_available(libtrace, stream_data);
1366        } while (numbytes >= dag_record_size);
1367
1368        return read_packets;
1369}
1370
1371/* Attempts to read a packet from a DAG card in a NON-BLOCKING fashion. If a
1372 * packet is available, we will return a packet event. Otherwise we will
1373 * return a SLEEP event (as we cannot select on the DAG file descriptor).
1374 */
1375static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
1376                                           libtrace_packet_t *packet)
1377{
1378        libtrace_eventobj_t event = {0,0,0.0,0};
1379        dag_record_t *erfptr = NULL;
1380        int numbytes;
1381        uint32_t flags = 0;
1382        struct timeval minwait, tv;
1383       
1384        minwait.tv_sec = 0;
1385        minwait.tv_usec = 10000;
1386
1387        /* Check if we're meant to provide a DUCK update */
1388        numbytes = dag_get_duckinfo(libtrace, packet);
1389        if (numbytes < 0) {
1390                event.type = TRACE_EVENT_TERMINATE;
1391                return event;
1392        } else if (numbytes > 0) {
1393                event.type = TRACE_EVENT_PACKET;
1394                return event;
1395        }
1396       
1397        if (dag_set_stream_poll64(FORMAT_DATA->device->fd,
1398                                FORMAT_DATA_FIRST->dagstream, 0, &minwait,
1399                                &minwait) == -1) {
1400                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1401                event.type = TRACE_EVENT_TERMINATE;
1402                return event;
1403        }
1404
1405        do {
1406                erfptr = NULL;
1407                numbytes = 0;
1408
1409                /* Need to call dag_available so that the top pointer will get
1410                 * updated, otherwise we'll never see any data! */
1411                numbytes = dag_available(libtrace, FORMAT_DATA_FIRST);
1412
1413                /* May as well not bother calling dag_get_record if
1414                 * dag_available suggests that there's no data */
1415                if (numbytes != 0)
1416                        erfptr = dag_get_record(FORMAT_DATA_FIRST);
1417                if (erfptr == NULL) {
1418                        /* No packet available - sleep for a very short time */
1419                        if (libtrace_halt) {
1420                                event.type = TRACE_EVENT_TERMINATE;
1421                        } else {
1422                                event.type = TRACE_EVENT_SLEEP;
1423                                event.seconds = 0.0001;
1424                        }
1425                        break;
1426                }
1427                if (dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
1428                                            erfptr, TRACE_RT_DATA_ERF, flags)) {
1429                        event.type = TRACE_EVENT_TERMINATE;
1430                        break;
1431                }
1432
1433
1434                event.size = trace_get_capture_length(packet) +
1435                        trace_get_framing_length(packet);
1436
1437                /* XXX trace_read_packet() normally applies the following
1438                 * config options for us, but this function is called via
1439                 * trace_event() so we have to do it ourselves */
1440
1441                if (libtrace->filter) {
1442                        int filtret = trace_apply_filter(libtrace->filter,
1443                                                         packet);
1444                        if (filtret == -1) {
1445                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
1446                                              "Bad BPF Filter");
1447                                event.type = TRACE_EVENT_TERMINATE;
1448                                break;
1449                        }
1450
1451                        if (filtret == 0) {
1452                                /* This packet isn't useful so we want to
1453                                 * immediately see if there is another suitable
1454                                 * one - we definitely DO NOT want to return
1455                                 * a sleep event in this case, like we used to
1456                                 * do! */
1457                                libtrace->filtered_packets ++;
1458                                trace_clear_cache(packet);
1459                                continue;
1460                        }
1461
1462                        event.type = TRACE_EVENT_PACKET;
1463                } else {
1464                        event.type = TRACE_EVENT_PACKET;
1465                }
1466
1467                /* Update the DUCK timer */
1468                tv = trace_get_timeval(packet);
1469                DUCK.last_pkt = tv.tv_sec;
1470               
1471                if (libtrace->snaplen > 0) {
1472                        trace_set_capture_length(packet, libtrace->snaplen);
1473                }
1474                libtrace->accepted_packets ++;
1475                break;
1476        } while(1);
1477
1478        return event;
1479}
1480
1481static void dag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat)
1482{
1483        libtrace_list_node_t *tmp;
1484        assert(stat && libtrace);
1485        tmp = FORMAT_DATA_HEAD;
1486
1487        /* Dropped packets */
1488        stat->dropped_valid = 1;
1489        stat->dropped = 0;
1490        while (tmp != NULL) {
1491                stat->dropped += STREAM_DATA(tmp)->drops;
1492                tmp = tmp->next;
1493        }
1494
1495}
1496
1497static void dag_get_thread_statistics(libtrace_t *libtrace, libtrace_thread_t *t,
1498                                       libtrace_stat_t *stat) {
1499        struct dag_per_stream_t *stream_data = t->format_data;
1500        assert(stat && libtrace);
1501
1502        stat->dropped_valid = 1;
1503        stat->dropped = stream_data->drops;
1504
1505        stat->filtered_valid = 1;
1506        stat->filtered = 0;
1507}
1508
/* Prints some semi-useful help text about the DAG format module to
 * standard output */
static void dag_help(void) {
        /* One entry per output line, emitted in order */
        static const char *const help_text[] = {
                "dag format module: $Revision: 1755 $\n",
                "Supported input URIs:\n",
                "\tdag:/dev/dagn\n",
                "\n",
                "\te.g.: dag:/dev/dag0\n",
                "\n",
                "Supported output URIs:\n",
                "\tnone\n",
                "\n",
        };
        size_t i;

        for (i = 0; i < sizeof(help_text) / sizeof(help_text[0]); i++) {
                fputs(help_text[i], stdout);
        }
}
1521
1522static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1523                                bool reader)
1524{
1525        struct dag_per_stream_t *stream_data;
1526        libtrace_list_node_t *node;
1527
1528        if (reader && t->type == THREAD_PERPKT) {
1529                node = libtrace_list_get_index(FORMAT_DATA->per_stream,
1530                                                t->perpkt_num);
1531                if (node == NULL) {
1532                        return -1;
1533                }
1534                stream_data = node->data;
1535
1536                /* Pass the per thread data to the thread */
1537                t->format_data = stream_data;
1538
1539                /* Attach and start the DAG stream */
1540                if (dag_start_input_stream(libtrace, stream_data) < 0)
1541                        return -1;
1542        }
1543
1544        return 0;
1545}
1546
/* Format descriptor for the "dag" format, registered with libtrace by
 * dag_constructor(). The initialiser is positional, so the order of the
 * entries must match the member order of struct libtrace_format_t; a NULL
 * entry leaves that slot without a DAG-specific handler. Record accessors
 * (link type, direction, timestamp, lengths) are borrowed from the ERF
 * format via the erf_* functions. */
static struct libtrace_format_t dag = {
        "dag",
        "$Id$",
        TRACE_FORMAT_ERF,
        dag_probe_filename,             /* probe filename */
        NULL,                           /* probe magic */
        dag_init_input,                 /* init_input */
        dag_config_input,               /* config_input */
        dag_start_input,                /* start_input */
        dag_pause_input,                /* pause_input */
        dag_init_output,                /* init_output */
        NULL,                           /* config_output */
        dag_start_output,               /* start_output */
        dag_fin_input,                  /* fin_input */
        dag_fin_output,                 /* fin_output */
        dag_read_packet,                /* read_packet */
        dag_prepare_packet,             /* prepare_packet */
        NULL,                           /* fin_packet */
        dag_write_packet,               /* write_packet */
        NULL,                           /* flush_output */
        erf_get_link_type,              /* get_link_type */
        erf_get_direction,              /* get_direction */
        erf_set_direction,              /* set_direction */
        erf_get_erf_timestamp,          /* get_erf_timestamp */
        NULL,                           /* get_timeval */
        NULL,                           /* get_seconds */
        NULL,                           /* get_timespec */
        NULL,                           /* seek_erf */
        NULL,                           /* seek_timeval */
        NULL,                           /* seek_seconds */
        erf_get_capture_length,         /* get_capture_length */
        erf_get_wire_length,            /* get_wire_length */
        erf_get_framing_length,         /* get_framing_length */
        erf_set_capture_length,         /* set_capture_length */
        NULL,                           /* get_received_packets */
        NULL,                           /* get_filtered_packets */
        NULL,                           /* get_dropped_packets */
        dag_get_statistics,             /* get_statistics */
        NULL,                           /* get_fd */
        trace_event_dag,                /* trace_event */
        dag_help,                       /* help */
        NULL,                            /* next pointer */
        {true, 0}, /* live packet capture, thread limit TBD */
        /* The remaining entries are the parallel (per-thread) hooks.
         * NOTE(review): the slot names below are assumed from the handler
         * names and the usual libtrace_format_t layout - confirm against
         * the struct definition. */
        dag_pstart_input,               /* presumably pstart_input */
        dag_pread_packets,              /* presumably pread_packets */
        dag_pause_input,                /* presumably ppause_input; reuses the non-parallel pause */
        NULL,                           /* presumably pfin_input */
        dag_pregister_thread,           /* presumably pregister_thread */
        NULL,                           /* presumably punregister_thread */
        dag_get_thread_statistics       /* get thread stats */
};
1598
/* Registers the DAG format descriptor (`dag`) with libtrace by passing it
 * to register_format(). */
void dag_constructor(void)
{
        register_format(&dag);
}
Note: See TracBrowser for help on using the repository browser.