source: lib/format_dag25.c @ 10c47a0

Branches: 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Last change on this file since 10c47a0 was 10c47a0, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Fixes DAG DUCK reporting for parallel libtrace.
In parallel libtrace DUCK is only ever sent to the first thread.

It is now up to each format's pread_packet to tag the trace, along with
the error (AKA bytes read), to each packet.

Change logic in parallel libtrace to always prefer pread over read if
it exists.

Fix some unresolved conflict in DPDK that I missed, that was ifdef'd out.

  • Property mode set to 100644
File size: 45.2 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id$
31 *
32 */
33
34#define _GNU_SOURCE
35
36#include "config.h"
37#include "common.h"
38#include "libtrace.h"
39#include "libtrace_int.h"
40#include "format_helper.h"
41#include "format_erf.h"
42
43#include <assert.h>
44#include <errno.h>
45#include <fcntl.h>
46#include <stdio.h>
47#include <string.h>
48#include <stdlib.h>
49#include <sys/stat.h>
50
51#include <sys/mman.h>
52/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
53 * Windows anyway so we'll worry about this more later :] */
54#include <pthread.h>
55
56
57#ifdef WIN32
58#  include <io.h>
59#  include <share.h>
60#  define PATH_MAX _MAX_PATH
61#  define snprintf sprintf_s
62#else
63#  include <netdb.h>
64#  ifndef PATH_MAX
65#       define PATH_MAX 4096
66#  endif
67#  include <sys/ioctl.h>
68#endif
69
70/* This format deals with DAG cards that are using drivers from the 2.5 version
71 * onwards, including 3.X.
72 *
73 * DAG is a LIVE capture format.
74 *
75 * This format does support writing, provided the DAG card that you are using
76 * has transmit (Tx) support. Additionally, packets read using this format
77 * are in the ERF format, so can easily be written as ERF traces without
78 * losing any data.
79 */
80
/* Accessors for the format-specific data hanging off a trace or list node.
 *
 * The macro argument is parenthesised so that any pointer expression (e.g.
 * "node + 1" or "&nodes[i]") can be passed safely, not just a plain
 * identifier. */
#define DATA(x) ((struct dag_format_data_t *)((x)->format_data))
#define DATA_OUT(x) ((struct dag_format_data_out_t *)((x)->format_data))
#define STREAM_DATA(x) ((struct dag_per_stream_t *)((x)->data))

/* Shorthands that rely on a variable called "libtrace" being in scope */
#define FORMAT_DATA DATA(libtrace)
#define FORMAT_DATA_OUT DATA_OUT(libtrace)

/* The DUCK reporting state of the current input trace */
#define DUCK (FORMAT_DATA->duck)

/* The first node in the per-stream list, and the stream data it carries */
#define FORMAT_DATA_HEAD (FORMAT_DATA->per_stream->head)
#define FORMAT_DATA_FIRST ((struct dag_per_stream_t *)(FORMAT_DATA_HEAD->data))
92
/* Forward declaration of the DAG format descriptor; its definition is not
 * visible in this part of the file (presumably it appears further down). */
static struct libtrace_format_t dag;
94
/* One entry in the list of open DAG devices.
 *
 * A single DAG device can carry several streams, and therefore several
 * traces, so every trace points at a shared, reference-counted device entry
 * rather than opening the device itself. */
struct dag_dev_t {
        /* Name of the device; released with free() when the device is
         * closed */
        char * dev_name;
        /* File descriptor for the opened device */
        int fd;
        /* How many input / output traces currently use this device */
        uint16_t ref_count;
        /* Neighbouring entries in the doubly linked device list */
        struct dag_dev_t *prev;
        struct dag_dev_t *next;
};
107
/* Per-trace state kept for every DAG output trace */
struct dag_format_data_out_t {
        /* Device that packets are written to */
        struct dag_dev_t *device;
        /* Number of the DAG stream used for writing */
        unsigned int dagstream;
        /* Non-zero while the stream is attached */
        int stream_attached;
        /* Bytes queued up and not yet transmitted */
        uint64_t waiting;
        /* Buffer holding the data that is waiting to be transmitted */
        uint8_t *txbuffer;
};
121
/* State tracked separately for each DAG input stream */
struct dag_per_stream_t {
        /* Number of the DAG stream */
        uint16_t dagstream;
        /* Last unread byte in the DAG memory */
        uint8_t *top;
        /* First unread byte in the DAG memory */
        uint8_t *bottom;
        /* How much data, counted from bottom, has already been processed */
        uint32_t processed;
        /* Packets seen on this stream */
        uint64_t pkt_count;
        /* Packets dropped on this stream */
        uint64_t drops;
        /* One flag per interface, set once that interface has been seen.
         * Four slots cover all current DAG cards. */
        uint8_t seeninterface[4];
};
141
142/* "Global" data that is stored for each DAG input trace */
143struct dag_format_data_t {
144        /* DAG device */
145        struct dag_dev_t *device;
146        /* Boolean flag indicating whether the trace is currently attached */
147        int stream_attached;
148        /* Data stored against each DAG input stream */
149        libtrace_list_t *per_stream;
150
151        /* Data required for regular DUCK reporting.
152         * We put this on a new cache line otherwise we have a lot of false
153         * sharing caused by updating the last_pkt.
154         * This should only ever be accessed by the first thread stream,
155         * that includes both read and write operations.
156         */
157        struct {
158                /* Timestamp of the last DUCK report */
159                uint32_t last_duck;
160                /* The number of seconds between each DUCK report */
161                uint32_t duck_freq;
162                /* Timestamp of the last packet read from the DAG card */
163                uint32_t last_pkt;
164                /* Dummy trace to ensure DUCK packets are dealt with using the
165                 * DUCK format functions */
166                libtrace_t *dummy_duck;
167        } duck ALIGN_STRUCT(CACHE_LINE_SIZE);
168};
169
/* Mutex guarding every access to the list of open DAG devices.
 *
 * It is statically initialised so that it is valid before first use; POSIX
 * does not guarantee that a zero-filled pthread_mutex_t is usable. */
pthread_mutex_t open_dag_mutex = PTHREAD_MUTEX_INITIALIZER;

/* The list of DAG devices that libtrace currently has open.
 *
 * Each DAG device can only be opened once, yet several traces may want to
 * read from (or write to) different streams on it. The list, together with
 * the per-device reference count, stops us from opening a device twice or
 * closing one that is still in use. */
struct dag_dev_t *open_dags = NULL;
181
182/* Returns the amount of padding between the ERF header and the start of the
183 * captured packet data */
184static int dag_get_padding(const libtrace_packet_t *packet)
185{
186        /* ERF Ethernet records have a 2 byte padding before the packet itself
187         * so that the IP header is aligned on a 32 bit boundary.
188         */
189        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
190                dag_record_t *erfptr = (dag_record_t *)packet->header;
191                switch(erfptr->type) {
192                        case TYPE_ETH:
193                        case TYPE_DSM_COLOR_ETH:
194                                return 2;
195                        default:                return 0;
196                }
197        }
198        else {
199                switch(trace_get_link_type(packet)) {
200                        case TRACE_TYPE_ETH:    return 2;
201                        default:                return 0;
202                }
203        }
204}
205
/* Guesses whether the given filename refers to a DAG device.
 *
 * Returns 1 if the file can be stat()ed and is a character device,
 * otherwise 0. */
