source: lib/format_dag25.c @ 733c8b4

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 733c8b4 was d8b05b7, checked in by Shane Alcock <salcock@…>, 6 years ago

Make sure our copyright covers recent years

Consistency across all of our source files is also nice.

  • Property mode set to 100644
File size: 45.3 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007-2015 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id$
31 *
32 */
33
34#define _GNU_SOURCE
35
36#include "config.h"
37#include "common.h"
38#include "libtrace.h"
39#include "libtrace_int.h"
40#include "format_helper.h"
41#include "format_erf.h"
42
43#include <assert.h>
44#include <errno.h>
45#include <fcntl.h>
46#include <stdio.h>
47#include <string.h>
48#include <stdlib.h>
49#include <sys/stat.h>
50
51#include <sys/mman.h>
52/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
53 * Windows anyway so we'll worry about this more later :] */
54#include <pthread.h>
55
56
57#ifdef HAVE_DAG_CONFIG_API_H
58#include <dag_config_api.h>
59#endif
60
61#ifdef WIN32
62#  include <io.h>
63#  include <share.h>
64#  define PATH_MAX _MAX_PATH
65#  define snprintf sprintf_s
66#else
67#  include <netdb.h>
68#  ifndef PATH_MAX
69#       define PATH_MAX 4096
70#  endif
71#  include <sys/ioctl.h>
72#endif
73
74/* This format deals with DAG cards that are using drivers from the 2.5 version
75 * onwards, including 3.X.
76 *
77 * DAG is a LIVE capture format.
78 *
79 * This format does support writing, provided the DAG card that you are using
80 * has transmit (Tx) support. Additionally, packets read using this format
81 * are in the ERF format, so can easily be written as ERF traces without
82 * losing any data.
83 */
84
/* Accessors for the format-specific data attached to a trace or list node.
 *
 * The macro argument is parenthesised so that any pointer-valued expression
 * (e.g. "p + 1") can be passed safely, not just a plain identifier. */
#define DATA(x) ((struct dag_format_data_t *)(x)->format_data)
#define DATA_OUT(x) ((struct dag_format_data_out_t *)(x)->format_data)
#define STREAM_DATA(x) ((struct dag_per_stream_t *)(x)->data)

/* Shorthand forms; these require a variable named "libtrace" in scope */
#define FORMAT_DATA DATA(libtrace)
#define FORMAT_DATA_OUT DATA_OUT(libtrace)

/* The DUCK reporting state of the current input trace */
#define DUCK (FORMAT_DATA->duck)

/* First node of the per-stream list, and the stream data stored in it */
#define FORMAT_DATA_HEAD (FORMAT_DATA->per_stream->head)
#define FORMAT_DATA_FIRST ((struct dag_per_stream_t *)FORMAT_DATA_HEAD->data)
96
97static struct libtrace_format_t dag;
98
/* An opened DAG device.
 *
 * A single device can serve several streams, and therefore several traces,
 * so each trace holds a pointer to a shared, reference-counted entry in a
 * doubly linked list of devices. */
struct dag_dev_t {
        /* Device name */
        char *dev_name;
        /* File descriptor for the opened device */
        int fd;
        /* Number of input / output traces currently using this device */
        uint16_t ref_count;
        /* Previous device in the device list (NULL at the front) */
        struct dag_dev_t *prev;
        /* Next device in the device list (NULL at the end) */
        struct dag_dev_t *next;
};
111
/* Per-trace state kept for every DAG output trace */
struct dag_format_data_out_t {
        /* DAG device used for writing */
        struct dag_dev_t *device;
        /* Number of the DAG stream being written to */
        unsigned int dagstream;
        /* Non-zero while the stream is attached */
        int stream_attached;
        /* Bytes queued for transmission but not yet committed */
        uint64_t waiting;
        /* Buffer holding the data to be transmitted */
        uint8_t *txbuffer;
};
125
/* State kept for each individual DAG input stream */
struct dag_per_stream_t {
        /* Number of the DAG stream */
        uint16_t dagstream;
        /* Pointer to the last unread byte in the DAG memory */
        uint8_t *top;
        /* Pointer to the first unread byte in the DAG memory */
        uint8_t *bottom;
        /* Amount of data processed from the bottom pointer */
        uint32_t processed;
        /* Number of packets seen on this stream */
        uint64_t pkt_count;
        /* Number of drops recorded for this stream */
        uint64_t drops;
        /* One flag per interface, recording whether that interface has been
         * seen yet. Four entries are enough for all current DAG cards. */
        uint8_t seeninterface[4];
};
145
146/* "Global" data that is stored for each DAG input trace */
147struct dag_format_data_t {
148        /* DAG device */
149        struct dag_dev_t *device;
150        /* Boolean flag indicating whether the trace is currently attached */
151        int stream_attached;
152        /* Data stored against each DAG input stream */
153        libtrace_list_t *per_stream;
154
155        /* Data required for regular DUCK reporting.
156         * We put this on a new cache line otherwise we have a lot of false
157         * sharing caused by updating the last_pkt.
158         * This should only ever be accessed by the first thread stream,
159         * that includes both read and write operations.
160         */
161        struct {
162                /* Timestamp of the last DUCK report */
163                uint32_t last_duck;
164                /* The number of seconds between each DUCK report */
165                uint32_t duck_freq;
166                /* Timestamp of the last packet read from the DAG card */
167                uint32_t last_pkt;
168                /* Dummy trace to ensure DUCK packets are dealt with using the
169                 * DUCK format functions */
170                libtrace_t *dummy_duck;
171        } duck ALIGN_STRUCT(CACHE_LINE_SIZE);
172};
173
174/* To be thread-safe, we're going to need a mutex for operating on the list
175 * of DAG devices */
176pthread_mutex_t open_dag_mutex;
177
178/* The list of DAG devices that have been opened by libtrace.
179 *
180 * We can only open each DAG device once, but we might want to read from
181 * multiple streams. Therefore, we need to maintain a list of devices that we
182 * have opened (with ref counts!) so that we don't try to open a device too
183 * many times or close a device that we're still using */
184struct dag_dev_t *open_dags = NULL;
185
186/* Returns the amount of padding between the ERF header and the start of the
187 * captured packet data */
188static int dag_get_padding(const libtrace_packet_t *packet)
189{
190        /* ERF Ethernet records have a 2 byte padding before the packet itself
191         * so that the IP header is aligned on a 32 bit boundary.
192         */
193        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
194                dag_record_t *erfptr = (dag_record_t *)packet->header;
195                switch(erfptr->type) {
196                        case TYPE_ETH:
197                        case TYPE_DSM_COLOR_ETH:
198                                return 2;
199                        default:                return 0;
200                }
201        }
202        else {
203                switch(trace_get_link_type(packet)) {
204                        case TRACE_TYPE_ETH:    return 2;
205                        default:                return 0;
206                }
207        }
208}
209
/* Guesses whether the given filename refers to a DAG device.
 *
 * Returns 1 if the path can be stat()ed and is a character device,
 * 0 otherwise. */
