source: lib/format_dag25.c @ cc9c9de

cachetimestampsdevelopetsiliverc-4.0.4ringdecrementfixringperformance
Last change on this file since cc9c9de was cc9c9de, checked in by Shane Alcock <salcock@…>, 3 years ago

Add new config option for trace_event() -- REPLAY_SPEEDUP

This allows users to specify a "speedup factor" when using
trace_event() to replay trace files, i.e. all inter-packet
gaps will be divided by the speedup factor. This allows traces
to be replayed faster, while still preserving the same relative
gaps between packets.

  • Property mode set to 100644
File size: 45.3 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26#define _GNU_SOURCE
27
28#include "config.h"
29#include "common.h"
30#include "libtrace.h"
31#include "libtrace_int.h"
32#include "format_helper.h"
33#include "format_erf.h"
34
35#include <assert.h>
36#include <errno.h>
37#include <fcntl.h>
38#include <stdio.h>
39#include <string.h>
40#include <stdlib.h>
41#include <sys/stat.h>
42
43#include <sys/mman.h>
44/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
45 * Windows anyway so we'll worry about this more later :] */
46#include <pthread.h>
47
48
49#ifdef HAVE_DAG_CONFIG_API_H
50#include <dag_config_api.h>
51#endif
52
53#ifdef WIN32
54#  include <io.h>
55#  include <share.h>
56#  define PATH_MAX _MAX_PATH
57#  define snprintf sprintf_s
58#else
59#  include <netdb.h>
60#  ifndef PATH_MAX
61#       define PATH_MAX 4096
62#  endif
63#  include <sys/ioctl.h>
64#endif
65
66/* This format deals with DAG cards that are using drivers from the 2.5 version
67 * onwards, including 3.X.
68 *
69 * DAG is a LIVE capture format.
70 *
71 * This format does support writing, provided the DAG card that you are using
72 * has transmit (Tx) support. Additionally, packets read using this format
73 * are in the ERF format, so can easily be written as ERF traces without
74 * losing any data.
75 */
76
77#define DATA(x) ((struct dag_format_data_t *)x->format_data)
78#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
79#define STREAM_DATA(x) ((struct dag_per_stream_t *)x->data)
80
81#define FORMAT_DATA DATA(libtrace)
82#define FORMAT_DATA_OUT DATA_OUT(libtrace)
83
84#define DUCK FORMAT_DATA->duck
85
86#define FORMAT_DATA_HEAD FORMAT_DATA->per_stream->head
87#define FORMAT_DATA_FIRST ((struct dag_per_stream_t *)FORMAT_DATA_HEAD->data)
88
89static struct libtrace_format_t dag;
90
91/* A DAG device - a DAG device can support multiple streams (and therefore
92 * multiple input traces) so each trace needs to refer to a device */
93struct dag_dev_t {
94        char * dev_name;                /* Device name */
95        int fd;                         /* File descriptor */
96        uint16_t ref_count;             /* Number of input / output traces
97                                           that are using this device */
98        struct dag_dev_t *prev;         /* Pointer to the previous device in
99                                           the device list */
100        struct dag_dev_t *next;         /* Pointer to the next device in the
101                                           device list */
102};
103
104/* "Global" data that is stored for each DAG output trace */
105struct dag_format_data_out_t {
106        /* The DAG device being used for writing */
107        struct dag_dev_t *device;
108        /* The DAG stream that is being written on */
109        unsigned int dagstream;
110        /* Boolean flag indicating whether the stream is currently attached */
111        int stream_attached;
112        /* The amount of data waiting to be transmitted, in bytes */
113        uint64_t waiting;
114        /* A buffer to hold the data to be transmittted */
115        uint8_t *txbuffer;
116};
117
118/* Data that is stored against each input stream */
119struct dag_per_stream_t {
120        /* DAG stream number */
121        uint16_t dagstream;
122        /* Pointer to the last unread byte in the DAG memory */
123        uint8_t *top;
124        /* Pointer to the first unread byte in the DAG memory */
125        uint8_t *bottom;
126        /* Amount of data processed from the bottom pointer */
127        uint32_t processed;
128        /* Number of packets seen by the stream */
129        uint64_t pkt_count;
130        /* Drop count for this particular stream */
131        uint64_t drops;
132        /* Boolean values to indicate if a particular interface has been seen
133         * or not. This is limited to four interfaces, which is enough to
134         * support all current DAG cards */
135        uint8_t seeninterface[4];
136};
137
138/* "Global" data that is stored for each DAG input trace */
139struct dag_format_data_t {
140        /* DAG device */
141        struct dag_dev_t *device;
142        /* Boolean flag indicating whether the trace is currently attached */
143        int stream_attached;
144        /* Data stored against each DAG input stream */
145        libtrace_list_t *per_stream;
146
147        /* Data required for regular DUCK reporting.
148         * We put this on a new cache line otherwise we have a lot of false
149         * sharing caused by updating the last_pkt.
150         * This should only ever be accessed by the first thread stream,
151         * that includes both read and write operations.
152         */
153        struct {
154                /* Timestamp of the last DUCK report */
155                uint32_t last_duck;
156                /* The number of seconds between each DUCK report */
157                uint32_t duck_freq;
158                /* Timestamp of the last packet read from the DAG card */
159                uint32_t last_pkt;
160                /* Dummy trace to ensure DUCK packets are dealt with using the
161                 * DUCK format functions */
162                libtrace_t *dummy_duck;
163        } duck ALIGN_STRUCT(CACHE_LINE_SIZE);
164};
165
166/* To be thread-safe, we're going to need a mutex for operating on the list
167 * of DAG devices */
168pthread_mutex_t open_dag_mutex;
169
170/* The list of DAG devices that have been opened by libtrace.
171 *
172 * We can only open each DAG device once, but we might want to read from
173 * multiple streams. Therefore, we need to maintain a list of devices that we
174 * have opened (with ref counts!) so that we don't try to open a device too
175 * many times or close a device that we're still using */
176struct dag_dev_t *open_dags = NULL;
177
178/* Returns the amount of padding between the ERF header and the start of the
179 * captured packet data */
180static int dag_get_padding(const libtrace_packet_t *packet)
181{
182        /* ERF Ethernet records have a 2 byte padding before the packet itself
183         * so that the IP header is aligned on a 32 bit boundary.
184         */
185        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
186                dag_record_t *erfptr = (dag_record_t *)packet->header;
187                switch(erfptr->type) {
188                        case TYPE_ETH:
189                        case TYPE_COLOR_ETH:
190                        case TYPE_DSM_COLOR_ETH:
191                        case TYPE_COLOR_HASH_ETH:
192                                return 2;
193                        default:                return 0;
194                }
195        }
196        else {
197                switch(trace_get_link_type(packet)) {
198                        case TRACE_TYPE_ETH:    return 2;
199                        default:                return 0;
200                }
201        }
202}
203
204/* Attempts to determine if the given filename refers to a DAG device */
205static int dag_probe_filename(const char *filename)
206{
207        struct stat statbuf;
208        /* Can we stat the file? */
209        if (stat(filename, &statbuf) != 0) {
210                return 0;
211        }
212        /* Is it a character device? */
213        if (!S_ISCHR(statbuf.st_mode)) {
214                return 0;
215        }
216        /* Yeah, it's probably us. */
217        return 1;
218}
219
220/* Initialises the DAG output data structure */
221static void dag_init_format_out_data(libtrace_out_t *libtrace)
222{
223        libtrace->format_data = (struct dag_format_data_out_t *)
224                malloc(sizeof(struct dag_format_data_out_t));
225        // no DUCK on output
226        FORMAT_DATA_OUT->stream_attached = 0;
227        FORMAT_DATA_OUT->device = NULL;
228        FORMAT_DATA_OUT->dagstream = 0;
229        FORMAT_DATA_OUT->waiting = 0;
230
231}
232
233/* Initialises the DAG input data structure */
234static void dag_init_format_data(libtrace_t *libtrace)
235{
236        struct dag_per_stream_t stream_data;
237
238        libtrace->format_data = (struct dag_format_data_t *)
239                malloc(sizeof(struct dag_format_data_t));
240        DUCK.last_duck = 0;
241        DUCK.duck_freq = 0;
242        DUCK.last_pkt = 0;
243        DUCK.dummy_duck = NULL;
244
245        FORMAT_DATA->per_stream =
246                libtrace_list_init(sizeof(stream_data));
247        assert(FORMAT_DATA->per_stream != NULL);
248
249        /* We'll start with just one instance of stream_data, and we'll
250         * add more later if we need them */
251        memset(&stream_data, 0, sizeof(stream_data));
252        libtrace_list_push_back(FORMAT_DATA->per_stream, &stream_data);
253}
254
255/* Determines if there is already an entry for the given DAG device in the
256 * device list and increments the reference count for that device, if found.
257 *
258 * NOTE: This function assumes the open_dag_mutex is held by the caller */
259static struct dag_dev_t *dag_find_open_device(char *dev_name)
260{
261        struct dag_dev_t *dag_dev;
262
263        dag_dev = open_dags;
264
265        /* XXX: Not exactly zippy, but how often are we going to be dealing
266         * with multiple dag cards? */
267        while (dag_dev != NULL) {
268                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
269                        dag_dev->ref_count ++;
270                        return dag_dev;
271                }
272                dag_dev = dag_dev->next;
273        }
274        return NULL;
275}
276
277/* Closes a DAG device and removes it from the device list.
278 *
279 * Attempting to close a DAG device that has a non-zero reference count will
280 * cause an assertion failure!
281 *
282 * NOTE: This function assumes the open_dag_mutex is held by the caller */
283static void dag_close_device(struct dag_dev_t *dev)
284{
285        /* Need to remove from the device list */
286        assert(dev->ref_count == 0);
287
288        if (dev->prev == NULL) {
289                open_dags = dev->next;
290                if (dev->next)
291                        dev->next->prev = NULL;
292        } else {
293                dev->prev->next = dev->next;
294                if (dev->next)
295                        dev->next->prev = dev->prev;
296        }
297
298        dag_close(dev->fd);
299        free(dev);
300}
301
302
303/* Opens a new DAG device for writing and adds it to the DAG device list
304 *
305 * NOTE: this function should only be called when opening a DAG device for
306 * writing - there is little practical difference between this and the
307 * function below that covers the reading case, but we need the output trace
308 * object to report errors properly so the two functions take slightly
309 * different arguments. This is really lame and there should be a much better
310 * way of doing this.
311 *
312 * NOTE: This function assumes the open_dag_mutex is held by the caller
313 */
314static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace,
315                                                char *dev_name)
316{
317        struct stat buf;
318        int fd;
319        struct dag_dev_t *new_dev;
320
321        /* Make sure the device exists */
322        if (stat(dev_name, &buf) == -1) {
323                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
324                return NULL;
325        }
326
327        /* Make sure it is the appropriate type of device */
328        if (S_ISCHR(buf.st_mode)) {
329                /* Try opening the DAG device */
330                if((fd = dag_open(dev_name)) < 0) {
331                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
332                                        dev_name);
333                        return NULL;
334                }
335        } else {
336                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
337                                dev_name);
338                return NULL;
339        }
340
341        /* Add the device to our device list - it is just a doubly linked
342         * list with no inherent ordering; just tack the new one on the front
343         */
344        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
345        new_dev->fd = fd;
346        new_dev->dev_name = dev_name;
347        new_dev->ref_count = 1;
348
349        new_dev->prev = NULL;
350        new_dev->next = open_dags;
351        if (open_dags)
352                open_dags->prev = new_dev;
353
354        open_dags = new_dev;
355
356        return new_dev;
357}
358
359/* Opens a new DAG device for reading and adds it to the DAG device list
360 *
361 * NOTE: this function should only be called when opening a DAG device for
362 * reading - there is little practical difference between this and the
363 * function above that covers the writing case, but we need the input trace
364 * object to report errors properly so the two functions take slightly
365 * different arguments. This is really lame and there should be a much better
366 * way of doing this.
367 *
368 * NOTE: This function assumes the open_dag_mutex is held by the caller */
369static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
370        struct stat buf;
371        int fd;
372        struct dag_dev_t *new_dev;
373
374        /* Make sure the device exists */
375        if (stat(dev_name, &buf) == -1) {
376                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
377                return NULL;
378        }
379
380        /* Make sure it is the appropriate type of device */
381        if (S_ISCHR(buf.st_mode)) {
382                /* Try opening the DAG device */
383                if((fd = dag_open(dev_name)) < 0) {
384                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
385                                      dev_name);
386                        return NULL;
387                }
388        } else {
389                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
390                              dev_name);
391                return NULL;
392        }
393
394        /* Add the device to our device list - it is just a doubly linked
395         * list with no inherent ordering; just tack the new one on the front
396         */
397        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
398        new_dev->fd = fd;
399        new_dev->dev_name = dev_name;
400        new_dev->ref_count = 1;
401
402        new_dev->prev = NULL;
403        new_dev->next = open_dags;
404        if (open_dags)
405                open_dags->prev = new_dev;
406
407        open_dags = new_dev;
408
409        return new_dev;
410}
411
412/* Creates and initialises a DAG output trace */
413static int dag_init_output(libtrace_out_t *libtrace)
414{
415        /* Upon successful creation, the device name is stored against the
416         * device and free when it is free()d */
417        char *dag_dev_name = NULL;
418        char *scan = NULL;
419        struct dag_dev_t *dag_device = NULL;
420        int stream = 1;
421
422        /* XXX I don't know if this is important or not, but this function
423         * isn't present in all of the driver releases that this code is
424         * supposed to support! */
425        /*
426        unsigned long wake_time;
427        dagutil_sleep_get_wake_time(&wake_time,0);
428        */
429
430        dag_init_format_out_data(libtrace);
431        /* Grab the mutex while we're likely to be messing with the device
432         * list */
433        pthread_mutex_lock(&open_dag_mutex);
434
435        /* Specific streams are signified using a comma in the libtrace URI,
436         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
437         *
438         * If no stream is specified, we will write using stream 1 */
439        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
440                dag_dev_name = strdup(libtrace->uridata);
441        } else {
442                dag_dev_name = (char *)strndup(libtrace->uridata,
443                                (size_t)(scan - libtrace->uridata));
444                stream = atoi(++scan);
445        }
446        FORMAT_DATA_OUT->dagstream = stream;
447
448        /* See if our DAG device is already open */
449        dag_device = dag_find_open_device(dag_dev_name);
450
451        if (dag_device == NULL) {
452                /* Device not yet opened - open it ourselves */
453                dag_device = dag_open_output_device(libtrace, dag_dev_name);
454        } else {
455                /* Otherwise, just use the existing one */
456                free(dag_dev_name);
457                dag_dev_name = NULL;
458        }
459
460        /* Make sure we have successfully opened a DAG device */
461        if (dag_device == NULL) {
462                if (dag_dev_name) {
463                        free(dag_dev_name);
464                }
465                pthread_mutex_unlock(&open_dag_mutex);
466                return -1;
467        }
468
469        FORMAT_DATA_OUT->device = dag_device;
470        pthread_mutex_unlock(&open_dag_mutex);
471        return 0;
472}
473
474/* Creates and initialises a DAG input trace */
475static int dag_init_input(libtrace_t *libtrace) {
476        /* Upon successful creation, the device name is stored against the
477         * device and free when it is free()d */
478        char *dag_dev_name = NULL;
479        char *scan = NULL;
480        int stream = 0;
481        struct dag_dev_t *dag_device = NULL;
482
483        dag_init_format_data(libtrace);
484        /* Grab the mutex while we're likely to be messing with the device
485         * list */
486        pthread_mutex_lock(&open_dag_mutex);
487
488
489        /* DAG cards support multiple streams. In a single threaded capture,
490         * these are specified using a comma in the libtrace URI,
491         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
492         *
493         * If no stream is specified, we will read from stream 0 with
494         * one thread
495         */
496        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
497                dag_dev_name = strdup(libtrace->uridata);
498        } else {
499                dag_dev_name = (char *)strndup(libtrace->uridata,
500                                (size_t)(scan - libtrace->uridata));
501                stream = atoi(++scan);
502        }
503
504        FORMAT_DATA_FIRST->dagstream = stream;
505
506        /* See if our DAG device is already open */
507        dag_device = dag_find_open_device(dag_dev_name);
508
509        if (dag_device == NULL) {
510                /* Device not yet opened - open it ourselves */
511                dag_device = dag_open_device(libtrace, dag_dev_name);
512        } else {
513                /* Otherwise, just use the existing one */
514                free(dag_dev_name);
515                dag_dev_name = NULL;
516        }
517
518        /* Make sure we have successfully opened a DAG device */
519        if (dag_device == NULL) {
520                if (dag_dev_name)
521                        free(dag_dev_name);
522                dag_dev_name = NULL;
523                pthread_mutex_unlock(&open_dag_mutex);
524                return -1;
525        }
526
527        FORMAT_DATA->device = dag_device;
528
529        /* See Config_Status_API_Programming_Guide.pdf from the Endace
530           Dag Documentation */
531        /* Check kBooleanAttributeActive is true -- no point capturing
532         * on an interface that's disabled
533         *
534         * The symptom of the port being disabled is that libtrace
535         * will appear to hang. */
536        /* Check kBooleanAttributeFault is false */
537        /* Check kBooleanAttributeLocalFault is false */
538        /* Check kBooleanAttributeLock is true ? */
539        /* Check kBooleanAttributePeerLink ? */
540
541        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based
542           on libtrace promisc attribute?*/
543        /* Set kUint32AttributeSnapLength to the snaplength */
544
545        pthread_mutex_unlock(&open_dag_mutex);
546        return 0;
547}
548
549#ifdef HAVE_DAG_CONFIG_API_H
550static int dag_csapi_set_snaplen(libtrace_t *libtrace, int slen) {
551        dag_card_ref_t card_ref = NULL;
552        dag_component_t root = NULL;
553        attr_uuid_t uuid = 0;
554
555        if (slen < 0)
556                slen = 0; 
557
558        card_ref = dag_config_init(FORMAT_DATA->device->dev_name);
559        root = dag_config_get_root_component(card_ref);
560       
561        uuid = dag_component_get_config_attribute_uuid(root, kBooleanAttributeVarlen);
562        dag_config_set_boolean_attribute(card_ref, uuid, true);
563
564        uuid = dag_component_get_config_attribute_uuid(root, kUint32AttributeSnaplength);
565        dag_config_set_uint32_attribute(card_ref, uuid, (uint32_t)slen);
566
567        return 0;
568       
569
570} 
571#endif /* HAVE_DAG_CONFIG_API_H */
572
573/* Configures a DAG input trace */
574static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
575                            void *data)
576{
577        switch(option) {
578        case TRACE_OPTION_META_FREQ:
579                /* This option is used to specify the frequency of DUCK
580                 * updates */
581                DUCK.duck_freq = *(int *)data;
582                return 0;
583        case TRACE_OPTION_SNAPLEN:
584#ifdef HAVE_DAG_CONFIG_API_H
585                return dag_csapi_set_snaplen(libtrace, *(int *)data);
586#else
587                /* Tell the card our new snap length */
588        {
589                char conf_str[4096];
590                snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
591                if (dag_configure(FORMAT_DATA->device->fd,
592                                  conf_str) != 0) {
593                        trace_set_err(libtrace, errno, "Failed to configure "
594                                      "snaplen on DAG card: %s",
595                                      libtrace->uridata);
596                        return -1;
597                }
598        }
599#endif /* HAVE_DAG_CONFIG_API_H */
600
601                return 0;
602        case TRACE_OPTION_PROMISC:
603                /* DAG already operates in a promisc fashion */
604                return -1;
605        case TRACE_OPTION_FILTER:
606                /* We don't yet support pushing filters into DAG
607                 * cards */
608                return -1;
609        case TRACE_OPTION_EVENT_REALTIME:
610        case TRACE_OPTION_REPLAY_SPEEDUP:
611                /* Live capture is always going to be realtime */
612                return -1;
613        case TRACE_OPTION_HASHER:
614                /* Lets just say we did this, it's currently still up to
615                 * the user to configure this correctly. */
616                return 0;
617        }
618        return -1;
619}
620
621/* Starts a DAG output trace */
622static int dag_start_output(libtrace_out_t *libtrace)
623{
624        struct timeval zero, nopoll;
625
626        zero.tv_sec = 0;
627        zero.tv_usec = 0;
628        nopoll = zero;
629
630        /* Attach and start the DAG stream */
631        if (dag_attach_stream64(FORMAT_DATA_OUT->device->fd,
632                        FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) {
633                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
634                return -1;
635        }
636
637        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
638                        FORMAT_DATA_OUT->dagstream) < 0) {
639                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
640                return -1;
641        }
642        FORMAT_DATA_OUT->stream_attached = 1;
643
644        /* We don't want the dag card to do any sleeping */
645        dag_set_stream_poll64(FORMAT_DATA_OUT->device->fd,
646                        FORMAT_DATA_OUT->dagstream, 0, &zero,
647                        &nopoll);
648
649        return 0;
650}
651
652static int dag_start_input_stream(libtrace_t *libtrace,
653                                  struct dag_per_stream_t * stream) {
654        struct timeval zero, nopoll;
655        uint8_t *top, *bottom, *starttop;
656        top = bottom = NULL;
657
658        zero.tv_sec = 0;
659        zero.tv_usec = 10000;
660        nopoll = zero;
661
662        /* Attach and start the DAG stream */
663        if (dag_attach_stream64(FORMAT_DATA->device->fd,
664                              stream->dagstream, 0, 0) < 0) {
665                trace_set_err(libtrace, errno, "Cannot attach DAG stream #%u",
666                              stream->dagstream);
667                return -1;
668        }
669
670        if (dag_start_stream(FORMAT_DATA->device->fd,
671                             stream->dagstream) < 0) {
672                trace_set_err(libtrace, errno, "Cannot start DAG stream #%u",
673                              stream->dagstream);
674                return -1;
675        }
676        FORMAT_DATA->stream_attached = 1;
677
678        /* We don't want the dag card to do any sleeping */
679        if (dag_set_stream_poll64(FORMAT_DATA->device->fd,
680                            stream->dagstream, 0, &zero,
681                            &nopoll) < 0) {
682                trace_set_err(libtrace, errno,
683                              "dag_set_stream_poll failed!");
684                return -1;
685        }
686
687        starttop = dag_advance_stream(FORMAT_DATA->device->fd,
688                                      stream->dagstream,
689                                      &bottom);
690
691        /* Should probably flush the memory hole now */
692        top = starttop;
693        while (starttop - bottom > 0) {
694                bottom += (starttop - bottom);
695                top = dag_advance_stream(FORMAT_DATA->device->fd,
696                                         stream->dagstream,
697                                         &bottom);
698        }
699        stream->top = top;
700        stream->bottom = bottom;
701        stream->processed = 0;
702        stream->drops = 0;
703
704        return 0;
705
706}
707
708/* Starts a DAG input trace */
709static int dag_start_input(libtrace_t *libtrace)
710{
711        return dag_start_input_stream(libtrace, FORMAT_DATA_FIRST);
712}
713
714static int dag_pstart_input(libtrace_t *libtrace)
715{
716        char *scan, *tok;
717        uint16_t stream_count = 0, max_streams;
718        int iserror = 0;
719        struct dag_per_stream_t stream_data;
720
721        /* Check we aren't trying to create more threads than the DAG card can
722         * handle */
723        max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd);
724        if (libtrace->perpkt_thread_count > max_streams) {
725                fprintf(stderr,
726                              "WARNING: DAG has only %u streams available, "
727                              "capping total number of threads at this value.",
728                              max_streams);
729                libtrace->perpkt_thread_count = max_streams;
730        }
731
732        /* Get the stream names from the uri */
733        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
734                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
735                              "Format uri doesn't specify the DAG streams");
736                iserror = 1;
737                goto cleanup;
738        }
739
740        scan++;
741
742        tok = strtok(scan, ",");
743        while (tok != NULL) {
744                /* Ensure we haven't specified too many streams */
745                if (stream_count >= libtrace->perpkt_thread_count) {
746                        fprintf(stderr,
747                                      "WARNING: Format uri specifies too many "
748                                      "streams. Maximum is %u, so only using "
749                                      "the first %u from the uri.",
750                                      libtrace->perpkt_thread_count, 
751                                      libtrace->perpkt_thread_count);
752                        break;
753                }
754
755                /* Save the stream details */
756                if (stream_count == 0) {
757                        /* Special case where we update the existing stream
758                         * data structure */
759                        FORMAT_DATA_FIRST->dagstream = (uint16_t)atoi(tok);
760                } else {
761                        memset(&stream_data, 0, sizeof(stream_data));
762                        stream_data.dagstream = (uint16_t)atoi(tok);
763                        libtrace_list_push_back(FORMAT_DATA->per_stream,
764                                                &stream_data);
765                }
766
767                stream_count++;
768                tok = strtok(NULL, ",");
769        }
770
771        if (stream_count < libtrace->perpkt_thread_count) {
772                libtrace->perpkt_thread_count = stream_count;
773        }
774       
775        FORMAT_DATA->stream_attached = 1;
776
777 cleanup:
778        if (iserror) {
779                return -1;
780        } else {
781                return 0;
782        }
783}
784
785/* Pauses a DAG output trace */
786static int dag_pause_output(libtrace_out_t *libtrace)
787{
788        /* Stop and detach the stream */
789        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
790                            FORMAT_DATA_OUT->dagstream) < 0) {
791                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
792                return -1;
793        }
794        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
795                              FORMAT_DATA_OUT->dagstream) < 0) {
796                trace_set_err_out(libtrace, errno,
797                                  "Could not detach DAG stream");
798                return -1;
799        }
800        FORMAT_DATA_OUT->stream_attached = 0;
801        return 0;
802}
803
804/* Pauses a DAG input trace */
805static int dag_pause_input(libtrace_t *libtrace)
806{
807        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
808
809        /* Stop and detach each stream */
810        while (tmp != NULL) {
811                if (dag_stop_stream(FORMAT_DATA->device->fd,
812                                    STREAM_DATA(tmp)->dagstream) < 0) {
813                        trace_set_err(libtrace, errno,
814                                      "Could not stop DAG stream");
815                        return -1;
816                }
817                if (dag_detach_stream(FORMAT_DATA->device->fd,
818                                      STREAM_DATA(tmp)->dagstream) < 0) {
819                        trace_set_err(libtrace, errno,
820                                      "Could not detach DAG stream");
821                        return -1;
822                }
823
824                tmp = tmp->next;
825        }
826
827        FORMAT_DATA->stream_attached = 0;
828        return 0;
829}
830
831
832
833/* Closes a DAG input trace */
834static int dag_fin_input(libtrace_t *libtrace)
835{
836        /* Need the lock, since we're going to be handling the device list */
837        pthread_mutex_lock(&open_dag_mutex);
838
839        /* Detach the stream if we are not paused */
840        if (FORMAT_DATA->stream_attached)
841                dag_pause_input(libtrace);
842        FORMAT_DATA->device->ref_count--;
843
844        /* Close the DAG device if there are no more references to it */
845        if (FORMAT_DATA->device->ref_count == 0)
846                dag_close_device(FORMAT_DATA->device);
847
848        if (DUCK.dummy_duck)
849                trace_destroy_dead(DUCK.dummy_duck);
850
851        /* Clear the list */
852        libtrace_list_deinit(FORMAT_DATA->per_stream);
853        free(libtrace->format_data);
854        pthread_mutex_unlock(&open_dag_mutex);
855        return 0; /* success */
856}
857
858/* Closes a DAG output trace */
859static int dag_fin_output(libtrace_out_t *libtrace)
860{
861
862        /* Commit any outstanding traffic in the txbuffer */
863        if (FORMAT_DATA_OUT->waiting) {
864                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
865                                           FORMAT_DATA_OUT->dagstream,
866                                           FORMAT_DATA_OUT->waiting );
867        }
868
869        /* Wait until the buffer is nearly clear before exiting the program,
870         * as we will lose packets otherwise */
871        dag_tx_get_stream_space64
872                (FORMAT_DATA_OUT->device->fd,
873                 FORMAT_DATA_OUT->dagstream,
874                 dag_get_stream_buffer_size64(FORMAT_DATA_OUT->device->fd,
875                                            FORMAT_DATA_OUT->dagstream) - 8);
876
877        /* Need the lock, since we're going to be handling the device list */
878        pthread_mutex_lock(&open_dag_mutex);
879
880        /* Detach the stream if we are not paused */
881        if (FORMAT_DATA_OUT->stream_attached)
882                dag_pause_output(libtrace);
883        FORMAT_DATA_OUT->device->ref_count --;
884
885        /* Close the DAG device if there are no more references to it */
886        if (FORMAT_DATA_OUT->device->ref_count == 0)
887                dag_close_device(FORMAT_DATA_OUT->device);
888        free(libtrace->format_data);
889        pthread_mutex_unlock(&open_dag_mutex);
890        return 0; /* success */
891}
892
893#ifdef DAGIOC_CARD_DUCK
894#define LIBTRACE_DUCK_IOCTL DAGIOC_CARD_DUCK
895#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_5_0
896#else
897#ifdef DAGIOCDUCK
898#define LIBTRACE_DUCK_IOCTL DAGIOCDUCK
899#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_2_5
900#else
901#warning "DAG appears to be missing DUCK support"
902#endif
903#endif
904
905/* Extracts DUCK information from the DAG card and produces a DUCK packet */
906static int dag_get_duckinfo(libtrace_t *libtrace,
907                                libtrace_packet_t *packet) {
908
909        if (DUCK.duck_freq == 0)
910                return 0;
911
912#ifndef LIBTRACE_DUCK_IOCTL
913        trace_set_err(libtrace, errno, 
914                "Requested DUCK information but unable to determine the correct ioctl for DUCK");
915        DUCK.duck_freq = 0;
916        return -1;
917#endif
918
919        if (DUCK.last_pkt - DUCK.last_duck < DUCK.duck_freq)
920                return 0;
921
922        /* Allocate memory for the DUCK data */
923        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
924            !packet->buffer) {
925                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
926                packet->buf_control = TRACE_CTRL_PACKET;
927                if (!packet->buffer) {
928                        trace_set_err(libtrace, errno,
929                                      "Cannot allocate packet buffer");
930                        return -1;
931                }
932        }
933
934        /* DUCK doesn't have a format header */
935        packet->header = 0;
936        packet->payload = packet->buffer;
937
938        /* No need to check if we can get DUCK or not - we're modern
939         * enough so just grab the DUCK info */
940        if ((ioctl(FORMAT_DATA->device->fd, LIBTRACE_DUCK_IOCTL,
941                   (duckinf_t *)packet->payload) < 0)) {
942                trace_set_err(libtrace, errno, "Error using DUCK ioctl");
943                DUCK.duck_freq = 0;
944                return -1;
945        }
946
947        packet->type = LIBTRACE_DUCK_VERSION;
948
949        /* Set the packet's trace to point at a DUCK trace, so that the
950         * DUCK format functions will be called on the packet rather than the
951         * DAG ones */
952        if (!DUCK.dummy_duck)
953                DUCK.dummy_duck = trace_create_dead("duck:dummy");
954        packet->trace = DUCK.dummy_duck;
955        DUCK.last_duck = DUCK.last_pkt;
956        packet->error = sizeof(duckinf_t);
957        return sizeof(duckinf_t);
958}
959
960/* Determines the amount of data available to read from the DAG card */
961static int dag_available(libtrace_t *libtrace,
962                         struct dag_per_stream_t *stream_data)
963{
964        uint32_t diff = stream_data->top - stream_data->bottom;
965
966        /* If we've processed more than 4MB of data since we last called
967         * dag_advance_stream, then we should call it again to allow the
968         * space occupied by that 4MB to be released */
969        if (diff >= dag_record_size && stream_data->processed < 4 * 1024 * 1024)
970                return diff;
971
972        /* Update the top and bottom pointers */
973        stream_data->top = dag_advance_stream(FORMAT_DATA->device->fd,
974                                              stream_data->dagstream,
975                                              &(stream_data->bottom));
976
977        if (stream_data->top == NULL) {
978                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
979                return -1;
980        }
981        stream_data->processed = 0;
982        diff = stream_data->top - stream_data->bottom;
983        return diff;
984}
985
986/* Returns a pointer to the start of the next complete ERF record */
987static dag_record_t *dag_get_record(struct dag_per_stream_t *stream_data)
988{
989        dag_record_t *erfptr = NULL;
990        uint16_t size;
991
992        erfptr = (dag_record_t *)stream_data->bottom;
993        if (!erfptr)
994                return NULL;
995
996        size = ntohs(erfptr->rlen);
997        assert( size >= dag_record_size );
998
999        /* Make certain we have the full packet available */
1000        if (size > (stream_data->top - stream_data->bottom))
1001                return NULL;
1002
1003        stream_data->bottom += size;
1004        stream_data->processed += size;
1005        return erfptr;
1006}
1007
1008/* Converts a buffer containing a recently read DAG packet record into a
1009 * libtrace packet */
1010static int dag_prepare_packet_stream(libtrace_t *libtrace,
1011                                     struct dag_per_stream_t *stream_data,
1012                                     libtrace_packet_t *packet,
1013                                     void *buffer, libtrace_rt_types_t rt_type,
1014                                     uint32_t flags)
1015{
1016        dag_record_t *erfptr;
1017
1018        /* If the packet previously owned a buffer that is not the buffer
1019         * that contains the new packet data, we're going to need to free the
1020         * old one to avoid memory leaks */
1021        if (packet->buffer != buffer &&
1022            packet->buf_control == TRACE_CTRL_PACKET) {
1023                free(packet->buffer);
1024        }
1025
1026        /* Set the buffer owner appropriately */
1027        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
1028                packet->buf_control = TRACE_CTRL_PACKET;
1029        } else
1030                packet->buf_control = TRACE_CTRL_EXTERNAL;
1031
1032        /* Update the packet pointers and type appropriately */
1033        erfptr = (dag_record_t *)buffer;
1034        packet->buffer = erfptr;
1035        packet->header = erfptr;
1036        packet->type = rt_type;
1037
1038        if (erfptr->flags.rxerror == 1) {
1039                /* rxerror means the payload is corrupt - drop the payload
1040                 * by tweaking rlen */
1041                packet->payload = NULL;
1042                erfptr->rlen = htons(erf_get_framing_length(packet));
1043        } else {
1044                packet->payload = (char*)packet->buffer
1045                        + erf_get_framing_length(packet);
1046        }
1047
1048        if (libtrace->format_data == NULL) {
1049                dag_init_format_data(libtrace);
1050        }
1051
1052        /* Update the dropped packets counter */
1053        /* No loss counter for DSM coloured records - have to use some
1054         * other API */
1055        if (erf_is_color_type(erfptr->type)) {
1056                /* TODO */
1057        } else {
1058                /* Use the ERF loss counter */
1059                if (stream_data->seeninterface[erfptr->flags.iface]
1060                    == 0) {
1061                        stream_data->seeninterface[erfptr->flags.iface]
1062                                = 1;
1063                } else {
1064                        stream_data->drops += ntohs(erfptr->lctr);
1065                }
1066        }
1067
1068        return 0;
1069}
1070
1071static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
1072                              void *buffer, libtrace_rt_types_t rt_type,
1073                              uint32_t flags)
1074{
1075        return dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
1076                                       buffer, rt_type, flags);
1077}
1078
1079/*
1080 * dag_write_packet() at this stage attempts to improve tx performance
1081 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
1082 * has been met. I observed approximately 270% performance increase
1083 * through this relatively naive tweak. No optimisation of buffer sizes
1084 * was attempted.
1085 */
1086
1087/* Pushes an ERF record onto the transmit stream */
1088static int dag_dump_packet(libtrace_out_t *libtrace,
1089                           dag_record_t *erfptr, unsigned int pad,
1090                           void *buffer)
1091{
1092        int size;
1093
1094        /*
1095         * If we've got 0 bytes waiting in the txqueue, assume that we
1096         * haven't requested any space yet, and request some, storing
1097         * the pointer at FORMAT_DATA_OUT->txbuffer.
1098         *
1099         * The amount to request is slightly magical at the moment - it's
1100         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
1101         * the buffer and handle overruns.
1102         */
1103        if (FORMAT_DATA_OUT->waiting == 0) {
1104                FORMAT_DATA_OUT->txbuffer =
1105                        dag_tx_get_stream_space64(FORMAT_DATA_OUT->device->fd,
1106                                                FORMAT_DATA_OUT->dagstream,
1107                                                16908288);
1108        }
1109
1110        /*
1111         * Copy the header separately to the body, as we can't guarantee they
1112         * are in contiguous memory
1113         */
1114        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, erfptr,
1115               (dag_record_size + pad));
1116        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
1117
1118        /*
1119         * Copy our incoming packet into the outgoing buffer, and increment
1120         * our waiting count
1121         */
1122        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
1123        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, buffer,
1124               size);
1125        FORMAT_DATA_OUT->waiting += size;
1126
1127        /*
1128         * If our output buffer has more than 16 Mebibytes in it, commit those
1129         * bytes and reset the waiting count to 0.
1130         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in
1131         * case there is still data in the buffer at program exit.
1132         */
1133        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
1134                FORMAT_DATA_OUT->txbuffer =
1135                        dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
1136                                                   FORMAT_DATA_OUT->dagstream,
1137                                                   FORMAT_DATA_OUT->waiting);
1138                FORMAT_DATA_OUT->waiting = 0;
1139        }
1140
1141        return size + pad + dag_record_size;
1142}
1143
1144/* Attempts to determine a suitable ERF type for a given packet. Returns true
1145 * if one is found, false otherwise */
1146static bool find_compatible_linktype(libtrace_out_t *libtrace,
1147                                     libtrace_packet_t *packet, char *type)
1148{
1149        /* Keep trying to simplify the packet until we can find
1150         * something we can do with it */
1151
1152        do {
1153                *type = libtrace_to_erf_type(trace_get_link_type(packet));
1154
1155                /* Success */
1156                if (*type != (char)-1)
1157                        return true;
1158
1159                if (!demote_packet(packet)) {
1160                        trace_set_err_out(libtrace,
1161                                          TRACE_ERR_NO_CONVERSION,
1162                                          "No erf type for packet (%i)",
1163                                          trace_get_link_type(packet));
1164                        return false;
1165                }
1166
1167        } while(1);
1168
1169        return true;
1170}
1171
1172/* Writes a packet to the provided DAG output trace */
1173static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet)
1174{
1175        /* This is heavily borrowed from erf_write_packet(). Yes, CnP
1176         * coding sucks, sorry about that.
1177         */
1178        unsigned int pad = 0;
1179        int numbytes;
1180        void *payload = packet->payload;
1181        dag_record_t *header = (dag_record_t *)packet->header;
1182        char erf_type = 0;
1183
1184        if(!packet->header) {
1185                /* No header, probably an RT packet. Lifted from
1186                 * erf_write_packet(). */
1187                return -1;
1188        }
1189
1190        if (trace_get_link_type(packet) == TRACE_TYPE_NONDATA)
1191                return 0;
1192
1193        pad = dag_get_padding(packet);
1194
1195        /*
1196         * If the payload is null, adjust the rlen. Discussion of this is
1197         * attached to erf_write_packet()
1198         */
1199        if (payload == NULL) {
1200                header->rlen = htons(dag_record_size + pad);
1201        }
1202
1203        if (packet->type == TRACE_RT_DATA_ERF) {
1204                numbytes = dag_dump_packet(libtrace, header, pad, payload);
1205        } else {
1206                /* Build up a new packet header from the existing header */
1207
1208                /* Simplify the packet first - if we can't do this, break
1209                 * early */
1210                if (!find_compatible_linktype(libtrace,packet,&erf_type))
1211                        return -1;
1212
1213                dag_record_t erfhdr;
1214
1215                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
1216                payload=packet->payload;
1217                pad = dag_get_padding(packet);
1218
1219                /* Flags. Can't do this */
1220                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
1221                if (trace_get_direction(packet)!=(int)~0U)
1222                        erfhdr.flags.iface = trace_get_direction(packet);
1223
1224                erfhdr.type = erf_type;
1225
1226                /* Packet length (rlen includes format overhead) */
1227                assert(trace_get_capture_length(packet) > 0
1228                       && trace_get_capture_length(packet) <= 65536);
1229                assert(erf_get_framing_length(packet) > 0
1230                       && trace_get_framing_length(packet) <= 65536);
1231                assert(trace_get_capture_length(packet) +
1232                       erf_get_framing_length(packet) > 0
1233                       && trace_get_capture_length(packet) +
1234                       erf_get_framing_length(packet) <= 65536);
1235
1236                erfhdr.rlen = htons(trace_get_capture_length(packet)
1237                                    + erf_get_framing_length(packet));
1238
1239
1240                /* Loss counter. Can't do this */
1241                erfhdr.lctr = 0;
1242                /* Wire length, does not include padding! */
1243                erfhdr.wlen = htons(trace_get_wire_length(packet));
1244
1245                /* Write it out */
1246                numbytes = dag_dump_packet(libtrace, &erfhdr, pad, payload);
1247        }
1248
1249        return numbytes;
1250}
1251
1252/* Reads the next available packet from a DAG card, in a BLOCKING fashion
1253 *
1254 * If DUCK reporting is enabled, the packet returned may be a DUCK update
1255 */
1256static int dag_read_packet_stream(libtrace_t *libtrace,
1257                                struct dag_per_stream_t *stream_data,
1258                                libtrace_thread_t *t, /* Optional */
1259                                libtrace_packet_t *packet)
1260{
1261        int size = 0;
1262        dag_record_t *erfptr = NULL;
1263        struct timeval tv;
1264        int numbytes = 0;
1265        uint32_t flags = 0;
1266        struct timeval maxwait, pollwait;
1267
1268        pollwait.tv_sec = 0;
1269        pollwait.tv_usec = 10000;
1270        maxwait.tv_sec = 0;
1271        maxwait.tv_usec = 250000;
1272
1273        /* Check if we're due for a DUCK report - only report on the first thread */
1274        if (stream_data == FORMAT_DATA_FIRST) {
1275                size = dag_get_duckinfo(libtrace, packet);
1276                if (size != 0)
1277                        return size;
1278        }
1279
1280
1281        /* Don't let anyone try to free our DAG memory hole! */
1282        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1283
1284        /* If the packet buffer is currently owned by libtrace, free it so
1285         * that we can set the packet to point into the DAG memory hole */
1286        if (packet->buf_control == TRACE_CTRL_PACKET) {
1287                free(packet->buffer);
1288                packet->buffer = 0;
1289        }
1290
1291        if (dag_set_stream_poll64(FORMAT_DATA->device->fd, stream_data->dagstream,
1292                                sizeof(dag_record_t), &maxwait,
1293                                &pollwait) == -1) {
1294                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1295                return -1;
1296        }
1297
1298        /* Grab a full ERF record */
1299        do {
1300                numbytes = dag_available(libtrace, stream_data);
1301                if (numbytes < 0)
1302                        return numbytes;
1303                if (numbytes < dag_record_size) {
1304                        /* Check the message queue if we have one to check */
1305                        if (t != NULL &&
1306                            libtrace_message_queue_count(&t->messages) > 0)
1307                                return -2;
1308
1309                        if ((numbytes=is_halted(libtrace)) != -1)
1310                                return numbytes;
1311                        /* Block until we see a packet */
1312                        continue;
1313                }
1314                erfptr = dag_get_record(stream_data);
1315        } while (erfptr == NULL);
1316
1317        packet->trace = libtrace;
1318
1319        /* Prepare the libtrace packet */
1320        if (dag_prepare_packet_stream(libtrace, stream_data, packet, erfptr,
1321                                    TRACE_RT_DATA_ERF, flags))
1322                return -1;
1323
1324        /* Update the DUCK timer - don't re-order this check (false-sharing) */
1325        if (stream_data == FORMAT_DATA_FIRST && DUCK.duck_freq != 0) {
1326                tv = trace_get_timeval(packet);
1327                DUCK.last_pkt = tv.tv_sec;
1328        }
1329
1330        packet->order = erf_get_erf_timestamp(packet);
1331        packet->error = packet->payload ? htons(erfptr->rlen) :
1332                                          erf_get_framing_length(packet);
1333
1334        return packet->error;
1335}
1336
1337static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
1338{
1339        return dag_read_packet_stream(libtrace, FORMAT_DATA_FIRST, NULL, packet);
1340}
1341
1342static int dag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1343                             libtrace_packet_t **packets, size_t nb_packets)
1344{
1345        int ret;
1346        size_t read_packets = 0;
1347        int numbytes = 0;
1348
1349        struct dag_per_stream_t *stream_data =
1350                (struct dag_per_stream_t *)t->format_data;
1351
1352        /* Read as many packets as we can, but read atleast one packet */
1353        do {
1354                ret = dag_read_packet_stream(libtrace, stream_data, t,
1355                                           packets[read_packets]);
1356                if (ret < 0)
1357                        return ret;
1358
1359                read_packets++;
1360
1361                /* Make sure we don't read too many packets..! */
1362                if (read_packets >= nb_packets)
1363                        break;
1364
1365                numbytes = dag_available(libtrace, stream_data);
1366        } while (numbytes >= dag_record_size);
1367
1368        return read_packets;
1369}
1370
1371/* Attempts to read a packet from a DAG card in a NON-BLOCKING fashion. If a
1372 * packet is available, we will return a packet event. Otherwise we will
1373 * return a SLEEP event (as we cannot select on the DAG file descriptor).
1374 */
1375static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
1376                                           libtrace_packet_t *packet)
1377{
1378        libtrace_eventobj_t event = {0,0,0.0,0};
1379        dag_record_t *erfptr = NULL;
1380        int numbytes;
1381        uint32_t flags = 0;
1382        struct timeval minwait, tv;
1383       
1384        minwait.tv_sec = 0;
1385        minwait.tv_usec = 10000;
1386
1387        /* Check if we're meant to provide a DUCK update */
1388        numbytes = dag_get_duckinfo(libtrace, packet);
1389        if (numbytes < 0) {
1390                event.type = TRACE_EVENT_TERMINATE;
1391                return event;
1392        } else if (numbytes > 0) {
1393                event.type = TRACE_EVENT_PACKET;
1394                return event;
1395        }
1396       
1397        if (dag_set_stream_poll64(FORMAT_DATA->device->fd,
1398                                FORMAT_DATA_FIRST->dagstream, 0, &minwait,
1399                                &minwait) == -1) {
1400                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1401                event.type = TRACE_EVENT_TERMINATE;
1402                return event;
1403        }
1404
1405        do {
1406                erfptr = NULL;
1407                numbytes = 0;
1408
1409                /* Need to call dag_available so that the top pointer will get
1410                 * updated, otherwise we'll never see any data! */
1411                numbytes = dag_available(libtrace, FORMAT_DATA_FIRST);
1412
1413                /* May as well not bother calling dag_get_record if
1414                 * dag_available suggests that there's no data */
1415                if (numbytes != 0)
1416                        erfptr = dag_get_record(FORMAT_DATA_FIRST);
1417                if (erfptr == NULL) {
1418                        /* No packet available - sleep for a very short time */
1419                        if (libtrace_halt) {
1420                                event.type = TRACE_EVENT_TERMINATE;
1421                        } else {
1422                                event.type = TRACE_EVENT_SLEEP;
1423                                event.seconds = 0.0001;
1424                        }
1425                        break;
1426                }
1427                if (dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
1428                                            erfptr, TRACE_RT_DATA_ERF, flags)) {
1429                        event.type = TRACE_EVENT_TERMINATE;
1430                        break;
1431                }
1432
1433
1434                event.size = trace_get_capture_length(packet) +
1435                        trace_get_framing_length(packet);
1436
1437                /* XXX trace_read_packet() normally applies the following
1438                 * config options for us, but this function is called via
1439                 * trace_event() so we have to do it ourselves */
1440
1441                if (libtrace->filter) {
1442                        int filtret = trace_apply_filter(libtrace->filter,
1443                                                         packet);
1444                        if (filtret == -1) {
1445                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
1446                                              "Bad BPF Filter");
1447                                event.type = TRACE_EVENT_TERMINATE;
1448                                break;
1449                        }
1450
1451                        if (filtret == 0) {
1452                                /* This packet isn't useful so we want to
1453                                 * immediately see if there is another suitable
1454                                 * one - we definitely DO NOT want to return
1455                                 * a sleep event in this case, like we used to
1456                                 * do! */
1457                                libtrace->filtered_packets ++;
1458                                trace_clear_cache(packet);
1459                                continue;
1460                        }
1461
1462                        event.type = TRACE_EVENT_PACKET;
1463                } else {
1464                        event.type = TRACE_EVENT_PACKET;
1465                }
1466
1467                /* Update the DUCK timer */
1468                tv = trace_get_timeval(packet);
1469                DUCK.last_pkt = tv.tv_sec;
1470               
1471                if (libtrace->snaplen > 0) {
1472                        trace_set_capture_length(packet, libtrace->snaplen);
1473                }
1474                libtrace->accepted_packets ++;
1475                break;
1476        } while(1);
1477
1478        return event;
1479}
1480
1481static void dag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat)
1482{
1483        libtrace_list_node_t *tmp;
1484        assert(stat && libtrace);
1485        tmp = FORMAT_DATA_HEAD;
1486
1487        /* Dropped packets */
1488        stat->dropped_valid = 1;
1489        stat->dropped = 0;
1490        while (tmp != NULL) {
1491                stat->dropped += STREAM_DATA(tmp)->drops;
1492                tmp = tmp->next;
1493        }
1494
1495}
1496
1497static void dag_get_thread_statistics(libtrace_t *libtrace, libtrace_thread_t *t,
1498                                       libtrace_stat_t *stat) {
1499        struct dag_per_stream_t *stream_data = t->format_data;
1500        assert(stat && libtrace);
1501
1502        stat->dropped_valid = 1;
1503        stat->dropped = stream_data->drops;
1504
1505        stat->filtered_valid = 1;
1506        stat->filtered = 0;
1507}
1508
1509/* Prints some semi-useful help text about the DAG format module */
1510static void dag_help(void) {
1511        printf("dag format module: $Revision: 1755 $\n");
1512        printf("Supported input URIs:\n");
1513        printf("\tdag:/dev/dagn\n");
1514        printf("\n");
1515        printf("\te.g.: dag:/dev/dag0\n");
1516        printf("\n");
1517        printf("Supported output URIs:\n");
1518        printf("\tnone\n");
1519        printf("\n");
1520}
1521
1522static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1523                                bool reader)
1524{
1525        struct dag_per_stream_t *stream_data;
1526        libtrace_list_node_t *node;
1527
1528        if (reader && t->type == THREAD_PERPKT) {
1529                node = libtrace_list_get_index(FORMAT_DATA->per_stream,
1530                                                t->perpkt_num);
1531                if (node == NULL) {
1532                        return -1;
1533                }
1534                stream_data = node->data;
1535
1536                /* Pass the per thread data to the thread */
1537                t->format_data = stream_data;
1538
1539                /* Attach and start the DAG stream */
1540                if (dag_start_input_stream(libtrace, stream_data) < 0)
1541                        return -1;
1542        }
1543
1544        return 0;
1545}
1546
1547static struct libtrace_format_t dag = {
1548        "dag",
1549        "$Id$",
1550        TRACE_FORMAT_ERF,
1551        dag_probe_filename,             /* probe filename */
1552        NULL,                           /* probe magic */
1553        dag_init_input,                 /* init_input */
1554        dag_config_input,               /* config_input */
1555        dag_start_input,                /* start_input */
1556        dag_pause_input,                /* pause_input */
1557        dag_init_output,                /* init_output */
1558        NULL,                           /* config_output */
1559        dag_start_output,               /* start_output */
1560        dag_fin_input,                  /* fin_input */
1561        dag_fin_output,                 /* fin_output */
1562        dag_read_packet,                /* read_packet */
1563        dag_prepare_packet,             /* prepare_packet */
1564        NULL,                           /* fin_packet */
1565        dag_write_packet,               /* write_packet */
1566        erf_get_link_type,              /* get_link_type */
1567        erf_get_direction,              /* get_direction */
1568        erf_set_direction,              /* set_direction */
1569        erf_get_erf_timestamp,          /* get_erf_timestamp */
1570        NULL,                           /* get_timeval */
1571        NULL,                           /* get_seconds */
1572        NULL,                           /* get_timespec */
1573        NULL,                           /* seek_erf */
1574        NULL,                           /* seek_timeval */
1575        NULL,                           /* seek_seconds */
1576        erf_get_capture_length,         /* get_capture_length */
1577        erf_get_wire_length,            /* get_wire_length */
1578        erf_get_framing_length,         /* get_framing_length */
1579        erf_set_capture_length,         /* set_capture_length */
1580        NULL,                           /* get_received_packets */
1581        NULL,                           /* get_filtered_packets */
1582        NULL,                           /* get_dropped_packets */
1583        dag_get_statistics,             /* get_statistics */
1584        NULL,                           /* get_fd */
1585        trace_event_dag,                /* trace_event */
1586        dag_help,                       /* help */
1587        NULL,                            /* next pointer */
1588        {true, 0}, /* live packet capture, thread limit TBD */
1589        dag_pstart_input,
1590        dag_pread_packets,
1591        dag_pause_input,
1592        NULL,
1593        dag_pregister_thread,
1594        NULL,
1595        dag_get_thread_statistics       /* get thread stats */
1596};
1597
1598void dag_constructor(void)
1599{
1600        register_format(&dag);
1601}
Note: See TracBrowser for help on using the repository browser.