static int dag_probe_filename(const char *filename)
{
        struct stat info;

        /* Anything that exists and is a character device is probably us */
        return stat(filename, &info) == 0 && S_ISCHR(info.st_mode);
}
221
222/* Initialises the DAG output data structure */
223static void dag_init_format_out_data(libtrace_out_t *libtrace)
224{
225        libtrace->format_data = (struct dag_format_data_out_t *)
226                malloc(sizeof(struct dag_format_data_out_t));
227        // no DUCK on output
228        FORMAT_DATA_OUT->stream_attached = 0;
229        FORMAT_DATA_OUT->device = NULL;
230        FORMAT_DATA_OUT->dagstream = 0;
231        FORMAT_DATA_OUT->waiting = 0;
232
233}
234
235/* Initialises the DAG input data structure */
236static void dag_init_format_data(libtrace_t *libtrace)
237{
238        struct dag_per_stream_t stream_data;
239
240        libtrace->format_data = (struct dag_format_data_t *)
241                malloc(sizeof(struct dag_format_data_t));
242        DUCK.last_duck = 0;
243        DUCK.duck_freq = 0;
244        DUCK.last_pkt = 0;
245        DUCK.dummy_duck = NULL;
246
247        FORMAT_DATA->per_stream =
248                libtrace_list_init(sizeof(stream_data));
249        assert(FORMAT_DATA->per_stream != NULL);
250
251        /* We'll start with just one instance of stream_data, and we'll
252         * add more later if we need them */
253        memset(&stream_data, 0, sizeof(stream_data));
254        libtrace_list_push_back(FORMAT_DATA->per_stream, &stream_data);
255}
256
257/* Determines if there is already an entry for the given DAG device in the
258 * device list and increments the reference count for that device, if found.
259 *
260 * NOTE: This function assumes the open_dag_mutex is held by the caller */
261static struct dag_dev_t *dag_find_open_device(char *dev_name)
262{
263        struct dag_dev_t *dag_dev;
264
265        dag_dev = open_dags;
266
267        /* XXX: Not exactly zippy, but how often are we going to be dealing
268         * with multiple dag cards? */
269        while (dag_dev != NULL) {
270                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
271                        dag_dev->ref_count ++;
272                        return dag_dev;
273                }
274                dag_dev = dag_dev->next;
275        }
276        return NULL;
277}
278
279/* Closes a DAG device and removes it from the device list.
280 *
281 * Attempting to close a DAG device that has a non-zero reference count will
282 * cause an assertion failure!
283 *
284 * NOTE: This function assumes the open_dag_mutex is held by the caller */
285static void dag_close_device(struct dag_dev_t *dev)
286{
287        /* Need to remove from the device list */
288        assert(dev->ref_count == 0);
289
290        if (dev->prev == NULL) {
291                open_dags = dev->next;
292                if (dev->next)
293                        dev->next->prev = NULL;
294        } else {
295                dev->prev->next = dev->next;
296                if (dev->next)
297                        dev->next->prev = dev->prev;
298        }
299
300        dag_close(dev->fd);
301        if (dev->dev_name)
302                free(dev->dev_name);
303        free(dev);
304}
305
306
307/* Opens a new DAG device for writing and adds it to the DAG device list
308 *
309 * NOTE: this function should only be called when opening a DAG device for
310 * writing - there is little practical difference between this and the
311 * function below that covers the reading case, but we need the output trace
312 * object to report errors properly so the two functions take slightly
313 * different arguments. This is really lame and there should be a much better
314 * way of doing this.
315 *
316 * NOTE: This function assumes the open_dag_mutex is held by the caller
317 */
318static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace,
319                                                char *dev_name)
320{
321        struct stat buf;
322        int fd;
323        struct dag_dev_t *new_dev;
324
325        /* Make sure the device exists */
326        if (stat(dev_name, &buf) == -1) {
327                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
328                return NULL;
329        }
330
331        /* Make sure it is the appropriate type of device */
332        if (S_ISCHR(buf.st_mode)) {
333                /* Try opening the DAG device */
334                if((fd = dag_open(dev_name)) < 0) {
335                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
336                                        dev_name);
337                        return NULL;
338                }
339        } else {
340                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
341                                dev_name);
342                return NULL;
343        }
344
345        /* Add the device to our device list - it is just a doubly linked
346         * list with no inherent ordering; just tack the new one on the front
347         */
348        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
349        new_dev->fd = fd;
350        new_dev->dev_name = dev_name;
351        new_dev->ref_count = 1;
352
353        new_dev->prev = NULL;
354        new_dev->next = open_dags;
355        if (open_dags)
356                open_dags->prev = new_dev;
357
358        open_dags = new_dev;
359
360        return new_dev;
361}
362
363/* Opens a new DAG device for reading and adds it to the DAG device list
364 *
365 * NOTE: this function should only be called when opening a DAG device for
366 * reading - there is little practical difference between this and the
367 * function above that covers the writing case, but we need the input trace
368 * object to report errors properly so the two functions take slightly
369 * different arguments. This is really lame and there should be a much better
370 * way of doing this.
371 *
372 * NOTE: This function assumes the open_dag_mutex is held by the caller */
373static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
374        struct stat buf;
375        int fd;
376        struct dag_dev_t *new_dev;
377
378        /* Make sure the device exists */
379        if (stat(dev_name, &buf) == -1) {
380                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
381                return NULL;
382        }
383
384        /* Make sure it is the appropriate type of device */
385        if (S_ISCHR(buf.st_mode)) {
386                /* Try opening the DAG device */
387                if((fd = dag_open(dev_name)) < 0) {
388                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
389                                      dev_name);
390                        return NULL;
391                }
392        } else {
393                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
394                              dev_name);
395                return NULL;
396        }
397
398        /* Add the device to our device list - it is just a doubly linked
399         * list with no inherent ordering; just tack the new one on the front
400         */
401        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
402        new_dev->fd = fd;
403        new_dev->dev_name = dev_name;
404        new_dev->ref_count = 1;
405
406        new_dev->prev = NULL;
407        new_dev->next = open_dags;
408        if (open_dags)
409                open_dags->prev = new_dev;
410
411        open_dags = new_dev;
412
413        return new_dev;
414}
415
416/* Creates and initialises a DAG output trace */
417static int dag_init_output(libtrace_out_t *libtrace)
418{
419        /* Upon successful creation, the device name is stored against the
420         * device and free when it is free()d */
421        char *dag_dev_name = NULL;
422        char *scan = NULL;
423        struct dag_dev_t *dag_device = NULL;
424        int stream = 1;
425
426        /* XXX I don't know if this is important or not, but this function
427         * isn't present in all of the driver releases that this code is
428         * supposed to support! */
429        /*
430        unsigned long wake_time;
431        dagutil_sleep_get_wake_time(&wake_time,0);
432        */
433
434        dag_init_format_out_data(libtrace);
435        /* Grab the mutex while we're likely to be messing with the device
436         * list */
437        pthread_mutex_lock(&open_dag_mutex);
438
439        /* Specific streams are signified using a comma in the libtrace URI,
440         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
441         *
442         * If no stream is specified, we will write using stream 1 */
443        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
444                dag_dev_name = strdup(libtrace->uridata);
445        } else {
446                dag_dev_name = (char *)strndup(libtrace->uridata,
447                                (size_t)(scan - libtrace->uridata));
448                stream = atoi(++scan);
449        }
450        FORMAT_DATA_OUT->dagstream = stream;
451
452        /* See if our DAG device is already open */
453        dag_device = dag_find_open_device(dag_dev_name);
454
455        if (dag_device == NULL) {
456                /* Device not yet opened - open it ourselves */
457                dag_device = dag_open_output_device(libtrace, dag_dev_name);
458        } else {
459                /* Otherwise, just use the existing one */
460                free(dag_dev_name);
461                dag_dev_name = NULL;
462        }
463
464        /* Make sure we have successfully opened a DAG device */
465        if (dag_device == NULL) {
466                if (dag_dev_name) {
467                        free(dag_dev_name);
468                }
469                pthread_mutex_unlock(&open_dag_mutex);
470                return -1;
471        }
472
473        FORMAT_DATA_OUT->device = dag_device;
474        pthread_mutex_unlock(&open_dag_mutex);
475        return 0;
476}
477
478/* Creates and initialises a DAG input trace */
479static int dag_init_input(libtrace_t *libtrace) {
480        /* Upon successful creation, the device name is stored against the
481         * device and free when it is free()d */
482        char *dag_dev_name = NULL;
483        char *scan = NULL;
484        int stream = 0;
485        struct dag_dev_t *dag_device = NULL;
486
487        dag_init_format_data(libtrace);
488        /* Grab the mutex while we're likely to be messing with the device
489         * list */
490        pthread_mutex_lock(&open_dag_mutex);
491
492
493        /* DAG cards support multiple streams. In a single threaded capture,
494         * these are specified using a comma in the libtrace URI,
495         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
496         *
497         * If no stream is specified, we will read from stream 0 with
498         * one thread
499         */
500        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
501                dag_dev_name = strdup(libtrace->uridata);
502        } else {
503                dag_dev_name = (char *)strndup(libtrace->uridata,
504                                (size_t)(scan - libtrace->uridata));
505                stream = atoi(++scan);
506        }
507
508        FORMAT_DATA_FIRST->dagstream = stream;
509
510        /* See if our DAG device is already open */
511        dag_device = dag_find_open_device(dag_dev_name);
512
513        if (dag_device == NULL) {
514                /* Device not yet opened - open it ourselves */
515                dag_device = dag_open_device(libtrace, dag_dev_name);
516        } else {
517                /* Otherwise, just use the existing one */
518                free(dag_dev_name);
519                dag_dev_name = NULL;
520        }
521
522        /* Make sure we have successfully opened a DAG device */
523        if (dag_device == NULL) {
524                if (dag_dev_name)
525                        free(dag_dev_name);
526                dag_dev_name = NULL;
527                pthread_mutex_unlock(&open_dag_mutex);
528                return -1;
529        }
530
531        FORMAT_DATA->device = dag_device;
532
533        /* See Config_Status_API_Programming_Guide.pdf from the Endace
534           Dag Documentation */
535        /* Check kBooleanAttributeActive is true -- no point capturing
536         * on an interface that's disabled
537         *
538         * The symptom of the port being disabled is that libtrace
539         * will appear to hang. */
540        /* Check kBooleanAttributeFault is false */
541        /* Check kBooleanAttributeLocalFault is false */
542        /* Check kBooleanAttributeLock is true ? */
543        /* Check kBooleanAttributePeerLink ? */
544
545        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based
546           on libtrace promisc attribute?*/
547        /* Set kUint32AttributeSnapLength to the snaplength */
548
549        pthread_mutex_unlock(&open_dag_mutex);
550        return 0;
551}
552
553/* Configures a DAG input trace */
554static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
555                            void *data)
556{
557        char conf_str[4096];
558        switch(option) {
559        case TRACE_OPTION_META_FREQ:
560                /* This option is used to specify the frequency of DUCK
561                 * updates */
562                DUCK.duck_freq = *(int *)data;
563                return 0;
564        case TRACE_OPTION_SNAPLEN:
565                /* Tell the card our new snap length */
566                snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
567                if (dag_configure(FORMAT_DATA->device->fd,
568                                  conf_str) != 0) {
569                        trace_set_err(libtrace, errno, "Failed to configure "
570                                      "snaplen on DAG card: %s",
571                                      libtrace->uridata);
572                        return -1;
573                }
574                return 0;
575        case TRACE_OPTION_PROMISC:
576                /* DAG already operates in a promisc fashion */
577                return -1;
578        case TRACE_OPTION_FILTER:
579                /* We don't yet support pushing filters into DAG
580                 * cards */
581                return -1;
582        case TRACE_OPTION_EVENT_REALTIME:
583                /* Live capture is always going to be realtime */
584                return -1;
585        }
586        return -1;
587}
588
589/* Starts a DAG output trace */
590static int dag_start_output(libtrace_out_t *libtrace)
591{
592        struct timeval zero, nopoll;
593
594        zero.tv_sec = 0;
595        zero.tv_usec = 0;
596        nopoll = zero;
597
598        /* Attach and start the DAG stream */
599        if (dag_attach_stream(FORMAT_DATA_OUT->device->fd,
600                        FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) {
601                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
602                return -1;
603        }
604
605        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
606                        FORMAT_DATA_OUT->dagstream) < 0) {
607                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
608                return -1;
609        }
610        FORMAT_DATA_OUT->stream_attached = 1;
611
612        /* We don't want the dag card to do any sleeping */
613        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
614                        FORMAT_DATA_OUT->dagstream, 0, &zero,
615                        &nopoll);
616
617        return 0;
618}
619
620static int dag_start_input_stream(libtrace_t *libtrace,
621                                  struct dag_per_stream_t * stream) {
622        struct timeval zero, nopoll;
623        uint8_t *top, *bottom, *starttop;
624        top = bottom = NULL;
625
626        zero.tv_sec = 0;
627        zero.tv_usec = 10000;
628        nopoll = zero;
629
630        /* Attach and start the DAG stream */
631        if (dag_attach_stream(FORMAT_DATA->device->fd,
632                              stream->dagstream, 0, 0) < 0) {
633                trace_set_err(libtrace, errno, "Cannot attach DAG stream #%u",
634                              stream->dagstream);
635                return -1;
636        }
637
638        if (dag_start_stream(FORMAT_DATA->device->fd,
639                             stream->dagstream) < 0) {
640                trace_set_err(libtrace, errno, "Cannot start DAG stream #%u",
641                              stream->dagstream);
642                return -1;
643        }
644        FORMAT_DATA->stream_attached = 1;
645
646        /* We don't want the dag card to do any sleeping */
647        if (dag_set_stream_poll(FORMAT_DATA->device->fd,
648                            stream->dagstream, 0, &zero,
649                            &nopoll) < 0) {
650                trace_set_err(libtrace, errno,
651                              "dag_set_stream_poll failed!");
652                return -1;
653        }
654
655        starttop = dag_advance_stream(FORMAT_DATA->device->fd,
656                                      stream->dagstream,
657                                      &bottom);
658
659        /* Should probably flush the memory hole now */
660        top = starttop;
661        while (starttop - bottom > 0) {
662                bottom += (starttop - bottom);
663                top = dag_advance_stream(FORMAT_DATA->device->fd,
664                                         stream->dagstream,
665                                         &bottom);
666        }
667        stream->top = top;
668        stream->bottom = bottom;
669        stream->processed = 0;
670        stream->drops = 0;
671
672        return 0;
673
674}
675
676/* Starts a DAG input trace */
677static int dag_start_input(libtrace_t *libtrace)
678{
679        return dag_start_input_stream(libtrace, FORMAT_DATA_FIRST);
680}
681
682static int dag_pstart_input(libtrace_t *libtrace)
683{
684        char *scan, *tok;
685        uint16_t stream_count = 0, max_streams;
686        int iserror = 0;
687        struct dag_per_stream_t stream_data;
688
689        /* Check we aren't trying to create more threads than the DAG card can
690         * handle */
691        max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd);
692        if (libtrace->perpkt_thread_count > max_streams) {
693                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
694                              "trying to create too many threads (max is %u)",
695                              max_streams);
696                iserror = 1;
697                goto cleanup;
698        }
699
700        /* Get the stream names from the uri */
701        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
702                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
703                              "format uri doesn't specify the DAG streams");
704                iserror = 1;
705                goto cleanup;
706        }
707
708        scan++;
709
710        tok = strtok(scan, ",");
711        while (tok != NULL) {
712                /* Ensure we haven't specified too many streams */
713                if (stream_count >= libtrace->perpkt_thread_count) {
714                        trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
715                                      "format uri specifies too many streams. "
716                                      "Max is %u", max_streams);
717                        iserror = 1;
718                        goto cleanup;
719                }
720
721                /* Save the stream details */
722                if (stream_count == 0) {
723                        /* Special case where we update the existing stream
724                         * data structure */
725                        FORMAT_DATA_FIRST->dagstream = (uint16_t)atoi(tok);
726                } else {
727                        memset(&stream_data, 0, sizeof(stream_data));
728                        stream_data.dagstream = (uint16_t)atoi(tok);
729                        libtrace_list_push_back(FORMAT_DATA->per_stream,
730                                                &stream_data);
731                }
732
733                stream_count++;
734                tok = strtok(NULL, ",");
735        }
736
737        FORMAT_DATA->stream_attached = 1;
738
739 cleanup:
740        if (iserror) {
741                return -1;
742        } else {
743                return 0;
744        }
745}
746
747/* Pauses a DAG output trace */
748static int dag_pause_output(libtrace_out_t *libtrace)
749{
750        /* Stop and detach the stream */
751        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
752                            FORMAT_DATA_OUT->dagstream) < 0) {
753                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
754                return -1;
755        }
756        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
757                              FORMAT_DATA_OUT->dagstream) < 0) {
758                trace_set_err_out(libtrace, errno,
759                                  "Could not detach DAG stream");
760                return -1;
761        }
762        FORMAT_DATA_OUT->stream_attached = 0;
763        return 0;
764}
765
766/* Pauses a DAG input trace */
767static int dag_pause_input(libtrace_t *libtrace)
768{
769        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
770
771        /* Stop and detach each stream */
772        while (tmp != NULL) {
773                if (dag_stop_stream(FORMAT_DATA->device->fd,
774                                    STREAM_DATA(tmp)->dagstream) < 0) {
775                        trace_set_err(libtrace, errno,
776                                      "Could not stop DAG stream");
777                        printf("Count not stop DAG stream\n");
778                        return -1;
779                }
780                if (dag_detach_stream(FORMAT_DATA->device->fd,
781                                      STREAM_DATA(tmp)->dagstream) < 0) {
782                        trace_set_err(libtrace, errno,
783                                      "Could not detach DAG stream");
784                        printf("Count not detach DAG stream\n");
785                        return -1;
786                }
787
788                tmp = tmp->next;
789        }
790
791        FORMAT_DATA->stream_attached = 0;
792        return 0;
793}
794
795
796
797/* Closes a DAG input trace */
798static int dag_fin_input(libtrace_t *libtrace)
799{
800        /* Need the lock, since we're going to be handling the device list */
801        pthread_mutex_lock(&open_dag_mutex);
802
803        /* Detach the stream if we are not paused */
804        if (FORMAT_DATA->stream_attached)
805                dag_pause_input(libtrace);
806        FORMAT_DATA->device->ref_count--;
807
808        /* Close the DAG device if there are no more references to it */
809        if (FORMAT_DATA->device->ref_count == 0)
810                dag_close_device(FORMAT_DATA->device);
811
812        if (DUCK.dummy_duck)
813                trace_destroy_dead(DUCK.dummy_duck);
814
815        /* Clear the list */
816        libtrace_list_deinit(FORMAT_DATA->per_stream);
817        free(libtrace->format_data);
818        pthread_mutex_unlock(&open_dag_mutex);
819        return 0; /* success */
820}
821
822/* Closes a DAG output trace */
823static int dag_fin_output(libtrace_out_t *libtrace)
824{
825
826        /* Commit any outstanding traffic in the txbuffer */
827        if (FORMAT_DATA_OUT->waiting) {
828                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
829                                           FORMAT_DATA_OUT->dagstream,
830                                           FORMAT_DATA_OUT->waiting );
831        }
832
833        /* Wait until the buffer is nearly clear before exiting the program,
834         * as we will lose packets otherwise */
835        dag_tx_get_stream_space
836                (FORMAT_DATA_OUT->device->fd,
837                 FORMAT_DATA_OUT->dagstream,
838                 dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
839                                            FORMAT_DATA_OUT->dagstream) - 8);
840
841        /* Need the lock, since we're going to be handling the device list */
842        pthread_mutex_lock(&open_dag_mutex);
843
844        /* Detach the stream if we are not paused */
845        if (FORMAT_DATA_OUT->stream_attached)
846                dag_pause_output(libtrace);
847        FORMAT_DATA_OUT->device->ref_count --;
848
849        /* Close the DAG device if there are no more references to it */
850        if (FORMAT_DATA_OUT->device->ref_count == 0)
851                dag_close_device(FORMAT_DATA_OUT->device);
852        free(libtrace->format_data);
853        pthread_mutex_unlock(&open_dag_mutex);
854        return 0; /* success */
855}
856
857#ifdef DAGIOC_CARD_DUCK
858#define LIBTRACE_DUCK_IOCTL DAGIOC_CARD_DUCK
859#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_5_0
860#else
861#ifdef DAGIOCDUCK
862#define LIBTRACE_DUCK_IOCTL DAGIOCDUCK
863#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_2_5
864#else
865#warning "DAG appears to be missing DUCK support"
866#endif
867#endif
868
869/* Extracts DUCK information from the DAG card and produces a DUCK packet */
870static int dag_get_duckinfo(libtrace_t *libtrace,
871                                libtrace_packet_t *packet) {
872
873        if (DUCK.duck_freq == 0)
874                return 0;
875
876#ifndef LIBTRACE_DUCK_IOCTL
877        trace_set_err(libtrace, errno, 
878                "Requested DUCK information but unable to determine the correct ioctl for DUCK");
879        DUCK.duck_freq = 0;
880        return -1;
881#endif
882
883        if (DUCK.last_pkt - DUCK.last_duck < DUCK.duck_freq)
884                return 0;
885
886        /* Allocate memory for the DUCK data */
887        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
888            !packet->buffer) {
889                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
890                packet->buf_control = TRACE_CTRL_PACKET;
891                if (!packet->buffer) {
892                        trace_set_err(libtrace, errno,
893                                      "Cannot allocate packet buffer");
894                        return -1;
895                }
896        }
897
898        /* DUCK doesn't have a format header */
899        packet->header = 0;
900        packet->payload = packet->buffer;
901
902        /* No need to check if we can get DUCK or not - we're modern
903         * enough so just grab the DUCK info */
904        if ((ioctl(FORMAT_DATA->device->fd, LIBTRACE_DUCK_IOCTL,
905                   (duckinf_t *)packet->payload) < 0)) {
906                trace_set_err(libtrace, errno, "Error using DUCK ioctl");
907                DUCK.duck_freq = 0;
908                return -1;
909        }
910
911        packet->type = LIBTRACE_DUCK_VERSION;
912
913        /* Set the packet's trace to point at a DUCK trace, so that the
914         * DUCK format functions will be called on the packet rather than the
915         * DAG ones */
916        if (!DUCK.dummy_duck)
917                DUCK.dummy_duck = trace_create_dead("duck:dummy");
918        packet->trace = DUCK.dummy_duck;
919        DUCK.last_duck = DUCK.last_pkt;
920        packet->error = sizeof(duckinf_t);
921        return sizeof(duckinf_t);
922}
923
924/* Determines the amount of data available to read from the DAG card */
925static int dag_available(libtrace_t *libtrace,
926                         struct dag_per_stream_t *stream_data)
927{
928        uint32_t diff = stream_data->top - stream_data->bottom;
929
930        /* If we've processed more than 4MB of data since we last called
931         * dag_advance_stream, then we should call it again to allow the
932         * space occupied by that 4MB to be released */
933        if (diff >= dag_record_size && stream_data->processed < 4 * 1024 * 1024)
934                return diff;
935
936        /* Update the top and bottom pointers */
937        stream_data->top = dag_advance_stream(FORMAT_DATA->device->fd,
938                                              stream_data->dagstream,
939                                              &(stream_data->bottom));
940
941        if (stream_data->top == NULL) {
942                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
943                return -1;
944        }
945        stream_data->processed = 0;
946        diff = stream_data->top - stream_data->bottom;
947        return diff;
948}
949
950/* Returns a pointer to the start of the next complete ERF record */
951static dag_record_t *dag_get_record(struct dag_per_stream_t *stream_data)
952{
953        dag_record_t *erfptr = NULL;
954        uint16_t size;
955
956        erfptr = (dag_record_t *)stream_data->bottom;
957        if (!erfptr)
958                return NULL;
959
960        size = ntohs(erfptr->rlen);
961        assert( size >= dag_record_size );
962
963        /* Make certain we have the full packet available */
964        if (size > (stream_data->top - stream_data->bottom))
965                return NULL;
966
967        stream_data->bottom += size;
968        stream_data->processed += size;
969        return erfptr;
970}
971
972/* Converts a buffer containing a recently read DAG packet record into a
973 * libtrace packet */
974static int dag_prepare_packet_stream(libtrace_t *libtrace,
975                                     struct dag_per_stream_t *stream_data,
976                                     libtrace_packet_t *packet,
977                                     void *buffer, libtrace_rt_types_t rt_type,
978                                     uint32_t flags)
979{
980        dag_record_t *erfptr;
981
982        /* If the packet previously owned a buffer that is not the buffer
983         * that contains the new packet data, we're going to need to free the
984         * old one to avoid memory leaks */
985        if (packet->buffer != buffer &&
986            packet->buf_control == TRACE_CTRL_PACKET) {
987                free(packet->buffer);
988        }
989
990        /* Set the buffer owner appropriately */
991        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
992                packet->buf_control = TRACE_CTRL_PACKET;
993        } else
994                packet->buf_control = TRACE_CTRL_EXTERNAL;
995
996        /* Update the packet pointers and type appropriately */
997        erfptr = (dag_record_t *)buffer;
998        packet->buffer = erfptr;
999        packet->header = erfptr;
1000        packet->type = rt_type;
1001
1002        if (erfptr->flags.rxerror == 1) {
1003                /* rxerror means the payload is corrupt - drop the payload
1004                 * by tweaking rlen */
1005                packet->payload = NULL;
1006                erfptr->rlen = htons(erf_get_framing_length(packet));
1007        } else {
1008                packet->payload = (char*)packet->buffer
1009                        + erf_get_framing_length(packet);
1010        }
1011
1012        if (libtrace->format_data == NULL) {
1013                dag_init_format_data(libtrace);
1014        }
1015
1016        /* Update the dropped packets counter */
1017        /* No loss counter for DSM coloured records - have to use some
1018         * other API */
1019        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
1020                /* TODO */
1021        } else {
1022                /* Use the ERF loss counter */
1023                if (stream_data->seeninterface[erfptr->flags.iface]
1024                    == 0) {
1025                        stream_data->seeninterface[erfptr->flags.iface]
1026                                = 1;
1027                } else {
1028                        stream_data->drops += ntohs(erfptr->lctr);
1029                }
1030        }
1031
1032        return 0;
1033}
1034
1035static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
1036                              void *buffer, libtrace_rt_types_t rt_type,
1037                              uint32_t flags)
1038{
1039        return dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
1040                                       buffer, rt_type, flags);
1041}
1042
1043/*
1044 * dag_write_packet() at this stage attempts to improve tx performance
1045 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
1046 * has been met. I observed approximately 270% performance increase
1047 * through this relatively naive tweak. No optimisation of buffer sizes
1048 * was attempted.
1049 */
1050
1051/* Pushes an ERF record onto the transmit stream */
1052static int dag_dump_packet(libtrace_out_t *libtrace,
1053                           dag_record_t *erfptr, unsigned int pad,
1054                           void *buffer)
1055{
1056        int size;
1057
1058        /*
1059         * If we've got 0 bytes waiting in the txqueue, assume that we
1060         * haven't requested any space yet, and request some, storing
1061         * the pointer at FORMAT_DATA_OUT->txbuffer.
1062         *
1063         * The amount to request is slightly magical at the moment - it's
1064         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
1065         * the buffer and handle overruns.
1066         */
1067        if (FORMAT_DATA_OUT->waiting == 0) {
1068                FORMAT_DATA_OUT->txbuffer =
1069                        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
1070                                                FORMAT_DATA_OUT->dagstream,
1071                                                16908288);
1072        }
1073
1074        /*
1075         * Copy the header separately to the body, as we can't guarantee they
1076         * are in contiguous memory
1077         */
1078        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, erfptr,
1079               (dag_record_size + pad));
1080        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
1081
1082        /*
1083         * Copy our incoming packet into the outgoing buffer, and increment
1084         * our waiting count
1085         */
1086        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
1087        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, buffer,
1088               size);
1089        FORMAT_DATA_OUT->waiting += size;
1090
1091        /*
1092         * If our output buffer has more than 16 Mebibytes in it, commit those
1093         * bytes and reset the waiting count to 0.
1094         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in
1095         * case there is still data in the buffer at program exit.
1096         */
1097        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
1098                FORMAT_DATA_OUT->txbuffer =
1099                        dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
1100                                                   FORMAT_DATA_OUT->dagstream,
1101                                                   FORMAT_DATA_OUT->waiting);
1102                FORMAT_DATA_OUT->waiting = 0;
1103        }
1104
1105        return size + pad + dag_record_size;
1106}
1107
1108/* Attempts to determine a suitable ERF type for a given packet. Returns true
1109 * if one is found, false otherwise */
1110static bool find_compatible_linktype(libtrace_out_t *libtrace,
1111                                     libtrace_packet_t *packet, char *type)
1112{
1113        /* Keep trying to simplify the packet until we can find
1114         * something we can do with it */
1115
1116        do {
1117                *type = libtrace_to_erf_type(trace_get_link_type(packet));
1118
1119                /* Success */
1120                if (*type != (char)-1)
1121                        return true;
1122
1123                if (!demote_packet(packet)) {
1124                        trace_set_err_out(libtrace,
1125                                          TRACE_ERR_NO_CONVERSION,
1126                                          "No erf type for packet (%i)",
1127                                          trace_get_link_type(packet));
1128                        return false;
1129                }
1130
1131        } while(1);
1132
1133        return true;
1134}
1135
1136/* Writes a packet to the provided DAG output trace */
1137static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet)
1138{
1139        /* This is heavily borrowed from erf_write_packet(). Yes, CnP
1140         * coding sucks, sorry about that.
1141         */
1142        unsigned int pad = 0;
1143        int numbytes;
1144        void *payload = packet->payload;
1145        dag_record_t *header = (dag_record_t *)packet->header;
1146        char erf_type = 0;
1147
1148        if(!packet->header) {
1149                /* No header, probably an RT packet. Lifted from
1150                 * erf_write_packet(). */
1151                return -1;
1152        }
1153
1154        if (trace_get_link_type(packet) == TRACE_TYPE_NONDATA)
1155                return 0;
1156
1157        pad = dag_get_padding(packet);
1158
1159        /*
1160         * If the payload is null, adjust the rlen. Discussion of this is
1161         * attached to erf_write_packet()
1162         */
1163        if (payload == NULL) {
1164                header->rlen = htons(dag_record_size + pad);
1165        }
1166
1167        if (packet->type == TRACE_RT_DATA_ERF) {
1168                numbytes = dag_dump_packet(libtrace, header, pad, payload);
1169        } else {
1170                /* Build up a new packet header from the existing header */
1171
1172                /* Simplify the packet first - if we can't do this, break
1173                 * early */
1174                if (!find_compatible_linktype(libtrace,packet,&erf_type))
1175                        return -1;
1176
1177                dag_record_t erfhdr;
1178
1179                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
1180                payload=packet->payload;
1181                pad = dag_get_padding(packet);
1182
1183                /* Flags. Can't do this */
1184                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
1185                if (trace_get_direction(packet)!=(int)~0U)
1186                        erfhdr.flags.iface = trace_get_direction(packet);
1187
1188                erfhdr.type = erf_type;
1189
1190                /* Packet length (rlen includes format overhead) */
1191                assert(trace_get_capture_length(packet) > 0
1192                       && trace_get_capture_length(packet) <= 65536);
1193                assert(erf_get_framing_length(packet) > 0
1194                       && trace_get_framing_length(packet) <= 65536);
1195                assert(trace_get_capture_length(packet) +
1196                       erf_get_framing_length(packet) > 0
1197                       && trace_get_capture_length(packet) +
1198                       erf_get_framing_length(packet) <= 65536);
1199
1200                erfhdr.rlen = htons(trace_get_capture_length(packet)
1201                                    + erf_get_framing_length(packet));
1202
1203
1204                /* Loss counter. Can't do this */
1205                erfhdr.lctr = 0;
1206                /* Wire length, does not include padding! */
1207                erfhdr.wlen = htons(trace_get_wire_length(packet));
1208
1209                /* Write it out */
1210                numbytes = dag_dump_packet(libtrace, &erfhdr, pad, payload);
1211        }
1212
1213        return numbytes;
1214}
1215
1216/* Reads the next available packet from a DAG card, in a BLOCKING fashion
1217 *
1218 * If DUCK reporting is enabled, the packet returned may be a DUCK update
1219 */
1220static int dag_read_packet_stream(libtrace_t *libtrace,
1221                                struct dag_per_stream_t *stream_data,
1222                                libtrace_thread_t *t, /* Optional */
1223                                libtrace_packet_t *packet)
1224{
1225        int size = 0;
1226        dag_record_t *erfptr = NULL;
1227        struct timeval tv;
1228        int numbytes = 0;
1229        uint32_t flags = 0;
1230        struct timeval maxwait, pollwait;
1231
1232        pollwait.tv_sec = 0;
1233        pollwait.tv_usec = 10000;
1234        maxwait.tv_sec = 0;
1235        maxwait.tv_usec = 250000;
1236
1237        /* Check if we're due for a DUCK report - only report on the first thread */
1238        if (stream_data == FORMAT_DATA_FIRST) {
1239                size = dag_get_duckinfo(libtrace, packet);
1240                if (size != 0)
1241                        return size;
1242        }
1243
1244
1245        /* Don't let anyone try to free our DAG memory hole! */
1246        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1247
1248        /* If the packet buffer is currently owned by libtrace, free it so
1249         * that we can set the packet to point into the DAG memory hole */
1250        if (packet->buf_control == TRACE_CTRL_PACKET) {
1251                free(packet->buffer);
1252                packet->buffer = 0;
1253        }
1254
1255        if (dag_set_stream_poll(FORMAT_DATA->device->fd, stream_data->dagstream,
1256                                sizeof(dag_record_t), &maxwait,
1257                                &pollwait) == -1) {
1258                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1259                return -1;
1260        }
1261
1262        /* Grab a full ERF record */
1263        do {
1264                numbytes = dag_available(libtrace, stream_data);
1265                if (numbytes < 0)
1266                        return numbytes;
1267                if (numbytes < dag_record_size) {
1268                        /* Check the message queue if we have one to check */
1269                        if (t != NULL &&
1270                            libtrace_message_queue_count(&t->messages) > 0)
1271                                return -2;
1272
1273                        if (libtrace_halt)
1274                                return 0;
1275                        /* Block until we see a packet */
1276                        continue;
1277                }
1278                erfptr = dag_get_record(stream_data);
1279        } while (erfptr == NULL);
1280
1281        packet->trace = libtrace;
1282        /* Prepare the libtrace packet */
1283        if (dag_prepare_packet_stream(libtrace, stream_data, packet, erfptr,
1284                                    TRACE_RT_DATA_ERF, flags))
1285                return -1;
1286
1287        /* Update the DUCK timer - don't re-order this check (false-sharing) */
1288        if (stream_data == FORMAT_DATA_FIRST && DUCK.duck_freq != 0) {
1289                tv = trace_get_timeval(packet);
1290                DUCK.last_pkt = tv.tv_sec;
1291        }
1292
1293        packet->error = packet->payload ? htons(erfptr->rlen) :
1294                                          erf_get_framing_length(packet);
1295
1296        return packet->error;
1297}
1298
1299static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
1300{
1301        return dag_read_packet_stream(libtrace, FORMAT_DATA_FIRST, NULL, packet);
1302}
1303
1304static int dag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1305                             libtrace_packet_t **packets, size_t nb_packets)
1306{
1307        int ret;
1308        size_t read_packets = 0;
1309        int numbytes = 0;
1310
1311        struct dag_per_stream_t *stream_data =
1312                (struct dag_per_stream_t *)t->format_data;
1313
1314        /* Read as many packets as we can, but read atleast one packet */
1315        do {
1316                ret = dag_read_packet_stream(libtrace, stream_data, t,
1317                                           packets[read_packets]);
1318                if (ret < 0)
1319                        return ret;
1320
1321                read_packets++;
1322
1323                /* Make sure we don't read too many packets..! */
1324                if (read_packets >= nb_packets)
1325                        break;
1326
1327                numbytes = dag_available(libtrace, stream_data);
1328        } while (numbytes >= dag_record_size);
1329
1330        return read_packets;
1331}
1332
1333/* Attempts to read a packet from a DAG card in a NON-BLOCKING fashion. If a
1334 * packet is available, we will return a packet event. Otherwise we will
1335 * return a SLEEP event (as we cannot select on the DAG file descriptor).
1336 */
1337static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
1338                                           libtrace_packet_t *packet)
1339{
1340        libtrace_eventobj_t event = {0,0,0.0,0};
1341        dag_record_t *erfptr = NULL;
1342        int numbytes;
1343        uint32_t flags = 0;
1344        struct timeval minwait, tv;
1345       
1346        minwait.tv_sec = 0;
1347        minwait.tv_usec = 10000;
1348
1349        /* Check if we're meant to provide a DUCK update */
1350        numbytes = dag_get_duckinfo(libtrace, packet);
1351        if (numbytes < 0) {
1352                event.type = TRACE_EVENT_TERMINATE;
1353                return event;
1354        } else if (numbytes > 0) {
1355                event.type = TRACE_EVENT_PACKET;
1356                return event;
1357        }
1358       
1359        if (dag_set_stream_poll(FORMAT_DATA->device->fd,
1360                                FORMAT_DATA_FIRST->dagstream, 0, &minwait,
1361                                &minwait) == -1) {
1362                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1363                event.type = TRACE_EVENT_TERMINATE;
1364                return event;
1365        }
1366
1367        do {
1368                erfptr = NULL;
1369                numbytes = 0;
1370
1371                /* Need to call dag_available so that the top pointer will get
1372                 * updated, otherwise we'll never see any data! */
1373                numbytes = dag_available(libtrace, FORMAT_DATA_FIRST);
1374
1375                /* May as well not bother calling dag_get_record if
1376                 * dag_available suggests that there's no data */
1377                if (numbytes != 0)
1378                        erfptr = dag_get_record(FORMAT_DATA_FIRST);
1379                if (erfptr == NULL) {
1380                        /* No packet available - sleep for a very short time */
1381                        if (libtrace_halt) {
1382                                event.type = TRACE_EVENT_TERMINATE;
1383                        } else {
1384                                event.type = TRACE_EVENT_SLEEP;
1385                                event.seconds = 0.0001;
1386                        }
1387                        break;
1388                }
1389                if (dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
1390                                            erfptr, TRACE_RT_DATA_ERF, flags)) {
1391                        event.type = TRACE_EVENT_TERMINATE;
1392                        break;
1393                }
1394
1395
1396                event.size = trace_get_capture_length(packet) +
1397                        trace_get_framing_length(packet);
1398
1399                /* XXX trace_read_packet() normally applies the following
1400                 * config options for us, but this function is called via
1401                 * trace_event() so we have to do it ourselves */
1402
1403                if (libtrace->filter) {
1404                        int filtret = trace_apply_filter(libtrace->filter,
1405                                                         packet);
1406                        if (filtret == -1) {
1407                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
1408                                              "Bad BPF Filter");
1409                                event.type = TRACE_EVENT_TERMINATE;
1410                                break;
1411                        }
1412
1413                        if (filtret == 0) {
1414                                /* This packet isn't useful so we want to
1415                                 * immediately see if there is another suitable
1416                                 * one - we definitely DO NOT want to return
1417                                 * a sleep event in this case, like we used to
1418                                 * do! */
1419                                libtrace->filtered_packets ++;
1420                                trace_clear_cache(packet);
1421                                continue;
1422                        }
1423
1424                        event.type = TRACE_EVENT_PACKET;
1425                } else {
1426                        event.type = TRACE_EVENT_PACKET;
1427                }
1428
1429                /* Update the DUCK timer */
1430                tv = trace_get_timeval(packet);
1431                DUCK.last_pkt = tv.tv_sec;
1432               
1433                if (libtrace->snaplen > 0) {
1434                        trace_set_capture_length(packet, libtrace->snaplen);
1435                }
1436                libtrace->accepted_packets ++;
1437                break;
1438        } while(1);
1439
1440        return event;
1441}
1442
1443static void dag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat)
1444{
1445        libtrace_list_node_t *tmp;
1446        assert(stat && libtrace);
1447        tmp = FORMAT_DATA_HEAD;
1448
1449        /* We don't filter packets on the card itself */
1450        stat->filtered_valid = 1;
1451        stat->filtered = 0;
1452
1453        /* Dropped, filtered the  */
1454        stat->dropped_valid = 1;
1455        stat->dropped = 0;
1456        while (tmp != NULL) {
1457                stat->dropped += STREAM_DATA(tmp)->drops;
1458                tmp = tmp->next;
1459        }
1460
1461}
1462
1463static void dag_get_thread_statisitics(libtrace_t *libtrace, libtrace_thread_t *t,
1464                                       libtrace_stat_t *stat) {
1465        struct dag_per_stream_t *stream_data = t->format_data;
1466        assert(stat && libtrace);
1467
1468        stat->dropped_valid = 1;
1469        stat->dropped = stream_data->drops;
1470
1471        stat->filtered_valid = 1;
1472        stat->filtered = 0;
1473}
1474
/* Prints some semi-useful help text about the DAG format module to
 * standard output, one line at a time. */