static int dag_probe_filename(const char *filename)
{
        struct stat info;

        /* Anything we cannot stat is not ours */
        if (stat(filename, &info) != 0)
                return 0;

        /* A character device is probably a DAG card */
        return S_ISCHR(info.st_mode) ? 1 : 0;
}
225
226/* Initialises the DAG output data structure */
227static void dag_init_format_out_data(libtrace_out_t *libtrace)
228{
229        libtrace->format_data = (struct dag_format_data_out_t *)
230                malloc(sizeof(struct dag_format_data_out_t));
231        // no DUCK on output
232        FORMAT_DATA_OUT->stream_attached = 0;
233        FORMAT_DATA_OUT->device = NULL;
234        FORMAT_DATA_OUT->dagstream = 0;
235        FORMAT_DATA_OUT->waiting = 0;
236
237}
238
239/* Initialises the DAG input data structure */
240static void dag_init_format_data(libtrace_t *libtrace)
241{
242        struct dag_per_stream_t stream_data;
243
244        libtrace->format_data = (struct dag_format_data_t *)
245                malloc(sizeof(struct dag_format_data_t));
246        DUCK.last_duck = 0;
247        DUCK.duck_freq = 0;
248        DUCK.last_pkt = 0;
249        DUCK.dummy_duck = NULL;
250
251        FORMAT_DATA->per_stream =
252                libtrace_list_init(sizeof(stream_data));
253        assert(FORMAT_DATA->per_stream != NULL);
254
255        /* We'll start with just one instance of stream_data, and we'll
256         * add more later if we need them */
257        memset(&stream_data, 0, sizeof(stream_data));
258        libtrace_list_push_back(FORMAT_DATA->per_stream, &stream_data);
259}
260
261/* Determines if there is already an entry for the given DAG device in the
262 * device list and increments the reference count for that device, if found.
263 *
264 * NOTE: This function assumes the open_dag_mutex is held by the caller */
265static struct dag_dev_t *dag_find_open_device(char *dev_name)
266{
267        struct dag_dev_t *dag_dev;
268
269        dag_dev = open_dags;
270
271        /* XXX: Not exactly zippy, but how often are we going to be dealing
272         * with multiple dag cards? */
273        while (dag_dev != NULL) {
274                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
275                        dag_dev->ref_count ++;
276                        return dag_dev;
277                }
278                dag_dev = dag_dev->next;
279        }
280        return NULL;
281}
282
283/* Closes a DAG device and removes it from the device list.
284 *
285 * Attempting to close a DAG device that has a non-zero reference count will
286 * cause an assertion failure!
287 *
288 * NOTE: This function assumes the open_dag_mutex is held by the caller */
289static void dag_close_device(struct dag_dev_t *dev)
290{
291        /* Need to remove from the device list */
292        assert(dev->ref_count == 0);
293
294        if (dev->prev == NULL) {
295                open_dags = dev->next;
296                if (dev->next)
297                        dev->next->prev = NULL;
298        } else {
299                dev->prev->next = dev->next;
300                if (dev->next)
301                        dev->next->prev = dev->prev;
302        }
303
304        dag_close(dev->fd);
305        free(dev);
306}
307
308
309/* Opens a new DAG device for writing and adds it to the DAG device list
310 *
311 * NOTE: this function should only be called when opening a DAG device for
312 * writing - there is little practical difference between this and the
313 * function below that covers the reading case, but we need the output trace
314 * object to report errors properly so the two functions take slightly
315 * different arguments. This is really lame and there should be a much better
316 * way of doing this.
317 *
318 * NOTE: This function assumes the open_dag_mutex is held by the caller
319 */
320static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace,
321                                                char *dev_name)
322{
323        struct stat buf;
324        int fd;
325        struct dag_dev_t *new_dev;
326
327        /* Make sure the device exists */
328        if (stat(dev_name, &buf) == -1) {
329                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
330                return NULL;
331        }
332
333        /* Make sure it is the appropriate type of device */
334        if (S_ISCHR(buf.st_mode)) {
335                /* Try opening the DAG device */
336                if((fd = dag_open(dev_name)) < 0) {
337                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
338                                        dev_name);
339                        return NULL;
340                }
341        } else {
342                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
343                                dev_name);
344                return NULL;
345        }
346
347        /* Add the device to our device list - it is just a doubly linked
348         * list with no inherent ordering; just tack the new one on the front
349         */
350        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
351        new_dev->fd = fd;
352        new_dev->dev_name = dev_name;
353        new_dev->ref_count = 1;
354
355        new_dev->prev = NULL;
356        new_dev->next = open_dags;
357        if (open_dags)
358                open_dags->prev = new_dev;
359
360        open_dags = new_dev;
361
362        return new_dev;
363}
364
365/* Opens a new DAG device for reading and adds it to the DAG device list
366 *
367 * NOTE: this function should only be called when opening a DAG device for
368 * reading - there is little practical difference between this and the
369 * function above that covers the writing case, but we need the input trace
370 * object to report errors properly so the two functions take slightly
371 * different arguments. This is really lame and there should be a much better
372 * way of doing this.
373 *
374 * NOTE: This function assumes the open_dag_mutex is held by the caller */
375static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
376        struct stat buf;
377        int fd;
378        struct dag_dev_t *new_dev;
379
380        /* Make sure the device exists */
381        if (stat(dev_name, &buf) == -1) {
382                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
383                return NULL;
384        }
385
386        /* Make sure it is the appropriate type of device */
387        if (S_ISCHR(buf.st_mode)) {
388                /* Try opening the DAG device */
389                if((fd = dag_open(dev_name)) < 0) {
390                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
391                                      dev_name);
392                        return NULL;
393                }
394        } else {
395                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
396                              dev_name);
397                return NULL;
398        }
399
400        /* Add the device to our device list - it is just a doubly linked
401         * list with no inherent ordering; just tack the new one on the front
402         */
403        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
404        new_dev->fd = fd;
405        new_dev->dev_name = dev_name;
406        new_dev->ref_count = 1;
407
408        new_dev->prev = NULL;
409        new_dev->next = open_dags;
410        if (open_dags)
411                open_dags->prev = new_dev;
412
413        open_dags = new_dev;
414
415        return new_dev;
416}
417
418/* Creates and initialises a DAG output trace */
419static int dag_init_output(libtrace_out_t *libtrace)
420{
421        /* Upon successful creation, the device name is stored against the
422         * device and free when it is free()d */
423        char *dag_dev_name = NULL;
424        char *scan = NULL;
425        struct dag_dev_t *dag_device = NULL;
426        int stream = 1;
427
428        /* XXX I don't know if this is important or not, but this function
429         * isn't present in all of the driver releases that this code is
430         * supposed to support! */
431        /*
432        unsigned long wake_time;
433        dagutil_sleep_get_wake_time(&wake_time,0);
434        */
435
436        dag_init_format_out_data(libtrace);
437        /* Grab the mutex while we're likely to be messing with the device
438         * list */
439        pthread_mutex_lock(&open_dag_mutex);
440
441        /* Specific streams are signified using a comma in the libtrace URI,
442         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
443         *
444         * If no stream is specified, we will write using stream 1 */
445        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
446                dag_dev_name = strdup(libtrace->uridata);
447        } else {
448                dag_dev_name = (char *)strndup(libtrace->uridata,
449                                (size_t)(scan - libtrace->uridata));
450                stream = atoi(++scan);
451        }
452        FORMAT_DATA_OUT->dagstream = stream;
453
454        /* See if our DAG device is already open */
455        dag_device = dag_find_open_device(dag_dev_name);
456
457        if (dag_device == NULL) {
458                /* Device not yet opened - open it ourselves */
459                dag_device = dag_open_output_device(libtrace, dag_dev_name);
460        } else {
461                /* Otherwise, just use the existing one */
462                free(dag_dev_name);
463                dag_dev_name = NULL;
464        }
465
466        /* Make sure we have successfully opened a DAG device */
467        if (dag_device == NULL) {
468                if (dag_dev_name) {
469                        free(dag_dev_name);
470                }
471                pthread_mutex_unlock(&open_dag_mutex);
472                return -1;
473        }
474
475        FORMAT_DATA_OUT->device = dag_device;
476        pthread_mutex_unlock(&open_dag_mutex);
477        return 0;
478}
479
480/* Creates and initialises a DAG input trace */
481static int dag_init_input(libtrace_t *libtrace) {
482        /* Upon successful creation, the device name is stored against the
483         * device and free when it is free()d */
484        char *dag_dev_name = NULL;
485        char *scan = NULL;
486        int stream = 0;
487        struct dag_dev_t *dag_device = NULL;
488
489        dag_init_format_data(libtrace);
490        /* Grab the mutex while we're likely to be messing with the device
491         * list */
492        pthread_mutex_lock(&open_dag_mutex);
493
494
495        /* DAG cards support multiple streams. In a single threaded capture,
496         * these are specified using a comma in the libtrace URI,
497         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
498         *
499         * If no stream is specified, we will read from stream 0 with
500         * one thread
501         */
502        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
503                dag_dev_name = strdup(libtrace->uridata);
504        } else {
505                dag_dev_name = (char *)strndup(libtrace->uridata,
506                                (size_t)(scan - libtrace->uridata));
507                stream = atoi(++scan);
508        }
509
510        FORMAT_DATA_FIRST->dagstream = stream;
511
512        /* See if our DAG device is already open */
513        dag_device = dag_find_open_device(dag_dev_name);
514
515        if (dag_device == NULL) {
516                /* Device not yet opened - open it ourselves */
517                dag_device = dag_open_device(libtrace, dag_dev_name);
518        } else {
519                /* Otherwise, just use the existing one */
520                free(dag_dev_name);
521                dag_dev_name = NULL;
522        }
523
524        /* Make sure we have successfully opened a DAG device */
525        if (dag_device == NULL) {
526                if (dag_dev_name)
527                        free(dag_dev_name);
528                dag_dev_name = NULL;
529                pthread_mutex_unlock(&open_dag_mutex);
530                return -1;
531        }
532
533        FORMAT_DATA->device = dag_device;
534
535        /* See Config_Status_API_Programming_Guide.pdf from the Endace
536           Dag Documentation */
537        /* Check kBooleanAttributeActive is true -- no point capturing
538         * on an interface that's disabled
539         *
540         * The symptom of the port being disabled is that libtrace
541         * will appear to hang. */
542        /* Check kBooleanAttributeFault is false */
543        /* Check kBooleanAttributeLocalFault is false */
544        /* Check kBooleanAttributeLock is true ? */
545        /* Check kBooleanAttributePeerLink ? */
546
547        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based
548           on libtrace promisc attribute?*/
549        /* Set kUint32AttributeSnapLength to the snaplength */
550
551        pthread_mutex_unlock(&open_dag_mutex);
552        return 0;
553}
554
555#ifdef HAVE_DAG_CONFIG_API_H
556static int dag_csapi_set_snaplen(libtrace_t *libtrace, int slen) {
557        dag_card_ref_t card_ref = NULL;
558        dag_component_t root = NULL;
559        attr_uuid_t uuid = 0;
560
561        if (slen < 0)
562                slen = 0; 
563
564        card_ref = dag_config_init(FORMAT_DATA->device->dev_name);
565        root = dag_config_get_root_component(card_ref);
566       
567        uuid = dag_component_get_config_attribute_uuid(root, kBooleanAttributeVarlen);
568        dag_config_set_boolean_attribute(card_ref, uuid, true);
569
570        uuid = dag_component_get_config_attribute_uuid(root, kUint32AttributeSnaplength);
571        dag_config_set_uint32_attribute(card_ref, uuid, (uint32_t)slen);
572
573        return 0;
574       
575
576} 
577#endif /* HAVE_DAG_CONFIG_API_H */
578
579/* Configures a DAG input trace */
580static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
581                            void *data)
582{
583        switch(option) {
584        case TRACE_OPTION_META_FREQ:
585                /* This option is used to specify the frequency of DUCK
586                 * updates */
587                DUCK.duck_freq = *(int *)data;
588                return 0;
589        case TRACE_OPTION_SNAPLEN:
590#ifdef HAVE_DAG_CONFIG_API_H
591                return dag_csapi_set_snaplen(libtrace, *(int *)data);
592#else
593                /* Tell the card our new snap length */
594        {
595                char conf_str[4096];
596                snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
597                if (dag_configure(FORMAT_DATA->device->fd,
598                                  conf_str) != 0) {
599                        trace_set_err(libtrace, errno, "Failed to configure "
600                                      "snaplen on DAG card: %s",
601                                      libtrace->uridata);
602                        return -1;
603                }
604        }
605#endif /* HAVE_DAG_CONFIG_API_H */
606
607                return 0;
608        case TRACE_OPTION_PROMISC:
609                /* DAG already operates in a promisc fashion */
610                return -1;
611        case TRACE_OPTION_FILTER:
612                /* We don't yet support pushing filters into DAG
613                 * cards */
614                return -1;
615        case TRACE_OPTION_EVENT_REALTIME:
616                /* Live capture is always going to be realtime */
617                return -1;
618        case TRACE_OPTION_HASHER:
619                /* Lets just say we did this, it's currently still up to
620                 * the user to configure this correctly. */
621                return 0;
622        }
623        return -1;
624}
625
626/* Starts a DAG output trace */
627static int dag_start_output(libtrace_out_t *libtrace)
628{
629        struct timeval zero, nopoll;
630
631        zero.tv_sec = 0;
632        zero.tv_usec = 0;
633        nopoll = zero;
634
635        /* Attach and start the DAG stream */
636        if (dag_attach_stream(FORMAT_DATA_OUT->device->fd,
637                        FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) {
638                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
639                return -1;
640        }
641
642        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
643                        FORMAT_DATA_OUT->dagstream) < 0) {
644                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
645                return -1;
646        }
647        FORMAT_DATA_OUT->stream_attached = 1;
648
649        /* We don't want the dag card to do any sleeping */
650        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
651                        FORMAT_DATA_OUT->dagstream, 0, &zero,
652                        &nopoll);
653
654        return 0;
655}
656
657static int dag_start_input_stream(libtrace_t *libtrace,
658                                  struct dag_per_stream_t * stream) {
659        struct timeval zero, nopoll;
660        uint8_t *top, *bottom, *starttop;
661        top = bottom = NULL;
662
663        zero.tv_sec = 0;
664        zero.tv_usec = 10000;
665        nopoll = zero;
666
667        /* Attach and start the DAG stream */
668        if (dag_attach_stream(FORMAT_DATA->device->fd,
669                              stream->dagstream, 0, 0) < 0) {
670                trace_set_err(libtrace, errno, "Cannot attach DAG stream #%u",
671                              stream->dagstream);
672                return -1;
673        }
674
675        if (dag_start_stream(FORMAT_DATA->device->fd,
676                             stream->dagstream) < 0) {
677                trace_set_err(libtrace, errno, "Cannot start DAG stream #%u",
678                              stream->dagstream);
679                return -1;
680        }
681        FORMAT_DATA->stream_attached = 1;
682
683        /* We don't want the dag card to do any sleeping */
684        if (dag_set_stream_poll(FORMAT_DATA->device->fd,
685                            stream->dagstream, 0, &zero,
686                            &nopoll) < 0) {
687                trace_set_err(libtrace, errno,
688                              "dag_set_stream_poll failed!");
689                return -1;
690        }
691
692        starttop = dag_advance_stream(FORMAT_DATA->device->fd,
693                                      stream->dagstream,
694                                      &bottom);
695
696        /* Should probably flush the memory hole now */
697        top = starttop;
698        while (starttop - bottom > 0) {
699                bottom += (starttop - bottom);
700                top = dag_advance_stream(FORMAT_DATA->device->fd,
701                                         stream->dagstream,
702                                         &bottom);
703        }
704        stream->top = top;
705        stream->bottom = bottom;
706        stream->processed = 0;
707        stream->drops = 0;
708
709        return 0;
710
711}
712
713/* Starts a DAG input trace */
714static int dag_start_input(libtrace_t *libtrace)
715{
716        return dag_start_input_stream(libtrace, FORMAT_DATA_FIRST);
717}
718
719static int dag_pstart_input(libtrace_t *libtrace)
720{
721        char *scan, *tok;
722        uint16_t stream_count = 0, max_streams;
723        int iserror = 0;
724        struct dag_per_stream_t stream_data;
725
726        /* Check we aren't trying to create more threads than the DAG card can
727         * handle */
728        max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd);
729        if (libtrace->perpkt_thread_count > max_streams) {
730                fprintf(stderr,
731                              "WARNING: DAG has only %u streams available, "
732                              "capping total number of threads at this value.",
733                              max_streams);
734                libtrace->perpkt_thread_count = max_streams;
735        }
736
737        /* Get the stream names from the uri */
738        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
739                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
740                              "Format uri doesn't specify the DAG streams");
741                iserror = 1;
742                goto cleanup;
743        }
744
745        scan++;
746
747        tok = strtok(scan, ",");
748        while (tok != NULL) {
749                /* Ensure we haven't specified too many streams */
750                if (stream_count >= libtrace->perpkt_thread_count) {
751                        fprintf(stderr,
752                                      "WARNING: Format uri specifies too many "
753                                      "streams. Maximum is %u, so only using "
754                                      "the first %u from the uri.",
755                                      libtrace->perpkt_thread_count, 
756                                      libtrace->perpkt_thread_count);
757                        break;
758                }
759
760                /* Save the stream details */
761                if (stream_count == 0) {
762                        /* Special case where we update the existing stream
763                         * data structure */
764                        FORMAT_DATA_FIRST->dagstream = (uint16_t)atoi(tok);
765                } else {
766                        memset(&stream_data, 0, sizeof(stream_data));
767                        stream_data.dagstream = (uint16_t)atoi(tok);
768                        libtrace_list_push_back(FORMAT_DATA->per_stream,
769                                                &stream_data);
770                }
771
772                stream_count++;
773                tok = strtok(NULL, ",");
774        }
775
776        if (stream_count < libtrace->perpkt_thread_count) {
777                libtrace->perpkt_thread_count = stream_count;
778        }
779       
780        FORMAT_DATA->stream_attached = 1;
781
782 cleanup:
783        if (iserror) {
784                return -1;
785        } else {
786                return 0;
787        }
788}
789
790/* Pauses a DAG output trace */
791static int dag_pause_output(libtrace_out_t *libtrace)
792{
793        /* Stop and detach the stream */
794        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
795                            FORMAT_DATA_OUT->dagstream) < 0) {
796                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
797                return -1;
798        }
799        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
800                              FORMAT_DATA_OUT->dagstream) < 0) {
801                trace_set_err_out(libtrace, errno,
802                                  "Could not detach DAG stream");
803                return -1;
804        }
805        FORMAT_DATA_OUT->stream_attached = 0;
806        return 0;
807}
808
809/* Pauses a DAG input trace */
810static int dag_pause_input(libtrace_t *libtrace)
811{
812        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
813
814        /* Stop and detach each stream */
815        while (tmp != NULL) {
816                if (dag_stop_stream(FORMAT_DATA->device->fd,
817                                    STREAM_DATA(tmp)->dagstream) < 0) {
818                        trace_set_err(libtrace, errno,
819                                      "Could not stop DAG stream");
820                        return -1;
821                }
822                if (dag_detach_stream(FORMAT_DATA->device->fd,
823                                      STREAM_DATA(tmp)->dagstream) < 0) {
824                        trace_set_err(libtrace, errno,
825                                      "Could not detach DAG stream");
826                        return -1;
827                }
828
829                tmp = tmp->next;
830        }
831
832        FORMAT_DATA->stream_attached = 0;
833        return 0;
834}
835
836
837
838/* Closes a DAG input trace */
839static int dag_fin_input(libtrace_t *libtrace)
840{
841        /* Need the lock, since we're going to be handling the device list */
842        pthread_mutex_lock(&open_dag_mutex);
843
844        /* Detach the stream if we are not paused */
845        if (FORMAT_DATA->stream_attached)
846                dag_pause_input(libtrace);
847        FORMAT_DATA->device->ref_count--;
848
849        /* Close the DAG device if there are no more references to it */
850        if (FORMAT_DATA->device->ref_count == 0)
851                dag_close_device(FORMAT_DATA->device);
852
853        if (DUCK.dummy_duck)
854                trace_destroy_dead(DUCK.dummy_duck);
855
856        /* Clear the list */
857        libtrace_list_deinit(FORMAT_DATA->per_stream);
858        free(libtrace->format_data);
859        pthread_mutex_unlock(&open_dag_mutex);
860        return 0; /* success */
861}
862
863/* Closes a DAG output trace */
864static int dag_fin_output(libtrace_out_t *libtrace)
865{
866
867        /* Commit any outstanding traffic in the txbuffer */
868        if (FORMAT_DATA_OUT->waiting) {
869                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
870                                           FORMAT_DATA_OUT->dagstream,
871                                           FORMAT_DATA_OUT->waiting );
872        }
873
874        /* Wait until the buffer is nearly clear before exiting the program,
875         * as we will lose packets otherwise */
876        dag_tx_get_stream_space
877                (FORMAT_DATA_OUT->device->fd,
878                 FORMAT_DATA_OUT->dagstream,
879                 dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
880                                            FORMAT_DATA_OUT->dagstream) - 8);
881
882        /* Need the lock, since we're going to be handling the device list */
883        pthread_mutex_lock(&open_dag_mutex);
884
885        /* Detach the stream if we are not paused */
886        if (FORMAT_DATA_OUT->stream_attached)
887                dag_pause_output(libtrace);
888        FORMAT_DATA_OUT->device->ref_count --;
889
890        /* Close the DAG device if there are no more references to it */
891        if (FORMAT_DATA_OUT->device->ref_count == 0)
892                dag_close_device(FORMAT_DATA_OUT->device);
893        free(libtrace->format_data);
894        pthread_mutex_unlock(&open_dag_mutex);
895        return 0; /* success */
896}
897
/* Pick the DUCK ioctl, and the RT packet type that goes with it, based on
 * which ioctl name the DAG headers define. If neither is defined, neither
 * LIBTRACE_DUCK_* macro is defined and a build warning is emitted. */
