source: lib/format_dag25.c @ 4631115

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 4631115 was 4631115, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Merge branch 'master' into develop.

Update to include fixes/features etc from the 3.0.22 release.

Conflicts:

README
lib/format_dag25.c
lib/format_dpdk.c
lib/format_linux.c
lib/trace.c
test/Makefile

  • Property mode set to 100644
File size: 45.8 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id$
31 *
32 */
33
34#define _GNU_SOURCE
35
36#include "config.h"
37#include "common.h"
38#include "libtrace.h"
39#include "libtrace_int.h"
40#include "format_helper.h"
41#include "format_erf.h"
42
43#include <assert.h>
44#include <errno.h>
45#include <fcntl.h>
46#include <stdio.h>
47#include <string.h>
48#include <stdlib.h>
49#include <sys/stat.h>
50
51#include <sys/mman.h>
52/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
53 * Windows anyway so we'll worry about this more later :] */
54#include <pthread.h>
55
56
57#ifdef WIN32
58#  include <io.h>
59#  include <share.h>
60#  define PATH_MAX _MAX_PATH
61#  define snprintf sprintf_s
62#else
63#  include <netdb.h>
64#  ifndef PATH_MAX
65#       define PATH_MAX 4096
66#  endif
67#  include <sys/ioctl.h>
68#endif
69
70/* This format deals with DAG cards that are using drivers from the 2.5 version
71 * onwards, including 3.X.
72 *
73 * DAG is a LIVE capture format.
74 *
75 * This format does support writing, provided the DAG card that you are using
76 * has transmit (Tx) support. Additionally, packets read using this format
77 * are in the ERF format, so can easily be written as ERF traces without
78 * losing any data.
79 */
80
81#define DATA(x) ((struct dag_format_data_t *)x->format_data)
82#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
83#define STREAM_DATA(x) ((struct dag_per_stream_t *)x->data)
84
85#define FORMAT_DATA DATA(libtrace)
86#define FORMAT_DATA_OUT DATA_OUT(libtrace)
87
88#define DUCK FORMAT_DATA->duck
89
90#define FORMAT_DATA_HEAD FORMAT_DATA->per_stream->head
91#define FORMAT_DATA_FIRST ((struct dag_per_stream_t *)FORMAT_DATA_HEAD->data)
92
93static struct libtrace_format_t dag;
94
95/* A DAG device - a DAG device can support multiple streams (and therefore
96 * multiple input traces) so each trace needs to refer to a device */
97struct dag_dev_t {
98        char * dev_name;                /* Device name */
99        int fd;                         /* File descriptor */
100        uint16_t ref_count;             /* Number of input / output traces
101                                           that are using this device */
102        struct dag_dev_t *prev;         /* Pointer to the previous device in
103                                           the device list */
104        struct dag_dev_t *next;         /* Pointer to the next device in the
105                                           device list */
106};
107
108/* "Global" data that is stored for each DAG output trace */
109struct dag_format_data_out_t {
110        /* String containing the DAG device name */
111        char *device_name;
112        /* The DAG device being used for writing */
113        struct dag_dev_t *device;
114        /* The DAG stream that is being written on */
115        unsigned int dagstream;
116        /* Boolean flag indicating whether the stream is currently attached */
117        int stream_attached;
118        /* The amount of data waiting to be transmitted, in bytes */
119        uint64_t waiting;
120        /* A buffer to hold the data to be transmittted */
121        uint8_t *txbuffer;
122};
123
124/* Data that is stored against each input stream */
125struct dag_per_stream_t {
126        /* DAG device */
127        struct dag_dev_t *device;
128        /* DAG stream number */
129        uint16_t dagstream;
130        /* Pointer to the last unread byte in the DAG memory */
131        uint8_t *top;
132        /* Pointer to the first unread byte in the DAG memory */
133        uint8_t *bottom;
134        /* Amount of data processed from the bottom pointer */
135        uint32_t processed;
136        /* Number of packets seen by the stream */
137        uint64_t pkt_count;
138        /* Drop count for this particular stream */
139        uint64_t drops;
140        /* Boolean values to indicate if a particular interface has been seen
141         * or not. This is limited to four interfaces, which is enough to
142         * support all current DAG cards */
143        uint8_t seeninterface[4];
144};
145
146/* "Global" data that is stored for each DAG input trace */
147struct dag_format_data_t {
148        /* Data required for regular DUCK reporting */
149        /* TODO: This doesn't work with the 10X2S card! I don't know how
150         * DUCK stuff works and don't know how to fix it */
151        struct {
152                /* Timestamp of the last DUCK report */
153                uint32_t last_duck;
154                /* The number of seconds between each DUCK report */
155                uint32_t duck_freq;
156                /* Timestamp of the last packet read from the DAG card */
157                uint32_t last_pkt;
158                /* Dummy trace to ensure DUCK packets are dealt with using the
159                 * DUCK format functions */
160                libtrace_t *dummy_duck;
161        } duck;
162
163        /* String containing the DAG device name */
164        char *device_name;
165        /* Boolean flag indicating whether the trace is currently attached */
166        int stream_attached;
167
168        /* Data stored against each DAG input stream */
169        libtrace_list_t *per_stream;
170};
171
172/* To be thread-safe, we're going to need a mutex for operating on the list
173 * of DAG devices */
174pthread_mutex_t open_dag_mutex;
175
176/* The list of DAG devices that have been opened by libtrace.
177 *
178 * We can only open each DAG device once, but we might want to read from
179 * multiple streams. Therefore, we need to maintain a list of devices that we
180 * have opened (with ref counts!) so that we don't try to open a device too
181 * many times or close a device that we're still using */
182struct dag_dev_t *open_dags = NULL;
183
184/* Returns the amount of padding between the ERF header and the start of the
185 * captured packet data */
186static int dag_get_padding(const libtrace_packet_t *packet)
187{
188        /* ERF Ethernet records have a 2 byte padding before the packet itself
189         * so that the IP header is aligned on a 32 bit boundary.
190         */
191        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
192                dag_record_t *erfptr = (dag_record_t *)packet->header;
193                switch(erfptr->type) {
194                        case TYPE_ETH:
195                        case TYPE_DSM_COLOR_ETH:
196                                return 2;
197                        default:                return 0;
198                }
199        }
200        else {
201                switch(trace_get_link_type(packet)) {
202                        case TRACE_TYPE_ETH:    return 2;
203                        default:                return 0;
204                }
205        }
206}
207
208/* Attempts to determine if the given filename refers to a DAG device */
209static int dag_probe_filename(const char *filename)
210{
211        struct stat statbuf;
212        /* Can we stat the file? */
213        if (stat(filename, &statbuf) != 0) {
214                return 0;
215        }
216        /* Is it a character device? */
217        if (!S_ISCHR(statbuf.st_mode)) {
218                return 0;
219        }
220        /* Yeah, it's probably us. */
221        return 1;
222}
223
224/* Initialises the DAG output data structure */
225static void dag_init_format_out_data(libtrace_out_t *libtrace)
226{
227        libtrace->format_data = (struct dag_format_data_out_t *)
228                malloc(sizeof(struct dag_format_data_out_t));
229        // no DUCK on output
230        FORMAT_DATA_OUT->stream_attached = 0;
231        FORMAT_DATA_OUT->device = NULL;
232        FORMAT_DATA_OUT->device_name = NULL;
233        FORMAT_DATA_OUT->dagstream = 0;
234        FORMAT_DATA_OUT->waiting = 0;
235
236}
237
238/* Initialises the DAG input data structure */
239static void dag_init_format_data(libtrace_t *libtrace)
240{
241        struct dag_per_stream_t stream_data;
242
243        libtrace->format_data = (struct dag_format_data_t *)
244                malloc(sizeof(struct dag_format_data_t));
245        DUCK.last_duck = 0;
246        DUCK.duck_freq = 0;
247        DUCK.last_pkt = 0;
248        DUCK.dummy_duck = NULL;
249
250        FORMAT_DATA->per_stream =
251                libtrace_list_init(sizeof(stream_data));
252        assert(FORMAT_DATA->per_stream != NULL);
253
254        /* We'll start with just one instance of stream_data, and we'll
255         * add more later if we need them */
256        memset(&stream_data, 0, sizeof(stream_data));
257        libtrace_list_push_back(FORMAT_DATA->per_stream, &stream_data);
258        FORMAT_DATA->device_name = NULL;
259}
260
261/* Determines if there is already an entry for the given DAG device in the
262 * device list and increments the reference count for that device, if found.
263 *
264 * NOTE: This function assumes the open_dag_mutex is held by the caller */
265static struct dag_dev_t *dag_find_open_device(char *dev_name)
266{
267        struct dag_dev_t *dag_dev;
268
269        dag_dev = open_dags;
270
271        /* XXX: Not exactly zippy, but how often are we going to be dealing
272         * with multiple dag cards? */
273        while (dag_dev != NULL) {
274                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
275                        dag_dev->ref_count ++;
276                        return dag_dev;
277                }
278                dag_dev = dag_dev->next;
279        }
280        return NULL;
281}
282
283/* Closes a DAG device and removes it from the device list.
284 *
285 * Attempting to close a DAG device that has a non-zero reference count will
286 * cause an assertion failure!
287 *
288 * NOTE: This function assumes the open_dag_mutex is held by the caller */
289static void dag_close_device(struct dag_dev_t *dev)
290{
291        /* Need to remove from the device list */
292        assert(dev->ref_count == 0);
293
294        if (dev->prev == NULL) {
295                open_dags = dev->next;
296                if (dev->next)
297                        dev->next->prev = NULL;
298        } else {
299                dev->prev->next = dev->next;
300                if (dev->next)
301                        dev->next->prev = dev->prev;
302        }
303
304        dag_close(dev->fd);
305        if (dev->dev_name)
306                free(dev->dev_name);
307        free(dev);
308}
309
310
311/* Opens a new DAG device for writing and adds it to the DAG device list
312 *
313 * NOTE: this function should only be called when opening a DAG device for
314 * writing - there is little practical difference between this and the
315 * function below that covers the reading case, but we need the output trace
316 * object to report errors properly so the two functions take slightly
317 * different arguments. This is really lame and there should be a much better
318 * way of doing this.
319 *
320 * NOTE: This function assumes the open_dag_mutex is held by the caller
321 */
322static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace,
323                                                char *dev_name)
324{
325        struct stat buf;
326        int fd;
327        struct dag_dev_t *new_dev;
328
329        /* Make sure the device exists */
330        if (stat(dev_name, &buf) == -1) {
331                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
332                return NULL;
333        }
334
335        /* Make sure it is the appropriate type of device */
336        if (S_ISCHR(buf.st_mode)) {
337                /* Try opening the DAG device */
338                if((fd = dag_open(dev_name)) < 0) {
339                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
340                                        dev_name);
341                        return NULL;
342                }
343        } else {
344                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
345                                dev_name);
346                return NULL;
347        }
348
349        /* Add the device to our device list - it is just a doubly linked
350         * list with no inherent ordering; just tack the new one on the front
351         */
352        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
353        new_dev->fd = fd;
354        new_dev->dev_name = dev_name;
355        new_dev->ref_count = 1;
356
357        new_dev->prev = NULL;
358        new_dev->next = open_dags;
359        if (open_dags)
360                open_dags->prev = new_dev;
361
362        open_dags = new_dev;
363
364        return new_dev;
365}
366
367/* Opens a new DAG device for reading and adds it to the DAG device list
368 *
369 * NOTE: this function should only be called when opening a DAG device for
370 * reading - there is little practical difference between this and the
371 * function above that covers the writing case, but we need the input trace
372 * object to report errors properly so the two functions take slightly
373 * different arguments. This is really lame and there should be a much better
374 * way of doing this.
375 *
376 * NOTE: This function assumes the open_dag_mutex is held by the caller */
377static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
378        struct stat buf;
379        int fd;
380        struct dag_dev_t *new_dev;
381
382        /* Make sure the device exists */
383        if (stat(dev_name, &buf) == -1) {
384                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
385                return NULL;
386        }
387
388        /* Make sure it is the appropriate type of device */
389        if (S_ISCHR(buf.st_mode)) {
390                /* Try opening the DAG device */
391                if((fd = dag_open(dev_name)) < 0) {
392                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
393                                      dev_name);
394                        return NULL;
395                }
396        } else {
397                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
398                              dev_name);
399                return NULL;
400        }
401
402        /* Add the device to our device list - it is just a doubly linked
403         * list with no inherent ordering; just tack the new one on the front
404         */
405        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
406        new_dev->fd = fd;
407        new_dev->dev_name = dev_name;
408        new_dev->ref_count = 1;
409
410        new_dev->prev = NULL;
411        new_dev->next = open_dags;
412        if (open_dags)
413                open_dags->prev = new_dev;
414
415        open_dags = new_dev;
416
417        return new_dev;
418}
419
420/* Creates and initialises a DAG output trace */
421static int dag_init_output(libtrace_out_t *libtrace)
422{
423        char *scan = NULL;
424        struct dag_dev_t *dag_device = NULL;
425        int stream = 1;
426
427        /* XXX I don't know if this is important or not, but this function
428         * isn't present in all of the driver releases that this code is
429         * supposed to support! */
430        /*
431        unsigned long wake_time;
432        dagutil_sleep_get_wake_time(&wake_time,0);
433        */
434
435        dag_init_format_out_data(libtrace);
436        /* Grab the mutex while we're likely to be messing with the device
437         * list */
438        pthread_mutex_lock(&open_dag_mutex);
439
440        /* Specific streams are signified using a comma in the libtrace URI,
441         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
442         *
443         * If no stream is specified, we will write using stream 1 */
444        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
445                FORMAT_DATA_OUT->device_name = strdup(libtrace->uridata);
446        } else {
447                FORMAT_DATA_OUT->device_name = 
448                                (char *)strndup(libtrace->uridata,
449                                (size_t)(scan - libtrace->uridata));
450                stream = atoi(++scan);
451        }
452        FORMAT_DATA_OUT->dagstream = stream;
453
454        /* See if our DAG device is already open */
455        dag_device = dag_find_open_device(FORMAT_DATA_OUT->device_name);
456
457        if (dag_device == NULL) {
458                /* Device not yet opened - open it ourselves */
459                dag_device = dag_open_output_device(libtrace, 
460                                FORMAT_DATA_OUT->device_name);
461        }
462
463        /* Make sure we have successfully opened a DAG device */
464        if (dag_device == NULL) {
465                if (FORMAT_DATA_OUT->device_name) {
466                        free(FORMAT_DATA_OUT->device_name);
467                        FORMAT_DATA_OUT->device_name = NULL;
468                }
469                pthread_mutex_unlock(&open_dag_mutex);
470                return -1;
471        }
472
473        FORMAT_DATA_OUT->device = dag_device;
474        pthread_mutex_unlock(&open_dag_mutex);
475        return 0;
476}
477
478/* Creates and initialises a DAG input trace */
479static int dag_init_input(libtrace_t *libtrace) {
480        char *scan = NULL;
481        int stream = 0;
482        struct dag_dev_t *dag_device = NULL;
483
484        dag_init_format_data(libtrace);
485        /* Grab the mutex while we're likely to be messing with the device
486         * list */
487        pthread_mutex_lock(&open_dag_mutex);
488
489
490        /* DAG cards support multiple streams. In a single threaded capture,
491         * these are specified using a comma in the libtrace URI,
492         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
493         *
494         * If no stream is specified, we will read from stream 0 with
495         * one thread
496         */
497        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
498                FORMAT_DATA->device_name = strdup(libtrace->uridata);
499        } else {
500                FORMAT_DATA->device_name = (char *)strndup(libtrace->uridata,
501                                (size_t)(scan - libtrace->uridata));
502                stream = atoi(++scan);
503        }
504
505        FORMAT_DATA_FIRST->dagstream = stream;
506
507        /* See if our DAG device is already open */
508        dag_device = dag_find_open_device(FORMAT_DATA->device_name);
509
510        if (dag_device == NULL) {
511                /* Device not yet opened - open it ourselves */
512                dag_device=dag_open_device(libtrace, FORMAT_DATA->device_name);
513        }
514
515        /* Make sure we have successfully opened a DAG device */
516        if (dag_device == NULL) {
517                if (FORMAT_DATA->device_name)
518                        free(FORMAT_DATA->device_name);
519                FORMAT_DATA->device_name = NULL;
520                pthread_mutex_unlock(&open_dag_mutex);
521                return -1;
522        }
523
524        FORMAT_DATA_FIRST->device = dag_device;
525
526        /* See Config_Status_API_Programming_Guide.pdf from the Endace
527           Dag Documentation */
528        /* Check kBooleanAttributeActive is true -- no point capturing
529         * on an interface that's disabled
530         *
531         * The symptom of the port being disabled is that libtrace
532         * will appear to hang. */
533        /* Check kBooleanAttributeFault is false */
534        /* Check kBooleanAttributeLocalFault is false */
535        /* Check kBooleanAttributeLock is true ? */
536        /* Check kBooleanAttributePeerLink ? */
537
538        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based
539           on libtrace promisc attribute?*/
540        /* Set kUint32AttributeSnapLength to the snaplength */
541
542        pthread_mutex_unlock(&open_dag_mutex);
543        return 0;
544}
545
546/* Configures a DAG input trace */
547static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
548                            void *data)
549{
550        char conf_str[4096];
551        switch(option) {
552        case TRACE_OPTION_META_FREQ:
553                /* This option is used to specify the frequency of DUCK
554                 * updates */
555                DUCK.duck_freq = *(int *)data;
556                return 0;
557        case TRACE_OPTION_SNAPLEN:
558                /* Tell the card our new snap length */
559                snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
560                if (dag_configure(FORMAT_DATA_FIRST->device->fd,
561                                  conf_str) != 0) {
562                        trace_set_err(libtrace, errno, "Failed to configure "
563                                      "snaplen on DAG card: %s",
564                                      libtrace->uridata);
565                        return -1;
566                }
567                return 0;
568        case TRACE_OPTION_PROMISC:
569                /* DAG already operates in a promisc fashion */
570                return -1;
571        case TRACE_OPTION_FILTER:
572                /* We don't yet support pushing filters into DAG
573                 * cards */
574                return -1;
575        case TRACE_OPTION_EVENT_REALTIME:
576                /* Live capture is always going to be realtime */
577                return -1;
578        }
579        return -1;
580}
581
582/* Starts a DAG output trace */
583static int dag_start_output(libtrace_out_t *libtrace)
584{
585        struct timeval zero, nopoll;
586
587        zero.tv_sec = 0;
588        zero.tv_usec = 0;
589        nopoll = zero;
590
591        /* Attach and start the DAG stream */
592        if (dag_attach_stream(FORMAT_DATA_OUT->device->fd,
593                        FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) {
594                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
595                return -1;
596        }
597
598        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
599                        FORMAT_DATA_OUT->dagstream) < 0) {
600                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
601                return -1;
602        }
603        FORMAT_DATA_OUT->stream_attached = 1;
604
605        /* We don't want the dag card to do any sleeping */
606        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
607                        FORMAT_DATA_OUT->dagstream, 0, &zero,
608                        &nopoll);
609
610        return 0;
611}
612
613/* Starts a DAG input trace */
614static int dag_start_input(libtrace_t *libtrace)
615{
616        struct timeval zero, nopoll;
617        uint8_t *top, *bottom, *starttop;
618        top = bottom = NULL;
619
620        zero.tv_sec = 0;
621        zero.tv_usec = 10000;
622        nopoll = zero;
623
624        /* Attach and start the DAG stream */
625        if (dag_attach_stream(FORMAT_DATA_FIRST->device->fd,
626                              FORMAT_DATA_FIRST->dagstream, 0, 0) < 0) {
627                trace_set_err(libtrace, errno, "Cannot attach DAG stream");
628                return -1;
629        }
630
631        if (dag_start_stream(FORMAT_DATA_FIRST->device->fd,
632                             FORMAT_DATA_FIRST->dagstream) < 0) {
633                trace_set_err(libtrace, errno, "Cannot start DAG stream");
634                return -1;
635        }
636        FORMAT_DATA->stream_attached = 1;
637
638        /* We don't want the dag card to do any sleeping */
639        dag_set_stream_poll(FORMAT_DATA_FIRST->device->fd,
640                            FORMAT_DATA_FIRST->dagstream, 0, &zero,
641                            &nopoll);
642
643        starttop = dag_advance_stream(FORMAT_DATA_FIRST->device->fd,
644                                      FORMAT_DATA_FIRST->dagstream,
645                                      &bottom);
646
647        /* Should probably flush the memory hole now */
648        top = starttop;
649        while (starttop - bottom > 0) {
650                bottom += (starttop - bottom);
651                top = dag_advance_stream(FORMAT_DATA_FIRST->device->fd,
652                                         FORMAT_DATA_FIRST->dagstream,
653                                         &bottom);
654        }
655        FORMAT_DATA_FIRST->top = top;
656        FORMAT_DATA_FIRST->bottom = bottom;
657        FORMAT_DATA_FIRST->processed = 0;
658        FORMAT_DATA_FIRST->drops = 0;
659
660        return 0;
661}
662
663static int dag_pstart_input(libtrace_t *libtrace)
664{
665        char *scan, *tok;
666        uint16_t stream_count = 0, max_streams;
667        int iserror = 0;
668        struct dag_per_stream_t stream_data;
669
670        /* Check we aren't trying to create more threads than the DAG card can
671         * handle */
672        max_streams = dag_rx_get_stream_count(FORMAT_DATA_FIRST->device->fd);
673        if (libtrace->perpkt_thread_count > max_streams) {
674                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
675                              "trying to create too many threads (max is %u)",
676                              max_streams);
677                iserror = 1;
678                goto cleanup;
679        }
680
681        /* Get the stream names from the uri */
682        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
683                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
684                              "format uri doesn't specify the DAG streams");
685                iserror = 1;
686                goto cleanup;
687        }
688
689        scan++;
690
691        tok = strtok(scan, ",");
692        while (tok != NULL) {
693                /* Ensure we haven't specified too many streams */
694                if (stream_count >= libtrace->perpkt_thread_count) {
695                        trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
696                                      "format uri specifies too many streams. "
697                                      "Max is %u", max_streams);
698                        iserror = 1;
699                        goto cleanup;
700                }
701
702                /* Save the stream details */
703                if (stream_count == 0) {
704                        /* Special case where we update the existing stream
705                         * data structure */
706                        FORMAT_DATA_FIRST->dagstream = (uint16_t)atoi(tok);
707                } else {
708                        memset(&stream_data, 0, sizeof(stream_data));
709                        stream_data.device = FORMAT_DATA_FIRST->device;
710                        stream_data.dagstream = (uint16_t)atoi(tok);
711                        libtrace_list_push_back(FORMAT_DATA->per_stream,
712                                                &stream_data);
713                }
714
715                stream_count++;
716                tok = strtok(NULL, ",");
717        }
718
719        FORMAT_DATA->stream_attached = 1;
720
721 cleanup:
722        if (iserror) {
723                return -1;
724        } else {
725                return 0;
726        }
727}
728
729/* Pauses a DAG output trace */
730static int dag_pause_output(libtrace_out_t *libtrace)
731{
732        /* Stop and detach the stream */
733        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
734                            FORMAT_DATA_OUT->dagstream) < 0) {
735                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
736                return -1;
737        }
738        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
739                              FORMAT_DATA_OUT->dagstream) < 0) {
740                trace_set_err_out(libtrace, errno,
741                                  "Could not detach DAG stream");
742                return -1;
743        }
744        FORMAT_DATA_OUT->stream_attached = 0;
745        return 0;
746}
747
748/* Pauses a DAG input trace */
749static int dag_pause_input(libtrace_t *libtrace)
750{
751        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
752
753        /* Stop and detach each stream */
754        while (tmp != NULL) {
755                if (dag_stop_stream(STREAM_DATA(tmp)->device->fd,
756                                    STREAM_DATA(tmp)->dagstream) < 0) {
757                        trace_set_err(libtrace, errno,
758                                      "Could not stop DAG stream");
759                        printf("Count not stop DAG stream\n");
760                        return -1;
761                }
762                if (dag_detach_stream(STREAM_DATA(tmp)->device->fd,
763                                      STREAM_DATA(tmp)->dagstream) < 0) {
764                        trace_set_err(libtrace, errno,
765                                      "Could not detach DAG stream");
766                        printf("Count not detach DAG stream\n");
767                        return -1;
768                }
769
770                tmp = tmp->next;
771        }
772
773        FORMAT_DATA->stream_attached = 0;
774        return 0;
775}
776
777
778
779/* Closes a DAG input trace */
780static int dag_fin_input(libtrace_t *libtrace)
781{
782        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
783
784        /* Need the lock, since we're going to be handling the device list */
785        pthread_mutex_lock(&open_dag_mutex);
786
787        /* Detach the stream if we are not paused */
788        if (FORMAT_DATA->stream_attached)
789                dag_pause_input(libtrace);
790
791        /* Close any dag devices that have no more references */
792        while (tmp != NULL) {
793                STREAM_DATA(tmp)->device->ref_count--;
794                if (STREAM_DATA(tmp)->device->ref_count == 0)
795                        dag_close_device(STREAM_DATA(tmp)->device);
796
797                tmp = tmp->next;
798        }
799
800        if (DUCK.dummy_duck)
801                trace_destroy_dead(DUCK.dummy_duck);
802
803        /* Clear the list */
804        libtrace_list_deinit(FORMAT_DATA->per_stream);
805
806        if (FORMAT_DATA->device_name)
807                free(FORMAT_DATA->device_name);
808        free(libtrace->format_data);
809        pthread_mutex_unlock(&open_dag_mutex);
810        return 0; /* success */
811}
812
813/* Closes a DAG output trace */
814static int dag_fin_output(libtrace_out_t *libtrace)
815{
816
817        /* Commit any outstanding traffic in the txbuffer */
818        if (FORMAT_DATA_OUT->waiting) {
819                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
820                                           FORMAT_DATA_OUT->dagstream,
821                                           FORMAT_DATA_OUT->waiting );
822        }
823
824        /* Wait until the buffer is nearly clear before exiting the program,
825         * as we will lose packets otherwise */
826        dag_tx_get_stream_space
827                (FORMAT_DATA_OUT->device->fd,
828                 FORMAT_DATA_OUT->dagstream,
829                 dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
830                                            FORMAT_DATA_OUT->dagstream) - 8);
831
832        /* Need the lock, since we're going to be handling the device list */
833        pthread_mutex_lock(&open_dag_mutex);
834
835        /* Detach the stream if we are not paused */
836        if (FORMAT_DATA_OUT->stream_attached)
837                dag_pause_output(libtrace);
838        FORMAT_DATA_OUT->device->ref_count --;
839
840        /* Close the DAG device if there are no more references to it */
841        if (FORMAT_DATA_OUT->device->ref_count == 0)
842                dag_close_device(FORMAT_DATA_OUT->device);
843        if (FORMAT_DATA_OUT->device_name)
844                free(FORMAT_DATA_OUT->device_name);
845        free(libtrace->format_data);
846        pthread_mutex_unlock(&open_dag_mutex);
847        return 0; /* success */
848}
849
850#ifdef DAGIOC_CARD_DUCK
851#define LIBTRACE_DUCK_IOCTL DAGIOC_CARD_DUCK
852#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_5_0
853#else
854#ifdef DAGIOCDUCK
855#define LIBTRACE_DUCK_IOCTL DAGIOCDUCK
856#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_2_5
857#else
858#warning "DAG appears to be missing DUCK support"
859#endif
860#endif
861
862/* Extracts DUCK information from the DAG card and produces a DUCK packet */
863static int dag_get_duckinfo(libtrace_t *libtrace,
864                                libtrace_packet_t *packet) {
865
866        if (DUCK.duck_freq == 0)
867                return 0;
868
869#ifndef LIBTRACE_DUCK_IOCTL
870        trace_set_err(libtrace, errno, 
871                "Requested DUCK information but unable to determine the correct ioctl for DUCK");
872        DUCK.duck_freq = 0;
873        return -1;
874#endif
875
876        if (DUCK.last_pkt - DUCK.last_duck < DUCK.duck_freq)
877                return 0;
878
879        /* Allocate memory for the DUCK data */
880        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
881            !packet->buffer) {
882                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
883                packet->buf_control = TRACE_CTRL_PACKET;
884                if (!packet->buffer) {
885                        trace_set_err(libtrace, errno,
886                                      "Cannot allocate packet buffer");
887                        return -1;
888                }
889        }
890
891        /* DUCK doesn't have a format header */
892        packet->header = 0;
893        packet->payload = packet->buffer;
894
895        /* No need to check if we can get DUCK or not - we're modern
896         * enough so just grab the DUCK info */
897        if ((ioctl(FORMAT_DATA->device->fd, LIBTRACE_DUCK_IOCTL,
898                                        (duckinf_t *)packet->payload) < 0)) {
899                trace_set_err(libtrace, errno, "Error using DUCK ioctl");
900                DUCK.duck_freq = 0;
901                return -1;
902        }
903
904        packet->type = LIBTRACE_DUCK_VERSION;
905
906        /* Set the packet's trace to point at a DUCK trace, so that the
907         * DUCK format functions will be called on the packet rather than the
908         * DAG ones */
909        if (!DUCK.dummy_duck)
910                DUCK.dummy_duck = trace_create_dead("duck:dummy");
911        packet->trace = DUCK.dummy_duck;
912        DUCK.last_duck = DUCK.last_pkt;
913        return sizeof(duckinf_t);
914}
915
916/* Determines the amount of data available to read from the DAG card */
917static int dag_available(libtrace_t *libtrace,
918                         struct dag_per_stream_t *stream_data)
919{
920        uint32_t diff = stream_data->top - stream_data->bottom;
921
922        /* If we've processed more than 4MB of data since we last called
923         * dag_advance_stream, then we should call it again to allow the
924         * space occupied by that 4MB to be released */
925        if (diff >= dag_record_size && stream_data->processed < 4 * 1024 * 1024)
926                return diff;
927
928        /* Update the top and bottom pointers */
929        stream_data->top = dag_advance_stream(stream_data->device->fd,
930                                              stream_data->dagstream,
931                                              &(stream_data->bottom));
932
933        if (stream_data->top == NULL) {
934                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
935                return -1;
936        }
937        stream_data->processed = 0;
938        diff = stream_data->top - stream_data->bottom;
939        return diff;
940}
941
942/* Returns a pointer to the start of the next complete ERF record */
943static dag_record_t *dag_get_record(struct dag_per_stream_t *stream_data)
944{
945        dag_record_t *erfptr = NULL;
946        uint16_t size;
947
948        erfptr = (dag_record_t *)stream_data->bottom;
949        if (!erfptr)
950                return NULL;
951
952        size = ntohs(erfptr->rlen);
953        assert( size >= dag_record_size );
954
955        /* Make certain we have the full packet available */
956        if (size > (stream_data->top - stream_data->bottom))
957                return NULL;
958
959        stream_data->bottom += size;
960        stream_data->processed += size;
961        return erfptr;
962}
963
964/* Converts a buffer containing a recently read DAG packet record into a
965 * libtrace packet */
966static int dag_prepare_packet_real(libtrace_t *libtrace,
967                                   struct dag_per_stream_t *stream_data,
968                                   libtrace_packet_t *packet,
969                                   void *buffer, libtrace_rt_types_t rt_type,
970                                   uint32_t flags)
971{
972        dag_record_t *erfptr;
973
974        /* If the packet previously owned a buffer that is not the buffer
975         * that contains the new packet data, we're going to need to free the
976         * old one to avoid memory leaks */
977        if (packet->buffer != buffer &&
978            packet->buf_control == TRACE_CTRL_PACKET) {
979                free(packet->buffer);
980        }
981
982        /* Set the buffer owner appropriately */
983        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
984                packet->buf_control = TRACE_CTRL_PACKET;
985        } else
986                packet->buf_control = TRACE_CTRL_EXTERNAL;
987
988        /* Update the packet pointers and type appropriately */
989        erfptr = (dag_record_t *)buffer;
990        packet->buffer = erfptr;
991        packet->header = erfptr;
992        packet->type = rt_type;
993
994        if (erfptr->flags.rxerror == 1) {
995                /* rxerror means the payload is corrupt - drop the payload
996                 * by tweaking rlen */
997                packet->payload = NULL;
998                erfptr->rlen = htons(erf_get_framing_length(packet));
999        } else {
1000                packet->payload = (char*)packet->buffer
1001                        + erf_get_framing_length(packet);
1002        }
1003
1004        if (libtrace->format_data == NULL) {
1005                dag_init_format_data(libtrace);
1006        }
1007
1008        /* Update the dropped packets counter */
1009        /* No loss counter for DSM coloured records - have to use some
1010         * other API */
1011        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
1012                /* TODO */
1013        } else {
1014                /* Use the ERF loss counter */
1015                if (stream_data->seeninterface[erfptr->flags.iface]
1016                    == 0) {
1017                        stream_data->seeninterface[erfptr->flags.iface]
1018                                = 1;
1019                } else {
1020                        stream_data->drops += ntohs(erfptr->lctr);
1021                }
1022        }
1023
1024        packet->error = 1;
1025
1026        return 0;
1027}
1028
1029static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
1030                              void *buffer, libtrace_rt_types_t rt_type,
1031                              uint32_t flags)
1032{
1033        return dag_prepare_packet_real(libtrace, FORMAT_DATA_FIRST, packet,
1034                                       buffer, rt_type, flags);
1035}
1036
1037/*
1038 * dag_write_packet() at this stage attempts to improve tx performance
1039 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
1040 * has been met. I observed approximately 270% performance increase
1041 * through this relatively naive tweak. No optimisation of buffer sizes
1042 * was attempted.
1043 */
1044
1045/* Pushes an ERF record onto the transmit stream */
1046static int dag_dump_packet(libtrace_out_t *libtrace,
1047                           dag_record_t *erfptr, unsigned int pad,
1048                           void *buffer)
1049{
1050        int size;
1051
1052        /*
1053         * If we've got 0 bytes waiting in the txqueue, assume that we
1054         * haven't requested any space yet, and request some, storing
1055         * the pointer at FORMAT_DATA_OUT->txbuffer.
1056         *
1057         * The amount to request is slightly magical at the moment - it's
1058         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
1059         * the buffer and handle overruns.
1060         */
1061        if (FORMAT_DATA_OUT->waiting == 0) {
1062                FORMAT_DATA_OUT->txbuffer =
1063                        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
1064                                                FORMAT_DATA_OUT->dagstream,
1065                                                16908288);
1066        }
1067
1068        /*
1069         * Copy the header separately to the body, as we can't guarantee they
1070         * are in contiguous memory
1071         */
1072        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, erfptr,
1073               (dag_record_size + pad));
1074        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
1075
1076        /*
1077         * Copy our incoming packet into the outgoing buffer, and increment
1078         * our waiting count
1079         */
1080        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
1081        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, buffer,
1082               size);
1083        FORMAT_DATA_OUT->waiting += size;
1084
1085        /*
1086         * If our output buffer has more than 16 Mebibytes in it, commit those
1087         * bytes and reset the waiting count to 0.
1088         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in
1089         * case there is still data in the buffer at program exit.
1090         */
1091        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
1092                FORMAT_DATA_OUT->txbuffer =
1093                        dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
1094                                                   FORMAT_DATA_OUT->dagstream,
1095                                                   FORMAT_DATA_OUT->waiting);
1096                FORMAT_DATA_OUT->waiting = 0;
1097        }
1098
1099        return size + pad + dag_record_size;
1100}
1101
1102/* Attempts to determine a suitable ERF type for a given packet. Returns true
1103 * if one is found, false otherwise */
1104static bool find_compatible_linktype(libtrace_out_t *libtrace,
1105                                     libtrace_packet_t *packet, char *type)
1106{
1107        /* Keep trying to simplify the packet until we can find
1108         * something we can do with it */
1109
1110        do {
1111                *type = libtrace_to_erf_type(trace_get_link_type(packet));
1112
1113                /* Success */
1114                if (*type != (char)-1)
1115                        return true;
1116
1117                if (!demote_packet(packet)) {
1118                        trace_set_err_out(libtrace,
1119                                          TRACE_ERR_NO_CONVERSION,
1120                                          "No erf type for packet (%i)",
1121                                          trace_get_link_type(packet));
1122                        return false;
1123                }
1124
1125        } while(1);
1126
1127        return true;
1128}
1129
1130/* Writes a packet to the provided DAG output trace */
1131static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet)
1132{
1133        /* This is heavily borrowed from erf_write_packet(). Yes, CnP
1134         * coding sucks, sorry about that.
1135         */
1136        unsigned int pad = 0;
1137        int numbytes;
1138        void *payload = packet->payload;
1139        dag_record_t *header = (dag_record_t *)packet->header;
1140        char erf_type = 0;
1141
1142        if(!packet->header) {
1143                /* No header, probably an RT packet. Lifted from
1144                 * erf_write_packet(). */
1145                return -1;
1146        }
1147
1148        if (trace_get_link_type(packet) == TRACE_TYPE_NONDATA)
1149                return 0;
1150
1151        pad = dag_get_padding(packet);
1152
1153        /*
1154         * If the payload is null, adjust the rlen. Discussion of this is
1155         * attached to erf_write_packet()
1156         */
1157        if (payload == NULL) {
1158                header->rlen = htons(dag_record_size + pad);
1159        }
1160
1161        if (packet->type == TRACE_RT_DATA_ERF) {
1162                numbytes = dag_dump_packet(libtrace, header, pad, payload);
1163        } else {
1164                /* Build up a new packet header from the existing header */
1165
1166                /* Simplify the packet first - if we can't do this, break
1167                 * early */
1168                if (!find_compatible_linktype(libtrace,packet,&erf_type))
1169                        return -1;
1170
1171                dag_record_t erfhdr;
1172
1173                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
1174                payload=packet->payload;
1175                pad = dag_get_padding(packet);
1176
1177                /* Flags. Can't do this */
1178                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
1179                if (trace_get_direction(packet)!=(int)~0U)
1180                        erfhdr.flags.iface = trace_get_direction(packet);
1181
1182                erfhdr.type = erf_type;
1183
1184                /* Packet length (rlen includes format overhead) */
1185                assert(trace_get_capture_length(packet) > 0
1186                       && trace_get_capture_length(packet) <= 65536);
1187                assert(erf_get_framing_length(packet) > 0
1188                       && trace_get_framing_length(packet) <= 65536);
1189                assert(trace_get_capture_length(packet) +
1190                       erf_get_framing_length(packet) > 0
1191                       && trace_get_capture_length(packet) +
1192                       erf_get_framing_length(packet) <= 65536);
1193
1194                erfhdr.rlen = htons(trace_get_capture_length(packet)
1195                                    + erf_get_framing_length(packet));
1196
1197
1198                /* Loss counter. Can't do this */
1199                erfhdr.lctr = 0;
1200                /* Wire length, does not include padding! */
1201                erfhdr.wlen = htons(trace_get_wire_length(packet));
1202
1203                /* Write it out */
1204                numbytes = dag_dump_packet(libtrace, &erfhdr, pad, payload);
1205        }
1206
1207        return numbytes;
1208}
1209
1210/* Reads the next available packet from a DAG card, in a BLOCKING fashion
1211 *
1212 * If DUCK reporting is enabled, the packet returned may be a DUCK update
1213 */
1214static int dag_read_packet_real(libtrace_t *libtrace,
1215                                struct dag_per_stream_t *stream_data,
1216                                libtrace_thread_t *t, /* Optional */
1217                                libtrace_packet_t *packet)
1218{
1219        dag_record_t *erfptr = NULL;
1220        int numbytes = 0;
1221        uint32_t flags = 0;
1222        struct timeval maxwait, pollwait;
1223
1224        pollwait.tv_sec = 0;
1225        pollwait.tv_usec = 10000;
1226        maxwait.tv_sec = 0;
1227        maxwait.tv_usec = 250000;
1228
1229        /* Check if we're due for a DUCK report */
1230        size = dag_get_duckinfo(libtrace, packet);
1231
1232        if (size != 0)
1233                return size;
1234
1235        /* Don't let anyone try to free our DAG memory hole! */
1236        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1237
1238        /* If the packet buffer is currently owned by libtrace, free it so
1239         * that we can set the packet to point into the DAG memory hole */
1240        if (packet->buf_control == TRACE_CTRL_PACKET) {
1241                free(packet->buffer);
1242                packet->buffer = 0;
1243        }
1244
1245        if (dag_set_stream_poll(stream_data->device->fd, stream_data->dagstream,
1246                                sizeof(dag_record_t), &maxwait,
1247                                &pollwait) == -1) {
1248                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1249                return -1;
1250        }
1251
1252        /* Grab a full ERF record */
1253        do {
1254                numbytes = dag_available(libtrace, stream_data);
1255                if (numbytes < 0)
1256                        return numbytes;
1257                if (numbytes < dag_record_size) {
1258                        /* Check the message queue if we have one to check */
1259                        if (t != NULL &&
1260                            libtrace_message_queue_count(&t->messages) > 0)
1261                                return -2;
1262
1263                        if (libtrace_halt)
1264                                return 0;
1265                        /* Block until we see a packet */
1266                        continue;
1267                }
1268                erfptr = dag_get_record(stream_data);
1269        } while (erfptr == NULL);
1270
1271        /* Prepare the libtrace packet */
1272        if (dag_prepare_packet_real(libtrace, stream_data, packet, erfptr,
1273                                    TRACE_RT_DATA_ERF, flags))
1274                return -1;
1275
1276        return packet->payload ? htons(erfptr->rlen) :
1277                erf_get_framing_length(packet);
1278}
1279
1280static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
1281{
1282        return dag_read_packet_real(libtrace, FORMAT_DATA_FIRST, NULL, packet);
1283}
1284
1285static int dag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1286                             libtrace_packet_t **packets, size_t nb_packets)
1287{
1288        int ret;
1289        size_t read_packets = 0;
1290        int numbytes = 0;
1291
1292        struct dag_per_stream_t *stream_data =
1293                (struct dag_per_stream_t *)t->format_data;
1294
1295        /* Read as many packets as we can, but read atleast one packet */
1296        do {
1297                ret = dag_read_packet_real(libtrace, stream_data, t,
1298                                           packets[read_packets]);
1299                if (ret < 0)
1300                        return ret;
1301
1302                read_packets++;
1303
1304                /* Make sure we don't read too many packets..! */
1305                if (read_packets >= nb_packets)
1306                        break;
1307
1308                numbytes = dag_available(libtrace, stream_data);
1309        } while (numbytes >= dag_record_size);
1310
1311        return read_packets;
1312}
1313
1314/* Attempts to read a packet from a DAG card in a NON-BLOCKING fashion. If a
1315 * packet is available, we will return a packet event. Otherwise we will
1316 * return a SLEEP event (as we cannot select on the DAG file descriptor).
1317 */
1318static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
1319                                           libtrace_packet_t *packet)
1320{
1321        libtrace_eventobj_t event = {0,0,0.0,0};
1322        dag_record_t *erfptr = NULL;
1323        int numbytes;
1324        uint32_t flags = 0;
1325        struct timeval minwait, tv;
1326       
1327        minwait.tv_sec = 0;
1328        minwait.tv_usec = 10000;
1329
1330        /* Check if we're meant to provide a DUCK update */
1331        numbytes = dag_get_duckinfo(libtrace, packet);
1332        if (numbytes < 0) {
1333                event.type = TRACE_EVENT_TERMINATE;
1334                return event;
1335        } else if (numbytes > 0) {
1336                event.type = TRACE_EVENT_PACKET;
1337                return event;
1338        }
1339       
1340        if (dag_set_stream_poll(FORMAT_DATA_FIRST->device->fd,
1341                                FORMAT_DATA_FIRST->dagstream, 0, &minwait,
1342                                &minwait) == -1) {
1343                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1344                event.type = TRACE_EVENT_TERMINATE;
1345                return event;
1346        }
1347
1348        do {
1349                erfptr = NULL;
1350                numbytes = 0;
1351
1352                /* Need to call dag_available so that the top pointer will get
1353                 * updated, otherwise we'll never see any data! */
1354                numbytes = dag_available(libtrace, FORMAT_DATA_FIRST);
1355
1356                /* May as well not bother calling dag_get_record if
1357                 * dag_available suggests that there's no data */
1358                if (numbytes != 0)
1359                        erfptr = dag_get_record(FORMAT_DATA_FIRST);
1360                if (erfptr == NULL) {
1361                        /* No packet available - sleep for a very short time */
1362                        if (libtrace_halt) {
1363                                event.type = TRACE_EVENT_TERMINATE;
1364                        } else {
1365                                event.type = TRACE_EVENT_SLEEP;
1366                                event.seconds = 0.0001;
1367                        }
1368                        break;
1369                }
1370                if (dag_prepare_packet_real(libtrace, FORMAT_DATA_FIRST, packet,
1371                                            erfptr, TRACE_RT_DATA_ERF, flags)) {
1372                        event.type = TRACE_EVENT_TERMINATE;
1373                        break;
1374                }
1375
1376
1377                event.size = trace_get_capture_length(packet) +
1378                        trace_get_framing_length(packet);
1379
1380                /* XXX trace_read_packet() normally applies the following
1381                 * config options for us, but this function is called via
1382                 * trace_event() so we have to do it ourselves */
1383
1384                if (libtrace->filter) {
1385                        int filtret = trace_apply_filter(libtrace->filter,
1386                                                         packet);
1387                        if (filtret == -1) {
1388                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
1389                                              "Bad BPF Filter");
1390                                event.type = TRACE_EVENT_TERMINATE;
1391                                break;
1392                        }
1393
1394                        if (filtret == 0) {
1395                                /* This packet isn't useful so we want to
1396                                 * immediately see if there is another suitable
1397                                 * one - we definitely DO NOT want to return
1398                                 * a sleep event in this case, like we used to
1399                                 * do! */
1400                                libtrace->filtered_packets ++;
1401                                trace_clear_cache(packet);
1402                                continue;
1403                        }
1404
1405                        event.type = TRACE_EVENT_PACKET;
1406                } else {
1407                        event.type = TRACE_EVENT_PACKET;
1408                }
1409
1410                /* Update the DUCK timer */
1411                tv = trace_get_timeval(packet);
1412                DUCK.last_pkt = tv.tv_sec;
1413               
1414                if (libtrace->snaplen > 0) {
1415                        trace_set_capture_length(packet, libtrace->snaplen);
1416                }
1417                libtrace->accepted_packets ++;
1418                break;
1419        } while(1);
1420
1421        return event;
1422}
1423
1424/* Gets the number of dropped packets */
1425static uint64_t dag_get_dropped_packets(libtrace_t *libtrace)
1426{
1427        uint64_t sum = 0;
1428        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
1429
1430        /* Sum the drop counter for all the packets */
1431        while (tmp != NULL) {
1432                sum += STREAM_DATA(tmp)->drops;
1433                tmp = tmp->next;
1434        }
1435
1436        return sum;
1437}
1438
1439/* Prints some semi-useful help text about the DAG format module */
1440static void dag_help(void) {
1441        printf("dag format module: $Revision: 1755 $\n");
1442        printf("Supported input URIs:\n");
1443        printf("\tdag:/dev/dagn\n");
1444        printf("\n");
1445        printf("\te.g.: dag:/dev/dag0\n");
1446        printf("\n");
1447        printf("Supported output URIs:\n");
1448        printf("\tnone\n");
1449        printf("\n");
1450}
1451
1452static int dag_pconfig_input(UNUSED libtrace_t *libtrace,
1453                             trace_parallel_option_t option, UNUSED void *value)
1454{
1455        /* We don't support any of these! Normally you configure the DAG card
1456         * externally. */
1457        switch(option) {
1458        case TRACE_OPTION_SET_HASHER:
1459        case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
1460        case TRACE_OPTION_TRACETIME:
1461        case TRACE_OPTION_TICK_INTERVAL:
1462        case TRACE_OPTION_GET_CONFIG:
1463        case TRACE_OPTION_SET_CONFIG:
1464                return -1;
1465        }
1466        /* We don't provide a default option to ensure that future options will
1467         * generate a compiler warning. */
1468
1469        return -1;
1470}
1471
1472/* TODO: Should possibly make a more generic dag_start_input, as there's a
1473 * fair bit of code duplication between that and this */
1474static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1475                                bool reader)
1476{
1477        struct timeval zero, nopoll;
1478        uint8_t *top, *bottom, *starttop;
1479        struct dag_per_stream_t *stream_data;
1480        top = bottom = NULL;
1481
1482        /* Minimum delay is 10mS */
1483        zero.tv_sec = 0;
1484        zero.tv_usec = 10000;
1485        nopoll = zero;
1486
1487        if (reader) {
1488                if (t->type == THREAD_PERPKT) {
1489                        stream_data =
1490                                (struct dag_per_stream_t *)
1491                                libtrace_list_get_index(FORMAT_DATA->per_stream,
1492                                                        t->perpkt_num)->data;
1493
1494                        /* Pass the per thread data to the thread */
1495                        t->format_data = stream_data;
1496
1497                        /* Attach and start the DAG stream */
1498                        printf("t%u: starting and attaching stream #%u\n",
1499                               t->perpkt_num, stream_data->dagstream);
1500                        if (dag_attach_stream(stream_data->device->fd,
1501                                              stream_data->dagstream, 0,
1502                                              0) < 0) {
1503                                printf("can't attach DAG stream #%u\n",
1504                                       stream_data->dagstream);
1505                                trace_set_err(libtrace, errno,
1506                                              "can't attach DAG stream #%u",
1507                                              stream_data->dagstream);
1508                                return -1;
1509                        }
1510                        if (dag_start_stream(stream_data->device->fd,
1511                                             stream_data->dagstream) < 0) {
1512                                trace_set_err(libtrace, errno,
1513                                              "can't start DAG stream #%u",
1514                                              stream_data->dagstream);
1515                                printf("can't start DAG stream #%u\n",
1516                                       stream_data->dagstream);
1517                                return -1;
1518                        }
1519
1520                        /* Ensure that dag_advance_stream will return without
1521                         * blocking */
1522                        if(dag_set_stream_poll(stream_data->device->fd,
1523                                               stream_data->dagstream, 0, &zero,
1524                                               &nopoll) < 0) {
1525                                trace_set_err(libtrace, errno,
1526                                              "dag_set_stream_poll failed!");
1527                                return -1;
1528                        }
1529
1530                        /* Clear all the data from the memory hole */
1531                        starttop = dag_advance_stream(stream_data->
1532                                                      device->fd,
1533                                                      stream_data->dagstream,
1534                                                      &bottom);
1535
1536                        top = starttop;
1537                        while (starttop - bottom > 0) {
1538                                bottom += (starttop - bottom);
1539                                top = dag_advance_stream(stream_data->
1540                                                         device->fd,
1541                                                         stream_data->dagstream,
1542                                                         &bottom);
1543                        }
1544                        stream_data->top = top;
1545                        stream_data->bottom = bottom;
1546                        stream_data->pkt_count = 0;
1547                        stream_data->drops = 0;
1548                } else {
1549                        /* TODO: Figure out why t->type != THREAD_PERPKT in
1550                         * order to figure out what this line does */
1551                        t->format_data = FORMAT_DATA_FIRST;
1552                }
1553        }
1554
1555        fprintf(stderr, "t%u: registered thread\n", t->perpkt_num);
1556
1557        return 0;
1558}
1559
1560static struct libtrace_format_t dag = {
1561        "dag",
1562        "$Id$",
1563        TRACE_FORMAT_ERF,
1564        dag_probe_filename,             /* probe filename */
1565        NULL,                           /* probe magic */
1566        dag_init_input,                 /* init_input */
1567        dag_config_input,               /* config_input */
1568        dag_start_input,                /* start_input */
1569        dag_pause_input,                /* pause_input */
1570        dag_init_output,                /* init_output */
1571        NULL,                           /* config_output */
1572        dag_start_output,               /* start_output */
1573        dag_fin_input,                  /* fin_input */
1574        dag_fin_output,                 /* fin_output */
1575        dag_read_packet,                /* read_packet */
1576        dag_prepare_packet,             /* prepare_packet */
1577        NULL,                           /* fin_packet */
1578        dag_write_packet,               /* write_packet */
1579        erf_get_link_type,              /* get_link_type */
1580        erf_get_direction,              /* get_direction */
1581        erf_set_direction,              /* set_direction */
1582        erf_get_erf_timestamp,          /* get_erf_timestamp */
1583        NULL,                           /* get_timeval */
1584        NULL,                           /* get_seconds */
1585        NULL,                           /* get_timespec */
1586        NULL,                           /* seek_erf */
1587        NULL,                           /* seek_timeval */
1588        NULL,                           /* seek_seconds */
1589        erf_get_capture_length,         /* get_capture_length */
1590        erf_get_wire_length,            /* get_wire_length */
1591        erf_get_framing_length,         /* get_framing_length */
1592        erf_set_capture_length,         /* set_capture_length */
1593        NULL,                           /* get_received_packets */
1594        NULL,                           /* get_filtered_packets */
1595        dag_get_dropped_packets,        /* get_dropped_packets */
1596        NULL,                           /* get_captured_packets */
1597        NULL,                           /* get_fd */
1598        trace_event_dag,                /* trace_event */
1599        dag_help,                       /* help */
1600        NULL,                            /* next pointer */
1601        {true, 0}, /* live packet capture, thread limit TBD */
1602        dag_pstart_input,
1603        dag_pread_packets,
1604        dag_pause_input,
1605        NULL,
1606        dag_pconfig_input,
1607        dag_pregister_thread,
1608        NULL
1609};
1610
1611void dag_constructor(void)
1612{
1613        register_format(&dag);
1614}
Note: See TracBrowser for help on using the repository browser.