static void dag_help(void) {
	static const char *const help_lines[] = {
		"dag format module: $Revision: 1755 $\n",
		"Supported input URIs:\n",
		"\tdag:/dev/dagn\n",
		"\n",
		"\te.g.: dag:/dev/dag0\n",
		"\n",
		"Supported output URIs:\n",
		"\tnone\n",
		"\n",
	};
	size_t i;

	for (i = 0; i < sizeof(help_lines) / sizeof(help_lines[0]); i++)
		printf("%s", help_lines[i]);
}
1487
1488static int dag_pconfig_input(UNUSED libtrace_t *libtrace,
1489                             trace_parallel_option_t option, UNUSED void *value)
1490{
1491        /* We don't support any of these! Normally you configure the DAG card
1492         * externally. */
1493        switch(option) {
1494        case TRACE_OPTION_SET_HASHER:
1495        case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
1496        case TRACE_OPTION_TRACETIME:
1497        case TRACE_OPTION_TICK_INTERVAL:
1498        case TRACE_OPTION_GET_CONFIG:
1499        case TRACE_OPTION_SET_CONFIG:
1500                return -1;
1501        }
1502        /* We don't provide a default option to ensure that future options will
1503         * generate a compiler warning. */
1504
1505        return -1;
1506}
1507
1508static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1509                                bool reader)
1510{
1511        struct dag_per_stream_t *stream_data;
1512        libtrace_list_node_t *node;
1513
1514        if (reader) {
1515                if (t->type == THREAD_PERPKT) {
1516
1517                        node = libtrace_list_get_index(FORMAT_DATA->per_stream,
1518                                                        t->perpkt_num);
1519                        if (node == NULL) {
1520                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
1521                                              "Too few streams supplied for the"
1522                                              " number of threads lanuched");
1523                                return -1;
1524                        }
1525                        stream_data = node->data;
1526
1527                        /* Pass the per thread data to the thread */
1528                        t->format_data = stream_data;
1529
1530                        /* Attach and start the DAG stream */
1531                        printf("t%u: starting and attaching stream #%u\n",
1532                               t->perpkt_num, stream_data->dagstream);
1533                        if (dag_start_input_stream(libtrace, stream_data) < 0)
1534                                return -1;
1535                } else {
1536                        /* TODO: Figure out why t->type != THREAD_PERPKT in
1537                         * order to figure out what this line does */
1538                        t->format_data = FORMAT_DATA_FIRST;
1539                }
1540        }
1541
1542        fprintf(stderr, "t%u: registered thread\n", t->perpkt_num);
1543
1544        return 0;
1545}
1546
/* Format descriptor for the DAG capture module.
 *
 * This is a positional initialiser: each entry fills the next member of
 * struct libtrace_format_t, so entries must never be reordered or removed.
 * NULL marks a callback that this format does not provide. The table is
 * handed to libtrace by dag_constructor().
 *
 * NOTE(review): the labels on the trailing parallel-API entries below are
 * inferred from the names of the functions installed there - confirm them
 * against the declaration of struct libtrace_format_t.
 */