#if defined(DAGIOC_CARD_DUCK)
#define LIBTRACE_DUCK_IOCTL DAGIOC_CARD_DUCK
#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_5_0
#elif defined(DAGIOCDUCK)
#define LIBTRACE_DUCK_IOCTL DAGIOCDUCK
#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_2_5
#else
#warning "DAG appears to be missing DUCK support"
#endif
909
910/* Extracts DUCK information from the DAG card and produces a DUCK packet */
911static int dag_get_duckinfo(libtrace_t *libtrace,
912                                libtrace_packet_t *packet) {
913
914        if (DUCK.duck_freq == 0)
915                return 0;
916
917#ifndef LIBTRACE_DUCK_IOCTL
918        trace_set_err(libtrace, errno, 
919                "Requested DUCK information but unable to determine the correct ioctl for DUCK");
920        DUCK.duck_freq = 0;
921        return -1;
922#endif
923
924        if (DUCK.last_pkt - DUCK.last_duck < DUCK.duck_freq)
925                return 0;
926
927        /* Allocate memory for the DUCK data */
928        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
929            !packet->buffer) {
930                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
931                packet->buf_control = TRACE_CTRL_PACKET;
932                if (!packet->buffer) {
933                        trace_set_err(libtrace, errno,
934                                      "Cannot allocate packet buffer");
935                        return -1;
936                }
937        }
938
939        /* DUCK doesn't have a format header */
940        packet->header = 0;
941        packet->payload = packet->buffer;
942
943        /* No need to check if we can get DUCK or not - we're modern
944         * enough so just grab the DUCK info */
945        if ((ioctl(FORMAT_DATA->device->fd, LIBTRACE_DUCK_IOCTL,
946                   (duckinf_t *)packet->payload) < 0)) {
947                trace_set_err(libtrace, errno, "Error using DUCK ioctl");
948                DUCK.duck_freq = 0;
949                return -1;
950        }
951
952        packet->type = LIBTRACE_DUCK_VERSION;
953
954        /* Set the packet's trace to point at a DUCK trace, so that the
955         * DUCK format functions will be called on the packet rather than the
956         * DAG ones */
957        if (!DUCK.dummy_duck)
958                DUCK.dummy_duck = trace_create_dead("duck:dummy");
959        packet->trace = DUCK.dummy_duck;
960        DUCK.last_duck = DUCK.last_pkt;
961        packet->error = sizeof(duckinf_t);
962        return sizeof(duckinf_t);
963}
964
965/* Determines the amount of data available to read from the DAG card */
966static int dag_available(libtrace_t *libtrace,
967                         struct dag_per_stream_t *stream_data)
968{
969        uint32_t diff = stream_data->top - stream_data->bottom;
970
971        /* If we've processed more than 4MB of data since we last called
972         * dag_advance_stream, then we should call it again to allow the
973         * space occupied by that 4MB to be released */
974        if (diff >= dag_record_size && stream_data->processed < 4 * 1024 * 1024)
975                return diff;
976
977        /* Update the top and bottom pointers */
978        stream_data->top = dag_advance_stream(FORMAT_DATA->device->fd,
979                                              stream_data->dagstream,
980                                              &(stream_data->bottom));
981
982        if (stream_data->top == NULL) {
983                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
984                return -1;
985        }
986        stream_data->processed = 0;
987        diff = stream_data->top - stream_data->bottom;
988        return diff;
989}
990
991/* Returns a pointer to the start of the next complete ERF record */
992static dag_record_t *dag_get_record(struct dag_per_stream_t *stream_data)
993{
994        dag_record_t *erfptr = NULL;
995        uint16_t size;
996
997        erfptr = (dag_record_t *)stream_data->bottom;
998        if (!erfptr)
999                return NULL;
1000
1001        size = ntohs(erfptr->rlen);
1002        assert( size >= dag_record_size );
1003
1004        /* Make certain we have the full packet available */
1005        if (size > (stream_data->top - stream_data->bottom))
1006                return NULL;
1007
1008        stream_data->bottom += size;
1009        stream_data->processed += size;
1010        return erfptr;
1011}
1012
1013/* Converts a buffer containing a recently read DAG packet record into a
1014 * libtrace packet */
1015static int dag_prepare_packet_stream(libtrace_t *libtrace,
1016                                     struct dag_per_stream_t *stream_data,
1017                                     libtrace_packet_t *packet,
1018                                     void *buffer, libtrace_rt_types_t rt_type,
1019                                     uint32_t flags)
1020{
1021        dag_record_t *erfptr;
1022
1023        /* If the packet previously owned a buffer that is not the buffer
1024         * that contains the new packet data, we're going to need to free the
1025         * old one to avoid memory leaks */
1026        if (packet->buffer != buffer &&
1027            packet->buf_control == TRACE_CTRL_PACKET) {
1028                free(packet->buffer);
1029        }
1030
1031        /* Set the buffer owner appropriately */
1032        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
1033                packet->buf_control = TRACE_CTRL_PACKET;
1034        } else
1035                packet->buf_control = TRACE_CTRL_EXTERNAL;
1036
1037        /* Update the packet pointers and type appropriately */
1038        erfptr = (dag_record_t *)buffer;
1039        packet->buffer = erfptr;
1040        packet->header = erfptr;
1041        packet->type = rt_type;
1042
1043        if (erfptr->flags.rxerror == 1) {
1044                /* rxerror means the payload is corrupt - drop the payload
1045                 * by tweaking rlen */
1046                packet->payload = NULL;
1047                erfptr->rlen = htons(erf_get_framing_length(packet));
1048        } else {
1049                packet->payload = (char*)packet->buffer
1050                        + erf_get_framing_length(packet);
1051        }
1052
1053        if (libtrace->format_data == NULL) {
1054                dag_init_format_data(libtrace);
1055        }
1056
1057        /* Update the dropped packets counter */
1058        /* No loss counter for DSM coloured records - have to use some
1059         * other API */
1060        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
1061                /* TODO */
1062        } else {
1063                /* Use the ERF loss counter */
1064                if (stream_data->seeninterface[erfptr->flags.iface]
1065                    == 0) {
1066                        stream_data->seeninterface[erfptr->flags.iface]
1067                                = 1;
1068                } else {
1069                        stream_data->drops += ntohs(erfptr->lctr);
1070                }
1071        }
1072
1073        return 0;
1074}
1075
1076static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
1077                              void *buffer, libtrace_rt_types_t rt_type,
1078                              uint32_t flags)
1079{
1080        return dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
1081                                       buffer, rt_type, flags);
1082}
1083
1084/*
1085 * dag_write_packet() at this stage attempts to improve tx performance
1086 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
1087 * has been met. I observed approximately 270% performance increase
1088 * through this relatively naive tweak. No optimisation of buffer sizes
1089 * was attempted.
1090 */
1091
1092/* Pushes an ERF record onto the transmit stream */
1093static int dag_dump_packet(libtrace_out_t *libtrace,
1094                           dag_record_t *erfptr, unsigned int pad,
1095                           void *buffer)
1096{
1097        int size;
1098
1099        /*
1100         * If we've got 0 bytes waiting in the txqueue, assume that we
1101         * haven't requested any space yet, and request some, storing
1102         * the pointer at FORMAT_DATA_OUT->txbuffer.
1103         *
1104         * The amount to request is slightly magical at the moment - it's
1105         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
1106         * the buffer and handle overruns.
1107         */
1108        if (FORMAT_DATA_OUT->waiting == 0) {
1109                FORMAT_DATA_OUT->txbuffer =
1110                        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
1111                                                FORMAT_DATA_OUT->dagstream,
1112                                                16908288);
1113        }
1114
1115        /*
1116         * Copy the header separately to the body, as we can't guarantee they
1117         * are in contiguous memory
1118         */
1119        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, erfptr,
1120               (dag_record_size + pad));
1121        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
1122
1123        /*
1124         * Copy our incoming packet into the outgoing buffer, and increment
1125         * our waiting count
1126         */
1127        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
1128        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, buffer,
1129               size);
1130        FORMAT_DATA_OUT->waiting += size;
1131
1132        /*
1133         * If our output buffer has more than 16 Mebibytes in it, commit those
1134         * bytes and reset the waiting count to 0.
1135         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in
1136         * case there is still data in the buffer at program exit.
1137         */
1138        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
1139                FORMAT_DATA_OUT->txbuffer =
1140                        dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
1141                                                   FORMAT_DATA_OUT->dagstream,
1142                                                   FORMAT_DATA_OUT->waiting);
1143                FORMAT_DATA_OUT->waiting = 0;
1144        }
1145
1146        return size + pad + dag_record_size;
1147}
1148
1149/* Attempts to determine a suitable ERF type for a given packet. Returns true
1150 * if one is found, false otherwise */
1151static bool find_compatible_linktype(libtrace_out_t *libtrace,
1152                                     libtrace_packet_t *packet, char *type)
1153{
1154        /* Keep trying to simplify the packet until we can find
1155         * something we can do with it */
1156
1157        do {
1158                *type = libtrace_to_erf_type(trace_get_link_type(packet));
1159
1160                /* Success */
1161                if (*type != (char)-1)
1162                        return true;
1163
1164                if (!demote_packet(packet)) {
1165                        trace_set_err_out(libtrace,
1166                                          TRACE_ERR_NO_CONVERSION,
1167                                          "No erf type for packet (%i)",
1168                                          trace_get_link_type(packet));
1169                        return false;
1170                }
1171
1172        } while(1);
1173
1174        return true;
1175}
1176
1177/* Writes a packet to the provided DAG output trace */
1178static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet)
1179{
1180        /* This is heavily borrowed from erf_write_packet(). Yes, CnP
1181         * coding sucks, sorry about that.
1182         */
1183        unsigned int pad = 0;
1184        int numbytes;
1185        void *payload = packet->payload;
1186        dag_record_t *header = (dag_record_t *)packet->header;
1187        char erf_type = 0;
1188
1189        if(!packet->header) {
1190                /* No header, probably an RT packet. Lifted from
1191                 * erf_write_packet(). */
1192                return -1;
1193        }
1194
1195        if (trace_get_link_type(packet) == TRACE_TYPE_NONDATA)
1196                return 0;
1197
1198        pad = dag_get_padding(packet);
1199
1200        /*
1201         * If the payload is null, adjust the rlen. Discussion of this is
1202         * attached to erf_write_packet()
1203         */
1204        if (payload == NULL) {
1205                header->rlen = htons(dag_record_size + pad);
1206        }
1207
1208        if (packet->type == TRACE_RT_DATA_ERF) {
1209                numbytes = dag_dump_packet(libtrace, header, pad, payload);
1210        } else {
1211                /* Build up a new packet header from the existing header */
1212
1213                /* Simplify the packet first - if we can't do this, break
1214                 * early */
1215                if (!find_compatible_linktype(libtrace,packet,&erf_type))
1216                        return -1;
1217
1218                dag_record_t erfhdr;
1219
1220                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
1221                payload=packet->payload;
1222                pad = dag_get_padding(packet);
1223
1224                /* Flags. Can't do this */
1225                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
1226                if (trace_get_direction(packet)!=(int)~0U)
1227                        erfhdr.flags.iface = trace_get_direction(packet);
1228
1229                erfhdr.type = erf_type;
1230
1231                /* Packet length (rlen includes format overhead) */
1232                assert(trace_get_capture_length(packet) > 0
1233                       && trace_get_capture_length(packet) <= 65536);
1234                assert(erf_get_framing_length(packet) > 0
1235                       && trace_get_framing_length(packet) <= 65536);
1236                assert(trace_get_capture_length(packet) +
1237                       erf_get_framing_length(packet) > 0
1238                       && trace_get_capture_length(packet) +
1239                       erf_get_framing_length(packet) <= 65536);
1240
1241                erfhdr.rlen = htons(trace_get_capture_length(packet)
1242                                    + erf_get_framing_length(packet));
1243
1244
1245                /* Loss counter. Can't do this */
1246                erfhdr.lctr = 0;
1247                /* Wire length, does not include padding! */
1248                erfhdr.wlen = htons(trace_get_wire_length(packet));
1249
1250                /* Write it out */
1251                numbytes = dag_dump_packet(libtrace, &erfhdr, pad, payload);
1252        }
1253
1254        return numbytes;
1255}
1256
1257/* Reads the next available packet from a DAG card, in a BLOCKING fashion
1258 *
1259 * If DUCK reporting is enabled, the packet returned may be a DUCK update
1260 */
1261static int dag_read_packet_stream(libtrace_t *libtrace,
1262                                struct dag_per_stream_t *stream_data,
1263                                libtrace_thread_t *t, /* Optional */
1264                                libtrace_packet_t *packet)
1265{
1266        int size = 0;
1267        dag_record_t *erfptr = NULL;
1268        struct timeval tv;
1269        int numbytes = 0;
1270        uint32_t flags = 0;
1271        struct timeval maxwait, pollwait;
1272
1273        pollwait.tv_sec = 0;
1274        pollwait.tv_usec = 10000;
1275        maxwait.tv_sec = 0;
1276        maxwait.tv_usec = 250000;
1277
1278        /* Check if we're due for a DUCK report - only report on the first thread */
1279        if (stream_data == FORMAT_DATA_FIRST) {
1280                size = dag_get_duckinfo(libtrace, packet);
1281                if (size != 0)
1282                        return size;
1283        }
1284
1285
1286        /* Don't let anyone try to free our DAG memory hole! */
1287        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1288
1289        /* If the packet buffer is currently owned by libtrace, free it so
1290         * that we can set the packet to point into the DAG memory hole */
1291        if (packet->buf_control == TRACE_CTRL_PACKET) {
1292                free(packet->buffer);
1293                packet->buffer = 0;
1294        }
1295
1296        if (dag_set_stream_poll(FORMAT_DATA->device->fd, stream_data->dagstream,
1297                                sizeof(dag_record_t), &maxwait,
1298                                &pollwait) == -1) {
1299                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1300                return -1;
1301        }
1302
1303        /* Grab a full ERF record */
1304        do {
1305                numbytes = dag_available(libtrace, stream_data);
1306                if (numbytes < 0)
1307                        return numbytes;
1308                if (numbytes < dag_record_size) {
1309                        /* Check the message queue if we have one to check */
1310                        if (t != NULL &&
1311                            libtrace_message_queue_count(&t->messages) > 0)
1312                                return -2;
1313
1314                        if (libtrace_halt)
1315                                return 0;
1316                        /* Block until we see a packet */
1317                        continue;
1318                }
1319                erfptr = dag_get_record(stream_data);
1320        } while (erfptr == NULL);
1321
1322        packet->trace = libtrace;
1323        /* Prepare the libtrace packet */
1324        if (dag_prepare_packet_stream(libtrace, stream_data, packet, erfptr,
1325                                    TRACE_RT_DATA_ERF, flags))
1326                return -1;
1327
1328        /* Update the DUCK timer - don't re-order this check (false-sharing) */
1329        if (stream_data == FORMAT_DATA_FIRST && DUCK.duck_freq != 0) {
1330                tv = trace_get_timeval(packet);
1331                DUCK.last_pkt = tv.tv_sec;
1332        }
1333
1334        packet->error = packet->payload ? htons(erfptr->rlen) :
1335                                          erf_get_framing_length(packet);
1336
1337        return packet->error;
1338}
1339
1340static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
1341{
1342        return dag_read_packet_stream(libtrace, FORMAT_DATA_FIRST, NULL, packet);
1343}
1344
1345static int dag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1346                             libtrace_packet_t **packets, size_t nb_packets)
1347{
1348        int ret;
1349        size_t read_packets = 0;
1350        int numbytes = 0;
1351
1352        struct dag_per_stream_t *stream_data =
1353                (struct dag_per_stream_t *)t->format_data;
1354
1355        /* Read as many packets as we can, but read atleast one packet */
1356        do {
1357                ret = dag_read_packet_stream(libtrace, stream_data, t,
1358                                           packets[read_packets]);
1359                if (ret < 0)
1360                        return ret;
1361
1362                read_packets++;
1363
1364                /* Make sure we don't read too many packets..! */
1365                if (read_packets >= nb_packets)
1366                        break;
1367
1368                numbytes = dag_available(libtrace, stream_data);
1369        } while (numbytes >= dag_record_size);
1370
1371        return read_packets;
1372}
1373
1374/* Attempts to read a packet from a DAG card in a NON-BLOCKING fashion. If a
1375 * packet is available, we will return a packet event. Otherwise we will
1376 * return a SLEEP event (as we cannot select on the DAG file descriptor).
1377 */
1378static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
1379                                           libtrace_packet_t *packet)
1380{
1381        libtrace_eventobj_t event = {0,0,0.0,0};
1382        dag_record_t *erfptr = NULL;
1383        int numbytes;
1384        uint32_t flags = 0;
1385        struct timeval minwait, tv;
1386       
1387        minwait.tv_sec = 0;
1388        minwait.tv_usec = 10000;
1389
1390        /* Check if we're meant to provide a DUCK update */
1391        numbytes = dag_get_duckinfo(libtrace, packet);
1392        if (numbytes < 0) {
1393                event.type = TRACE_EVENT_TERMINATE;
1394                return event;
1395        } else if (numbytes > 0) {
1396                event.type = TRACE_EVENT_PACKET;
1397                return event;
1398        }
1399       
1400        if (dag_set_stream_poll(FORMAT_DATA->device->fd,
1401                                FORMAT_DATA_FIRST->dagstream, 0, &minwait,
1402                                &minwait) == -1) {
1403                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1404                event.type = TRACE_EVENT_TERMINATE;
1405                return event;
1406        }
1407
1408        do {
1409                erfptr = NULL;
1410                numbytes = 0;
1411
1412                /* Need to call dag_available so that the top pointer will get
1413                 * updated, otherwise we'll never see any data! */
1414                numbytes = dag_available(libtrace, FORMAT_DATA_FIRST);
1415
1416                /* May as well not bother calling dag_get_record if
1417                 * dag_available suggests that there's no data */
1418                if (numbytes != 0)
1419                        erfptr = dag_get_record(FORMAT_DATA_FIRST);
1420                if (erfptr == NULL) {
1421                        /* No packet available - sleep for a very short time */
1422                        if (libtrace_halt) {
1423                                event.type = TRACE_EVENT_TERMINATE;
1424                        } else {
1425                                event.type = TRACE_EVENT_SLEEP;
1426                                event.seconds = 0.0001;
1427                        }
1428                        break;
1429                }
1430                if (dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
1431                                            erfptr, TRACE_RT_DATA_ERF, flags)) {
1432                        event.type = TRACE_EVENT_TERMINATE;
1433                        break;
1434                }
1435
1436
1437                event.size = trace_get_capture_length(packet) +
1438                        trace_get_framing_length(packet);
1439
1440                /* XXX trace_read_packet() normally applies the following
1441                 * config options for us, but this function is called via
1442                 * trace_event() so we have to do it ourselves */
1443
1444                if (libtrace->filter) {
1445                        int filtret = trace_apply_filter(libtrace->filter,
1446                                                         packet);
1447                        if (filtret == -1) {
1448                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
1449                                              "Bad BPF Filter");
1450                                event.type = TRACE_EVENT_TERMINATE;
1451                                break;
1452                        }
1453
1454                        if (filtret == 0) {
1455                                /* This packet isn't useful so we want to
1456                                 * immediately see if there is another suitable
1457                                 * one - we definitely DO NOT want to return
1458                                 * a sleep event in this case, like we used to
1459                                 * do! */
1460                                libtrace->filtered_packets ++;
1461                                trace_clear_cache(packet);
1462                                continue;
1463                        }
1464
1465                        event.type = TRACE_EVENT_PACKET;
1466                } else {
1467                        event.type = TRACE_EVENT_PACKET;
1468                }
1469
1470                /* Update the DUCK timer */
1471                tv = trace_get_timeval(packet);
1472                DUCK.last_pkt = tv.tv_sec;
1473               
1474                if (libtrace->snaplen > 0) {
1475                        trace_set_capture_length(packet, libtrace->snaplen);
1476                }
1477                libtrace->accepted_packets ++;
1478                break;
1479        } while(1);
1480
1481        return event;
1482}
1483
1484static void dag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat)
1485{
1486        libtrace_list_node_t *tmp;
1487        assert(stat && libtrace);
1488        tmp = FORMAT_DATA_HEAD;
1489
1490        /* Dropped packets */
1491        stat->dropped_valid = 1;
1492        stat->dropped = 0;
1493        while (tmp != NULL) {
1494                stat->dropped += STREAM_DATA(tmp)->drops;
1495                tmp = tmp->next;
1496        }
1497
1498}
1499
1500static void dag_get_thread_statistics(libtrace_t *libtrace, libtrace_thread_t *t,
1501                                       libtrace_stat_t *stat) {
1502        struct dag_per_stream_t *stream_data = t->format_data;
1503        assert(stat && libtrace);
1504
1505        stat->dropped_valid = 1;
1506        stat->dropped = stream_data->drops;
1507
1508        stat->filtered_valid = 1;
1509        stat->filtered = 0;
1510}
1511
/* Prints some semi-useful help text about the DAG format module to
 * stdout.
 *
 * NOTE(review): the text claims there are no supported output URIs, yet
 * the format table in this file registers output callbacks - confirm
 * which is correct before relying on the help text. */
static void dag_help(void)
{
        printf("dag format module: $Revision: 1755 $\n"
               "Supported input URIs:\n"
               "\tdag:/dev/dagn\n"
               "\n"
               "\te.g.: dag:/dev/dag0\n"
               "\n"
               "Supported output URIs:\n"
               "\tnone\n"
               "\n");
}
1524
1525static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1526                                bool reader)
1527{
1528        struct dag_per_stream_t *stream_data;
1529        libtrace_list_node_t *node;
1530
1531        if (reader && t->type == THREAD_PERPKT) {
1532                node = libtrace_list_get_index(FORMAT_DATA->per_stream,
1533                                                t->perpkt_num);
1534                if (node == NULL) {
1535                        return -1;
1536                }
1537                stream_data = node->data;
1538
1539                /* Pass the per thread data to the thread */
1540                t->format_data = stream_data;
1541
1542                /* Attach and start the DAG stream */
1543                if (dag_start_input_stream(libtrace, stream_data) < 0)
1544                        return -1;
1545        }
1546
1547        return 0;
1548}
1549
1550static struct libtrace_format_t dag = {
1551        "dag",
1552        "$Id$",
1553        TRACE_FORMAT_ERF,
1554        dag_probe_filename,             /* probe filename */
1555        NULL,                           /* probe magic */
1556        dag_init_input,                 /* init_input */
1557        dag_config_input,               /* config_input */
1558        dag_start_input,                /* start_input */
1559        dag_pause_input,                /* pause_input */
1560        dag_init_output,                /* init_output */
1561        NULL,                           /* config_output */
1562        dag_start_output,               /* start_output */
1563        dag_fin_input,                  /* fin_input */
1564        dag_fin_output,                 /* fin_output */
1565        dag_read_packet,                /* read_packet */
1566        dag_prepare_packet,             /* prepare_packet */
1567        NULL,                           /* fin_packet */
1568        dag_write_packet,               /* write_packet */
1569        erf_get_link_type,              /* get_link_type */
1570        erf_get_direction,              /* get_direction */
1571        erf_set_direction,              /* set_direction */
1572        erf_get_erf_timestamp,          /* get_erf_timestamp */
1573        NULL,                           /* get_timeval */
1574        NULL,                           /* get_seconds */
1575        NULL,                           /* get_timespec */
1576        NULL,                           /* seek_erf */
1577        NULL,                           /* seek_timeval */
1578        NULL,                           /* seek_seconds */
1579        erf_get_capture_length,         /* get_capture_length */
1580        erf_get_wire_length,            /* get_wire_length */
1581        erf_get_framing_length,         /* get_framing_length */
1582        erf_set_capture_length,         /* set_capture_length */
1583        NULL,                           /* get_received_packets */
1584        NULL,                           /* get_filtered_packets */
1585        NULL,                           /* get_dropped_packets */
1586        dag_get_statistics,             /* get_statistics */
1587        NULL,                           /* get_fd */
1588        trace_event_dag,                /* trace_event */
1589        dag_help,                       /* help */
1590        NULL,                            /* next pointer */
1591        {true, 0}, /* live packet capture, thread limit TBD */
1592        dag_pstart_input,
1593        dag_pread_packets,
1594        dag_pause_input,
1595        NULL,
1596        dag_pregister_thread,
1597        NULL,
1598        dag_get_thread_statistics       /* get thread stats */
1599};
1600
1601void dag_constructor(void)
1602{
1603        register_format(&dag);
1604}
Note: See TracBrowser for help on using the repository browser.