1547static struct libtrace_format_t dag = {
1548        "dag",                          /* presumably the format name - confirm */
1549        "$Id$",                         /* presumably the version string - confirm */
1550        TRACE_FORMAT_ERF,               /* presumably the base format type - confirm */
1551        dag_probe_filename,             /* probe filename */
1552        NULL,                           /* probe magic */
1553        dag_init_input,                 /* init_input */
1554        dag_config_input,               /* config_input */
1555        dag_start_input,                /* start_input */
1556        dag_pause_input,                /* pause_input */
1557        dag_init_output,                /* init_output */
1558        NULL,                           /* config_output */
1559        dag_start_output,               /* start_output */
1560        dag_fin_input,                  /* fin_input */
1561        dag_fin_output,                 /* fin_output */
1562        dag_read_packet,                /* read_packet */
1563        dag_prepare_packet,             /* prepare_packet */
1564        NULL,                           /* fin_packet */
1565        dag_write_packet,               /* write_packet */
1566        erf_get_link_type,              /* get_link_type */
1567        erf_get_direction,              /* get_direction */
1568        erf_set_direction,              /* set_direction */
1569        erf_get_erf_timestamp,          /* get_erf_timestamp */
1570        NULL,                           /* get_timeval */
1571        NULL,                           /* get_seconds */
1572        NULL,                           /* get_timespec */
1573        NULL,                           /* seek_erf */
1574        NULL,                           /* seek_timeval */
1575        NULL,                           /* seek_seconds */
1576        erf_get_capture_length,         /* get_capture_length */
1577        erf_get_wire_length,            /* get_wire_length */
1578        erf_get_framing_length,         /* get_framing_length */
1579        erf_set_capture_length,         /* set_capture_length */
1580        NULL,                           /* get_received_packets */
1581        NULL,                           /* get_filtered_packets */
1582        NULL,                           /* get_dropped_packets */
1583        dag_get_statistics,             /* get_statistics */
1584        NULL,                           /* get_fd */
1585        trace_event_dag,                /* trace_event */
1586        dag_help,                       /* help */
1587        NULL,                            /* next pointer */
1588        {true, 0}, /* live packet capture, thread limit TBD */
1589        dag_pstart_input,               /* presumably pstart_input - confirm */
1590        dag_pread_packets,              /* presumably pread_packets - confirm */
1591        dag_pause_input,                /* presumably ppause_input; same function as pause_input above */
1592        NULL,                           /* presumably pfin_input - confirm */
1593        dag_pconfig_input,              /* presumably pconfig_input - confirm */
1594        dag_pregister_thread,           /* presumably pregister_thread - confirm */
1595        NULL,                           /* presumably punregister_thread - confirm */
1596        dag_get_thread_statisitics      /* get thread stats */
1597};
1598
1599void dag_constructor(void)
1600{
1601        register_format(&dag);
1602}
Note: See TracBrowser for help on using the repository browser.