source: lib/format_dag25.c @ 97d170d

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 97d170d was 97d170d, checked in by Dan Collins <dan@…>, 6 years ago

Merge branch 'develop' of github.com:wanduow/libtrace into develop

Conflicts:

lib/format_dag25.c

  • Property mode set to 100644
File size: 47.9 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id$
31 *
32 */
33
34#define _GNU_SOURCE
35
36#include "config.h"
37#include "common.h"
38#include "libtrace.h"
39#include "libtrace_int.h"
40#include "format_helper.h"
41#include "format_erf.h"
42
43#include <assert.h>
44#include <errno.h>
45#include <fcntl.h>
46#include <stdio.h>
47#include <string.h>
48#include <stdlib.h>
49#include <sys/stat.h>
50
51#include <sys/mman.h>
52/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
53 * Windows anyway so we'll worry about this more later :] */
54#include <pthread.h>
55
56
57#ifdef WIN32
58#  include <io.h>
59#  include <share.h>
60#  define PATH_MAX _MAX_PATH
61#  define snprintf sprintf_s
62#else
63#  include <netdb.h>
64#  ifndef PATH_MAX
65#       define PATH_MAX 4096
66#  endif
67#  include <sys/ioctl.h>
68#endif
69
70/* This format deals with DAG cards that are using drivers from the 2.5 version
71 * onwards, including 3.X.
72 *
73 * DAG is a LIVE capture format.
74 *
75 * This format does support writing, provided the DAG card that you are using
76 * has transmit (Tx) support. Additionally, packets read using this format
77 * are in the ERF format, so can easily be written as ERF traces without
78 * losing any data.
79 */
80
81
82#define DATA(x) ((struct dag_format_data_t *)x->format_data)
83#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
84#define PERPKT_DATA(x) ((struct dag_per_thread_t *)(x->format_data))
85
86#define FORMAT_DATA DATA(libtrace)
87#define FORMAT_DATA_OUT DATA_OUT(libtrace)
88
89#define DUCK FORMAT_DATA->duck
90static struct libtrace_format_t dag;
91
92/* A DAG device - a DAG device can support multiple streams (and therefore
93 * multiple input traces) so each trace needs to refer to a device */
94struct dag_dev_t {
95        char * dev_name;                /* Device name */
96        int fd;                         /* File descriptor */
97        uint16_t ref_count;             /* Number of input / output traces
98                                           that are using this device */
99        struct dag_dev_t *prev;         /* Pointer to the previous device in
100                                           the device list */
101        struct dag_dev_t *next;         /* Pointer to the next device in the
102                                           device list */
103};
104
105/* "Global" data that is stored for each DAG output trace */
106struct dag_format_data_out_t {
107        /* The DAG device being used for writing */
108        struct dag_dev_t *device;
109        /* The DAG stream that is being written on */
110        unsigned int dagstream;
111        /* Boolean flag indicating whether the stream is currently attached */
112        int stream_attached;
113        /* The amount of data waiting to be transmitted, in bytes */
114        uint64_t waiting;
115        /* A buffer to hold the data to be transmittted */
116        uint8_t *txbuffer;
117};
118
119/* Data that is stored for each libtrace_thread_t */
120struct dag_per_thread_t {
121        struct dag_dev_t *device; /* DAG device */
122        uint16_t stream; /* Stream number */
123        uint8_t *top; /* Pointer to the last unread byte in the DAG memory */
124        uint8_t *bottom; /* Pointer to the first unread byte in the DAG memory */
125        uint32_t processed; /* Amount of data processed from the bottom pointer */
126        uint64_t pkt_count; /* Number of packets seen by the thread */
127        uint64_t drops;
128};
129
130/* "Global" data that is stored for each DAG input trace */
131struct dag_format_data_t {
132
133        /* Data required for regular DUCK reporting */
134        struct {
135                /* Timestamp of the last DUCK report */
136                uint32_t last_duck;
137                /* The number of seconds between each DUCK report */
138                uint32_t duck_freq;
139                /* Timestamp of the last packet read from the DAG card */
140                uint32_t last_pkt;
141                /* Dummy trace to ensure DUCK packets are dealt with using the
142                 * DUCK format functions */
143                libtrace_t *dummy_duck;
144        } duck;
145
146        /* The DAG device that we are reading from */
147        struct dag_dev_t *device;
148        /* The DAG stream that we are reading from */
149        unsigned int dagstream;
150        /* Boolean flag indicating whether the stream is currently attached */
151        int stream_attached;
152        /* Pointer to the first unread byte in the DAG memory hole */
153        uint8_t *bottom;
154        /* Pointer to the last unread byte in the DAG memory hole */
155        uint8_t *top;
156        /* The amount of data processed thus far from the bottom pointer */
157        uint32_t processed;
158        /* The number of packets that have been dropped */
159        uint64_t drops;
160        /* When running in parallel mode this is malloc'd with an array of thread
161         * structures. Most of the stuff above doesn't get used in parallel mode. */
162        struct dag_per_thread_t *per_thread;
163
164        uint8_t seeninterface[4];
165};
166
167/* To be thread-safe, we're going to need a mutex for operating on the list
168 * of DAG devices */
169pthread_mutex_t open_dag_mutex;
170
171/* The list of DAG devices that have been opened by libtrace.
172 *
173 * We can only open each DAG device once, but we might want to read from
174 * multiple streams. Therefore, we need to maintain a list of devices that we
175 * have opened (with ref counts!) so that we don't try to open a device too
176 * many times or close a device that we're still using */
177struct dag_dev_t *open_dags = NULL;
178
179/* Returns the amount of padding between the ERF header and the start of the
180 * captured packet data */
181static int dag_get_padding(const libtrace_packet_t *packet)
182{
183        /* ERF Ethernet records have a 2 byte padding before the packet itself
184         * so that the IP header is aligned on a 32 bit boundary.
185         */
186        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
187                dag_record_t *erfptr = (dag_record_t *)packet->header;
188                switch(erfptr->type) {
189                        case TYPE_ETH:
190                        case TYPE_DSM_COLOR_ETH:
191                                return 2;
192                        default:                return 0;
193                }
194        }
195        else {
196                switch(trace_get_link_type(packet)) {
197                        case TRACE_TYPE_ETH:    return 2;
198                        default:                return 0;
199                }
200        }
201}
202
203/* Attempts to determine if the given filename refers to a DAG device */
204static int dag_probe_filename(const char *filename)
205{
206        struct stat statbuf;
207        /* Can we stat the file? */
208        if (stat(filename, &statbuf) != 0) {
209                return 0;
210        }
211        /* Is it a character device? */
212        if (!S_ISCHR(statbuf.st_mode)) {
213                return 0;
214        }
215        /* Yeah, it's probably us. */
216        return 1;
217}
218
219/* Initialises the DAG output data structure */
220static void dag_init_format_out_data(libtrace_out_t *libtrace) {
221        libtrace->format_data = (struct dag_format_data_out_t *) malloc(sizeof(struct dag_format_data_out_t));
222        // no DUCK on output
223        FORMAT_DATA_OUT->stream_attached = 0;
224        FORMAT_DATA_OUT->device = NULL;
225        FORMAT_DATA_OUT->dagstream = 0;
226        FORMAT_DATA_OUT->waiting = 0;
227
228}
229
230/* Initialises the DAG input data structure */
231static void dag_init_format_data(libtrace_t *libtrace) {
232        libtrace->format_data = (struct dag_format_data_t *)
233                malloc(sizeof(struct dag_format_data_t));
234        DUCK.last_duck = 0;
235        DUCK.duck_freq = 0;
236        DUCK.last_pkt = 0;
237        DUCK.dummy_duck = NULL;
238        FORMAT_DATA->stream_attached = 0;
239        FORMAT_DATA->drops = 0;
240        FORMAT_DATA->device = NULL;
241        FORMAT_DATA->dagstream = 0;
242        FORMAT_DATA->processed = 0;
243        FORMAT_DATA->bottom = NULL;
244        FORMAT_DATA->top = NULL;
245        memset(FORMAT_DATA->seeninterface, 0, sizeof(FORMAT_DATA->seeninterface));
246}
247
248/* Determines if there is already an entry for the given DAG device in the
249 * device list and increments the reference count for that device, if found.
250 *
251 * NOTE: This function assumes the open_dag_mutex is held by the caller */
252static struct dag_dev_t *dag_find_open_device(char *dev_name) {
253        struct dag_dev_t *dag_dev;
254
255        dag_dev = open_dags;
256
257        /* XXX: Not exactly zippy, but how often are we going to be dealing
258         * with multiple dag cards? */
259        while (dag_dev != NULL) {
260                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
261                        dag_dev->ref_count ++;
262                        return dag_dev;
263
264                }
265                dag_dev = dag_dev->next;
266        }
267        return NULL;
268
269
270}
271
272/* Closes a DAG device and removes it from the device list.
273 *
274 * Attempting to close a DAG device that has a non-zero reference count will
275 * cause an assertion failure!
276 *
277 * NOTE: This function assumes the open_dag_mutex is held by the caller */
278static void dag_close_device(struct dag_dev_t *dev) {
279        /* Need to remove from the device list */
280
281        assert(dev->ref_count == 0);
282
283        if (dev->prev == NULL) {
284                open_dags = dev->next;
285                if (dev->next)
286                        dev->next->prev = NULL;
287        } else {
288                dev->prev->next = dev->next;
289                if (dev->next)
290                        dev->next->prev = dev->prev;
291        }
292
293        dag_close(dev->fd);
294        if (dev->dev_name)
295                free(dev->dev_name);
296        free(dev);
297}
298
299
300/* Opens a new DAG device for writing and adds it to the DAG device list
301 *
302 * NOTE: this function should only be called when opening a DAG device for
303 * writing - there is little practical difference between this and the
304 * function below that covers the reading case, but we need the output trace
305 * object to report errors properly so the two functions take slightly
306 * different arguments. This is really lame and there should be a much better
307 * way of doing this.
308 *
309 * NOTE: This function assumes the open_dag_mutex is held by the caller
310 */
311static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace, char *dev_name) {
312        struct stat buf;
313        int fd;
314        struct dag_dev_t *new_dev;
315
316        /* Make sure the device exists */
317        if (stat(dev_name, &buf) == -1) {
318                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
319                return NULL;
320}
321
322        /* Make sure it is the appropriate type of device */
323        if (S_ISCHR(buf.st_mode)) {
324                /* Try opening the DAG device */
325                if((fd = dag_open(dev_name)) < 0) {
326                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
327                                        dev_name);
328                        return NULL;
329                }
330        } else {
331                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
332                                dev_name);
333                return NULL;
334        }
335
336        /* Add the device to our device list - it is just a doubly linked
337         * list with no inherent ordering; just tack the new one on the front
338         */
339        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
340        new_dev->fd = fd;
341        new_dev->dev_name = dev_name;
342        new_dev->ref_count = 1;
343
344        new_dev->prev = NULL;
345        new_dev->next = open_dags;
346        if (open_dags)
347                open_dags->prev = new_dev;
348
349        open_dags = new_dev;
350
351        return new_dev;
352}
353
354/* Opens a new DAG device for reading and adds it to the DAG device list
355 *
356 * NOTE: this function should only be called when opening a DAG device for
357 * reading - there is little practical difference between this and the
358 * function above that covers the writing case, but we need the input trace
359 * object to report errors properly so the two functions take slightly
360 * different arguments. This is really lame and there should be a much better
361 * way of doing this.
362 *
363 * NOTE: This function assumes the open_dag_mutex is held by the caller */
364static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
365        struct stat buf;
366        int fd;
367        struct dag_dev_t *new_dev;
368
369        /* Make sure the device exists */
370        if (stat(dev_name, &buf) == -1) {
371                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
372                return NULL;
373        }
374
375        /* Make sure it is the appropriate type of device */
376        if (S_ISCHR(buf.st_mode)) {
377                /* Try opening the DAG device */
378                if((fd = dag_open(dev_name)) < 0) {
379                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
380                                        dev_name);
381                        return NULL;
382                }
383        } else {
384                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
385                                dev_name);
386                return NULL;
387        }
388
389        /* Add the device to our device list - it is just a doubly linked
390         * list with no inherent ordering; just tack the new one on the front
391         */
392        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
393        new_dev->fd = fd;
394        new_dev->dev_name = dev_name;
395        new_dev->ref_count = 1;
396
397        new_dev->prev = NULL;
398        new_dev->next = open_dags;
399        if (open_dags)
400                open_dags->prev = new_dev;
401
402        open_dags = new_dev;
403
404        return new_dev;
405}
406
407/* Creates and initialises a DAG output trace */
408static int dag_init_output(libtrace_out_t *libtrace) {
409        char *dag_dev_name = NULL;
410        char *scan = NULL;
411        struct dag_dev_t *dag_device = NULL;
412        int stream = 1;
413       
414        /* XXX I don't know if this is important or not, but this function
415         * isn't present in all of the driver releases that this code is
416         * supposed to support! */
417        /*
418        unsigned long wake_time;
419        dagutil_sleep_get_wake_time(&wake_time,0);
420        */
421
422        dag_init_format_out_data(libtrace);
423        /* Grab the mutex while we're likely to be messing with the device
424         * list */
425        pthread_mutex_lock(&open_dag_mutex);
426       
427        /* Specific streams are signified using a comma in the libtrace URI,
428         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
429         *
430         * If no stream is specified, we will write using stream 1 */
431        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
432                dag_dev_name = strdup(libtrace->uridata);
433        } else {
434                dag_dev_name = (char *)strndup(libtrace->uridata,
435                                (size_t)(scan - libtrace->uridata));
436                stream = atoi(++scan);
437        }
438        FORMAT_DATA_OUT->dagstream = stream;
439
440        /* See if our DAG device is already open */
441        dag_device = dag_find_open_device(dag_dev_name);
442
443        if (dag_device == NULL) {
444                /* Device not yet opened - open it ourselves */
445                dag_device = dag_open_output_device(libtrace, dag_dev_name);
446        } else {
447                /* Otherwise, just use the existing one */
448                free(dag_dev_name);
449                dag_dev_name = NULL;
450        }
451
452        /* Make sure we have successfully opened a DAG device */
453        if (dag_device == NULL) {
454                if (dag_dev_name) {
455                        free(dag_dev_name);
456                }
457                pthread_mutex_unlock(&open_dag_mutex);
458                return -1;
459        }
460
461        FORMAT_DATA_OUT->device = dag_device;
462        pthread_mutex_unlock(&open_dag_mutex);
463        return 0;
464}
465
466/* Creates and initialises a DAG input trace */
467static int dag_init_input(libtrace_t *libtrace) {
468        char *dag_dev_name = NULL;
469        char *scan = NULL;
470        int stream = 0, thread_count = 1;
471        struct dag_dev_t *dag_device = NULL;
472
473        dag_init_format_data(libtrace);
474        /* Grab the mutex while we're likely to be messing with the device
475         * list */
476        pthread_mutex_lock(&open_dag_mutex);
477
478
479        /* DAG cards support multiple streams. In a single threaded capture,
480         * these are specified using a comma in the libtrace URI,
481         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
482         *
483         * If no stream is specified, we will read from stream 0 with one thread
484         */
485        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
486                dag_dev_name = strdup(libtrace->uridata);
487               
488        } else {
489                dag_dev_name = (char *)strndup(libtrace->uridata,
490                                                                           (size_t)(scan - libtrace->uridata));
491                stream = atoi(++scan);
492        }
493
494        FORMAT_DATA->dagstream = stream;
495
496        /* See if our DAG device is already open */
497        dag_device = dag_find_open_device(dag_dev_name);
498
499        if (dag_device == NULL) {
500                /* Device not yet opened - open it ourselves */
501                dag_device = dag_open_device(libtrace, dag_dev_name);
502        } else {
503                /* Otherwise, just use the existing one */
504                free(dag_dev_name);
505                dag_dev_name = NULL;
506        }
507
508        /* Make sure we have successfully opened a DAG device */
509        if (dag_device == NULL) {
510                if (dag_dev_name)
511                        free(dag_dev_name);
512                dag_dev_name = NULL;
513                pthread_mutex_unlock(&open_dag_mutex);
514                return -1;
515        }
516
517        FORMAT_DATA->device = dag_device;
518
519        /* See Config_Status_API_Programming_Guide.pdf from the Endace Dag Documentation */
520        /* Check kBooleanAttributeActive is true -- no point capturing on an interface that's disabled
521 
522        *  The symptom of the port being disabled is that libtrace will appear to hang.
523        */
524        /* Check kBooleanAttributeFault is false */
525        /* Check kBooleanAttributeLocalFault is false */
526        /* Check kBooleanAttributeLock is true ? */
527        /* Check kBooleanAttributePeerLink ? */
528
529        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based on libtrace promisc attribute?*/
530        /* Set kUint32AttributeSnapLength to the snaplength */
531
532        pthread_mutex_unlock(&open_dag_mutex);
533        return 0;
534}
535
536/* Configures a DAG input trace */
537static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
538                                void *data) {
539        char conf_str[4096];
540        switch(option) {
541                case TRACE_OPTION_META_FREQ:
542                        /* This option is used to specify the frequency of DUCK
543                         * updates */
544                        DUCK.duck_freq = *(int *)data;
545                        return 0;
546                case TRACE_OPTION_SNAPLEN:
547                        /* Tell the card our new snap length */
548                        snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
549                        if (dag_configure(FORMAT_DATA->device->fd,
550                                                conf_str) != 0) {
551                                trace_set_err(libtrace, errno, "Failed to configure snaplen on DAG card: %s", libtrace->uridata);
552                                return -1;
553                        }
554                        return 0;
555                case TRACE_OPTION_PROMISC:
556                        /* DAG already operates in a promisc fashion */
557                        return -1;
558                case TRACE_OPTION_FILTER:
559                        /* We don't yet support pushing filters into DAG
560                         * cards */
561                        return -1;
562                case TRACE_OPTION_EVENT_REALTIME:
563                        /* Live capture is always going to be realtime */
564                        return -1;
565        }
566        return -1;
567}
568
569/* Starts a DAG output trace */
570static int dag_start_output(libtrace_out_t *libtrace) {
571        struct timeval zero, nopoll;
572
573        zero.tv_sec = 0;
574        zero.tv_usec = 0;
575        nopoll = zero;
576
577        /* Attach and start the DAG stream */
578
579        if (dag_attach_stream(FORMAT_DATA_OUT->device->fd,
580                        FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) {
581                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
582                return -1;
583        }
584
585        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
586                        FORMAT_DATA_OUT->dagstream) < 0) {
587                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
588                return -1;
589        }
590        FORMAT_DATA_OUT->stream_attached = 1;
591
592        /* We don't want the dag card to do any sleeping */
593
594        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
595                        FORMAT_DATA_OUT->dagstream, 0, &zero,
596                        &nopoll);
597
598        return 0;
599}
600
601/* Starts a DAG input trace */
602static int dag_start_input(libtrace_t *libtrace) {
603        struct timeval zero, nopoll;
604        uint8_t *top, *bottom, *starttop;
605        uint64_t diff = 0;
606        top = bottom = NULL;
607
608        zero.tv_sec = 0;
609        zero.tv_usec = 10000;
610        nopoll = zero;
611
612        /* Attach and start the DAG stream */
613        if (dag_attach_stream(FORMAT_DATA->device->fd,
614                                FORMAT_DATA->dagstream, 0, 0) < 0) {
615                trace_set_err(libtrace, errno, "Cannot attach DAG stream");
616                return -1;
617        }
618
619        if (dag_start_stream(FORMAT_DATA->device->fd,
620                                FORMAT_DATA->dagstream) < 0) {
621                trace_set_err(libtrace, errno, "Cannot start DAG stream");
622                return -1;
623        }
624        FORMAT_DATA->stream_attached = 1;
625       
626        /* We don't want the dag card to do any sleeping */
627        dag_set_stream_poll(FORMAT_DATA->device->fd,
628                                FORMAT_DATA->dagstream, 0, &zero,
629                                &nopoll);
630
631        starttop = dag_advance_stream(FORMAT_DATA->device->fd,
632                                        FORMAT_DATA->dagstream,
633                                        &bottom);
634
635        /* Should probably flush the memory hole now */
636        while (starttop - bottom > 0) {
637                bottom += (starttop - bottom);
638                top = dag_advance_stream(FORMAT_DATA->device->fd,
639                                        FORMAT_DATA->dagstream,
640                                        &bottom);
641        }
642        FORMAT_DATA->top = top;
643        FORMAT_DATA->bottom = bottom;
644        FORMAT_DATA->processed = 0;
645        FORMAT_DATA->drops = 0;
646
647        return 0;
648}
649
650/* Pauses a DAG output trace */
651static int dag_pause_output(libtrace_out_t *libtrace) {
652
653        /* Stop and detach the stream */
654        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
655                        FORMAT_DATA_OUT->dagstream) < 0) {
656                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
657                return -1;
658        }
659        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
660                        FORMAT_DATA_OUT->dagstream) < 0) {
661                trace_set_err_out(libtrace, errno, "Could not detach DAG stream");
662                return -1;
663        }
664        FORMAT_DATA_OUT->stream_attached = 0;
665        return 0;
666}
667
668/* Pauses a DAG input trace */
669static int dag_pause_input(libtrace_t *libtrace) {
670
671        /* Stop and detach the stream */
672        if (dag_stop_stream(FORMAT_DATA->device->fd,
673                                FORMAT_DATA->dagstream) < 0) {
674                trace_set_err(libtrace, errno, "Could not stop DAG stream");
675                return -1;
676        }
677        if (dag_detach_stream(FORMAT_DATA->device->fd,
678                                FORMAT_DATA->dagstream) < 0) {
679                trace_set_err(libtrace, errno, "Could not detach DAG stream");
680                return -1;
681        }
682        FORMAT_DATA->stream_attached = 0;
683        return 0;
684}
685
686/* Closes a DAG input trace */
687static int dag_fin_input(libtrace_t *libtrace) {
688        /* Need the lock, since we're going to be handling the device list */
689        pthread_mutex_lock(&open_dag_mutex);
690       
691        /* Detach the stream if we are not paused */
692        if (FORMAT_DATA->stream_attached)
693                dag_pause_input(libtrace);
694        FORMAT_DATA->device->ref_count --;
695
696        /* Close the DAG device if there are no more references to it */
697        if (FORMAT_DATA->device->ref_count == 0)
698                dag_close_device(FORMAT_DATA->device);
699        if (DUCK.dummy_duck)
700                trace_destroy_dead(DUCK.dummy_duck);
701        free(libtrace->format_data);
702        pthread_mutex_unlock(&open_dag_mutex);
703        return 0; /* success */
704}
705
706/* Closes a DAG output trace */
707static int dag_fin_output(libtrace_out_t *libtrace) {
708       
709        /* Commit any outstanding traffic in the txbuffer */
710        if (FORMAT_DATA_OUT->waiting) {
711                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
712                                FORMAT_DATA_OUT->waiting );
713        }
714
715        /* Wait until the buffer is nearly clear before exiting the program,
716         * as we will lose packets otherwise */
717        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
718                        FORMAT_DATA_OUT->dagstream,
719                        dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
720                                        FORMAT_DATA_OUT->dagstream) - 8
721                        );
722
723        /* Need the lock, since we're going to be handling the device list */
724        pthread_mutex_lock(&open_dag_mutex);
725
726        /* Detach the stream if we are not paused */
727        if (FORMAT_DATA_OUT->stream_attached)
728                dag_pause_output(libtrace);
729        FORMAT_DATA_OUT->device->ref_count --;
730
731        /* Close the DAG device if there are no more references to it */
732        if (FORMAT_DATA_OUT->device->ref_count == 0)
733                dag_close_device(FORMAT_DATA_OUT->device);
734        free(libtrace->format_data);
735        pthread_mutex_unlock(&open_dag_mutex);
736        return 0; /* success */
737}
738
739/* Extracts DUCK information from the DAG card and produces a DUCK packet */
740static int dag_get_duckinfo(libtrace_t *libtrace,
741                                libtrace_packet_t *packet) {
742
743        /* Allocate memory for the DUCK data */
744        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
745                        !packet->buffer) {
746                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
747                packet->buf_control = TRACE_CTRL_PACKET;
748                if (!packet->buffer) {
749                        trace_set_err(libtrace, errno,
750                                        "Cannot allocate packet buffer");
751                        return -1;
752                }
753        }
754
755        /* DUCK doesn't have a format header */
756        packet->header = 0;
757        packet->payload = packet->buffer;
758
759        /* No need to check if we can get DUCK or not - we're modern
760         * enough so just grab the DUCK info */
761        if ((ioctl(FORMAT_DATA->device->fd, DAGIOCDUCK,
762                                        (duckinf_t *)packet->payload) < 0)) {
763                trace_set_err(libtrace, errno, "Error using DAGIOCDUCK");
764                return -1;
765        }
766
767        packet->type = TRACE_RT_DUCK_2_5;
768
769        /* Set the packet's tracce to point at a DUCK trace, so that the
770         * DUCK format functions will be called on the packet rather than the
771         * DAG ones */
772        if (!DUCK.dummy_duck)
773                DUCK.dummy_duck = trace_create_dead("rt:localhost:3434");
774        packet->trace = DUCK.dummy_duck;
775        return sizeof(duckinf_t);
776}
777
778/* Determines the amount of data available to read from the DAG card */
779static int dag_available(libtrace_t *libtrace) {
780        uint32_t diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
781
782        /* If we've processed more than 4MB of data since we last called
783         * dag_advance_stream, then we should call it again to allow the
784         * space occupied by that 4MB to be released */
785        if (diff >= dag_record_size && FORMAT_DATA->processed < 4 * 1024 * 1024)
786                return diff;
787       
788        /* Update the top and bottom pointers */
789        FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd,
790                        FORMAT_DATA->dagstream,
791                        &(FORMAT_DATA->bottom));
792       
793        if (FORMAT_DATA->top == NULL) {
794                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
795                return -1;
796        }
797        FORMAT_DATA->processed = 0;
798        diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
799        return diff;
800}
801
802/* Returns a pointer to the start of the next complete ERF record */
803static dag_record_t *dag_get_record(libtrace_t *libtrace) {
804        dag_record_t *erfptr = NULL;
805        uint16_t size;
806        erfptr = (dag_record_t *)FORMAT_DATA->bottom;
807        if (!erfptr)
808                return NULL;
809        size = ntohs(erfptr->rlen);
810        assert( size >= dag_record_size );
811        /* Make certain we have the full packet available */
812        if (size > (FORMAT_DATA->top - FORMAT_DATA->bottom))
813                return NULL;
814        FORMAT_DATA->bottom += size;
815        FORMAT_DATA->processed += size;
816        return erfptr;
817}
818
819/* Converts a buffer containing a recently read DAG packet record into a
820 * libtrace packet */
821static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
822                void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
823
824        dag_record_t *erfptr;
825        libtrace_thread_t *t;
826       
827        /* If the packet previously owned a buffer that is not the buffer
828         * that contains the new packet data, we're going to need to free the
829         * old one to avoid memory leaks */
830        if (packet->buffer != buffer &&
831                        packet->buf_control == TRACE_CTRL_PACKET) {
832                free(packet->buffer);
833        }
834
835        /* Set the buffer owner appropriately */
836        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
837                packet->buf_control = TRACE_CTRL_PACKET;
838        } else
839                packet->buf_control = TRACE_CTRL_EXTERNAL;
840
841        /* Update the packet pointers and type appropriately */
842        erfptr = (dag_record_t *)buffer;
843        packet->buffer = erfptr;
844        packet->header = erfptr;
845        packet->type = rt_type;
846
847        if (erfptr->flags.rxerror == 1) {
848                /* rxerror means the payload is corrupt - drop the payload
849                 * by tweaking rlen */
850                packet->payload = NULL;
851                erfptr->rlen = htons(erf_get_framing_length(packet));
852        } else {
853                packet->payload = (char*)packet->buffer
854                        + erf_get_framing_length(packet);
855        }
856
857        if (libtrace->format_data == NULL) {
858                dag_init_format_data(libtrace);
859        }
860
861        /* Update the dropped packets counter */
862        /* No loss counter for DSM coloured records - have to use
863         * some other API */
864        /* Adding multithread support for this isn't actually that useful for the
865         * DAG7.5G2, as there's no way to use multiple receive streams without DSM */
866        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
867                /* TODO */
868        } else {
869                /* Use the ERF loss counter */
870                if (DATA(libtrace)->per_thread) {
871                        t = get_thread_table(libtrace);
872                        PERPKT_DATA(t)->drops += ntohs(erfptr->lctr);
873                } else {
874                        if (FORMAT_DATA->seeninterface[erfptr->flags.iface] == 0) {
875                                FORMAT_DATA->seeninterface[erfptr->flags.iface] = 1;
876                        } else {
877                                FORMAT_DATA->drops += ntohs(erfptr->lctr);
878                        }
879                }
880        }
881
882
883        return 0;
884}
885
886/*
887 * dag_write_packet() at this stage attempts to improve tx performance
888 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
889 * has been met. I observed approximately 270% performance increase
890 * through this relatively naive tweak. No optimisation of buffer sizes
891 * was attempted.
892 */
893
894/* Pushes an ERF record onto the transmit stream */
895static int dag_dump_packet(libtrace_out_t *libtrace,
896                dag_record_t *erfptr, unsigned int pad, void *buffer) {
897        int size;
898
899        /*
900         * If we've got 0 bytes waiting in the txqueue, assume that we haven't
901         * requested any space yet, and request some, storing the pointer at
902         * FORMAT_DATA_OUT->txbuffer.
903         *
904         * The amount to request is slightly magical at the moment - it's
905         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
906         * the buffer and handle overruns.
907         */
908        if (FORMAT_DATA_OUT->waiting == 0) {
909                FORMAT_DATA_OUT->txbuffer = dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
910                                FORMAT_DATA_OUT->dagstream, 16908288);
911        }
912
913        /*
914         * Copy the header separately to the body, as we can't guarantee they
915         * are in contiguous memory
916         */
917        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,erfptr,(dag_record_size + pad));
918        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
919
920
921
922        /*
923         * Copy our incoming packet into the outgoing buffer, and increment
924         * our waiting count
925         */
926        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
927        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,buffer,size);
928        FORMAT_DATA_OUT->waiting += size;
929
930        /*
931         * If our output buffer has more than 16 Mebibytes in it, commit those
932         * bytes and reset the waiting count to 0.
933         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in
934         * case there is still data in the buffer at program exit.
935         */
936
937        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
938                FORMAT_DATA_OUT->txbuffer = dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
939                        FORMAT_DATA_OUT->waiting );
940                FORMAT_DATA_OUT->waiting = 0;
941        }
942
943        return size + pad + dag_record_size;
944
945}
946
947/* Attempts to determine a suitable ERF type for a given packet. Returns true
948 * if one is found, false otherwise */
949static bool find_compatible_linktype(libtrace_out_t *libtrace,
950                                libtrace_packet_t *packet, char *type)
951{
952         // Keep trying to simplify the packet until we can find
953         //something we can do with it
954
955        do {
956                *type=libtrace_to_erf_type(trace_get_link_type(packet));
957
958                // Success
959                if (*type != (char)-1)
960                        return true;
961
962                if (!demote_packet(packet)) {
963                        trace_set_err_out(libtrace,
964                                        TRACE_ERR_NO_CONVERSION,
965                                        "No erf type for packet (%i)",
966                                        trace_get_link_type(packet));
967                        return false;
968                }
969
970        } while(1);
971
972        return true;
973}
974
975/* Writes a packet to the provided DAG output trace */
976static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet) {
977        /*
978         * This is heavily borrowed from erf_write_packet(). Yes, CnP coding
979         * sucks, sorry about that.
980         */
981        unsigned int pad = 0;
982        int numbytes;
983        void *payload = packet->payload;
984        dag_record_t *header = (dag_record_t *)packet->header;
985        char erf_type = 0;
986
987        if(!packet->header) {
988                /* No header, probably an RT packet. Lifted from
989                 * erf_write_packet(). */
990                return -1;
991        }
992
993        if (trace_get_link_type(packet) == TRACE_TYPE_NONDATA)
994                return 0;
995
996        pad = dag_get_padding(packet);
997
998        /*
999         * If the payload is null, adjust the rlen. Discussion of this is
1000         * attached to erf_write_packet()
1001         */
1002        if (payload == NULL) {
1003                header->rlen = htons(dag_record_size + pad);
1004        }
1005
1006        if (packet->type == TRACE_RT_DATA_ERF) {
1007                numbytes = dag_dump_packet(libtrace,
1008                                header,
1009                                pad,
1010                                payload
1011                                );
1012
1013        } else {
1014                /* Build up a new packet header from the existing header */
1015
1016                /* Simplify the packet first - if we can't do this, break
1017                 * early */
1018                if (!find_compatible_linktype(libtrace,packet,&erf_type))
1019                        return -1;
1020
1021                dag_record_t erfhdr;
1022
1023                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
1024                payload=packet->payload;
1025                pad = dag_get_padding(packet);
1026
1027                /* Flags. Can't do this */
1028                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
1029                if (trace_get_direction(packet)!=(int)~0U)
1030                        erfhdr.flags.iface = trace_get_direction(packet);
1031
1032                erfhdr.type = erf_type;
1033
1034                /* Packet length (rlen includes format overhead) */
1035                assert(trace_get_capture_length(packet)>0
1036                                && trace_get_capture_length(packet)<=65536);
1037                assert(erf_get_framing_length(packet)>0
1038                                && trace_get_framing_length(packet)<=65536);
1039                assert(trace_get_capture_length(packet)+erf_get_framing_length(packet)>0
1040                      &&trace_get_capture_length(packet)+erf_get_framing_length(packet)<=65536);
1041
1042                erfhdr.rlen = htons(trace_get_capture_length(packet)
1043                        + erf_get_framing_length(packet));
1044
1045
1046                /* Loss counter. Can't do this */
1047                erfhdr.lctr = 0;
1048                /* Wire length, does not include padding! */
1049                erfhdr.wlen = htons(trace_get_wire_length(packet));
1050
1051                /* Write it out */
1052                numbytes = dag_dump_packet(libtrace,
1053                                &erfhdr,
1054                                pad,
1055                                payload);
1056
1057        }
1058
1059        return numbytes;
1060}
1061
1062/* Reads the next available packet from a DAG card, in a BLOCKING fashion
1063 *
1064 * If DUCK reporting is enabled, the packet returned may be a DUCK update
1065 */
1066static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1067        int size = 0;
1068        struct timeval tv;
1069        dag_record_t *erfptr = NULL;
1070        int numbytes = 0;
1071        uint32_t flags = 0;
1072        struct timeval maxwait;
1073        struct timeval pollwait;
1074
1075        pollwait.tv_sec = 0;
1076        pollwait.tv_usec = 10000;
1077        maxwait.tv_sec = 0;
1078        maxwait.tv_usec = 250000;
1079
1080        /* Check if we're due for a DUCK report */
1081        if (DUCK.last_pkt - DUCK.last_duck > DUCK.duck_freq &&
1082                        DUCK.duck_freq != 0) {
1083                size = dag_get_duckinfo(libtrace, packet);
1084                DUCK.last_duck = DUCK.last_pkt;
1085                if (size != 0) {
1086                        return size;
1087                }
1088                /* No DUCK support, so don't waste our time anymore */
1089                DUCK.duck_freq = 0;
1090        }
1091
1092        /* Don't let anyone try to free our DAG memory hole! */
1093        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1094
1095        /* If the packet buffer is currently owned by libtrace, free it so
1096         * that we can set the packet to point into the DAG memory hole */
1097        if (packet->buf_control == TRACE_CTRL_PACKET) {
1098                free(packet->buffer);
1099                packet->buffer = 0;
1100        }
1101       
1102        if (dag_set_stream_poll(FORMAT_DATA->device->fd, 
1103                        FORMAT_DATA->dagstream, sizeof(dag_record_t), &maxwait, 
1104                        &pollwait) == -1)
1105        {
1106                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1107                return -1;
1108        }
1109
1110
1111        /* Grab a full ERF record */
1112        do {
1113                numbytes = dag_available(libtrace);
1114                if (numbytes < 0)
1115                        return numbytes;
1116                if (numbytes < dag_record_size) {
1117                        if (libtrace_halt)
1118                                return 0;
1119                        /* Block until we see a packet */
1120                        continue;
1121                }
1122                erfptr = dag_get_record(libtrace);
1123        } while (erfptr == NULL);
1124
1125        /* Prepare the libtrace packet */
1126        if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
1127                                flags))
1128                return -1;
1129
1130        /* Update the DUCK timer */
1131        tv = trace_get_timeval(packet);
1132        DUCK.last_pkt = tv.tv_sec;
1133
1134        return packet->payload ? htons(erfptr->rlen) :
1135                                erf_get_framing_length(packet);
1136}
1137
1138/* Attempts to read a packet from a DAG card in a NON-BLOCKING fashion. If a
1139 * packet is available, we will return a packet event. Otherwise we will
1140 * return a SLEEP event (as we cannot select on the DAG file descriptor).
1141 */
1142static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
1143                                        libtrace_packet_t *packet) {
1144        libtrace_eventobj_t event = {0,0,0.0,0};
1145        dag_record_t *erfptr = NULL;
1146        int numbytes;
1147        uint32_t flags = 0;
1148        struct timeval minwait;
1149       
1150        minwait.tv_sec = 0;
1151        minwait.tv_usec = 10000;
1152       
1153        if (dag_set_stream_poll(FORMAT_DATA->device->fd, 
1154                        FORMAT_DATA->dagstream, 0, &minwait, 
1155                        &minwait) == -1)
1156        {
1157                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1158                event.type = TRACE_EVENT_TERMINATE;
1159                return event;
1160        }
1161
1162        do {
1163                erfptr = NULL;
1164                numbytes = 0;
1165       
1166                /* Need to call dag_available so that the top pointer will get
1167                 * updated, otherwise we'll never see any data! */
1168                numbytes = dag_available(libtrace);
1169
1170                /* May as well not bother calling dag_get_record if
1171                 * dag_available suggests that there's no data */
1172                if (numbytes != 0)
1173                        erfptr = dag_get_record(libtrace);
1174                if (erfptr == NULL) {
1175                        /* No packet available - sleep for a very short time */
1176                        if (libtrace_halt) {
1177                                event.type = TRACE_EVENT_TERMINATE;
1178                        } else {                       
1179                                event.type = TRACE_EVENT_SLEEP;
1180                                event.seconds = 0.0001;
1181                        }
1182                        break;
1183                }
1184                if (dag_prepare_packet(libtrace, packet, erfptr, 
1185                                        TRACE_RT_DATA_ERF, flags)) {
1186                        event.type = TRACE_EVENT_TERMINATE;
1187                        break;
1188                }
1189
1190
1191                event.size = trace_get_capture_length(packet) + 
1192                                trace_get_framing_length(packet);
1193               
1194                /* XXX trace_read_packet() normally applies the following
1195                 * config options for us, but this function is called via
1196                 * trace_event() so we have to do it ourselves */
1197
1198                if (libtrace->filter) {
1199                        int filtret = trace_apply_filter(libtrace->filter, 
1200                                        packet);
1201                        if (filtret == -1) {
1202                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
1203                                                "Bad BPF Filter");
1204                                event.type = TRACE_EVENT_TERMINATE;
1205                                break;
1206                        }
1207
1208                        if (filtret == 0) {
1209                                /* This packet isn't useful so we want to
1210                                 * immediately see if there is another suitable
1211                                 * one - we definitely DO NOT want to return
1212                                 * a sleep event in this case, like we used to
1213                                 * do! */
1214                                libtrace->filtered_packets ++;
1215                                trace_clear_cache(packet);
1216                                continue;
1217                        }
1218                               
1219                        event.type = TRACE_EVENT_PACKET;
1220                } else {
1221                        event.type = TRACE_EVENT_PACKET;
1222                }
1223
1224                if (libtrace->snaplen > 0) {
1225                        trace_set_capture_length(packet, libtrace->snaplen);
1226                }
1227                libtrace->accepted_packets ++;
1228                break;
1229        } while (1);
1230
1231        return event;
1232}
1233
1234/* Gets the number of dropped packets */
1235static uint64_t dag_get_dropped_packets(libtrace_t *trace) {
1236        uint64_t sum = 0;
1237        int i, tot;
1238
1239        if (trace->format_data == NULL)
1240                return (uint64_t)-1;
1241
1242        if (DATA(trace)->per_thread) {
1243                tot = trace->perpkt_thread_count;
1244
1245                for (i = 0; i < tot; i++) {
1246                        printf("t%d: drops %" PRIu64 "\n",
1247                                   DATA(trace)->per_thread[i].drops);
1248                        sum += DATA(trace)->per_thread[i].drops;
1249                }
1250        }
1251
1252        sum += DATA(trace)->drops;
1253
1254        return sum;
1255}
1256
1257/* Prints some semi-useful help text about the DAG format module */
1258static void dag_help(void) {
1259        printf("dag format module: $Revision: 1755 $\n");
1260        printf("Supported input URIs:\n");
1261        printf("\tdag:/dev/dagn\n");
1262        printf("\n");
1263        printf("\te.g.: dag:/dev/dag0\n");
1264        printf("\n");
1265        printf("Supported output URIs:\n");
1266        printf("\tnone\n");
1267        printf("\n");
1268}
1269
1270static int dag_pstart_input(libtrace_t *libtrace) {
1271        char *scan, *tok;
1272        uint16_t stream_count = 0, max_streams;
1273        /* We keep our own pointer to per_thread as the system will free
1274         * up FORMAT_DATA without freeing this if something goes wrong */
1275        struct dag_per_thread_t *per_thread = NULL;
1276        int iserror = 0;
1277
1278        /* Check we aren't trying to create more threads than the DAG card can
1279         * handle */
1280        max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd);
1281        if (libtrace->perpkt_thread_count > max_streams) {
1282                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, "trying to create too "
1283                                          "many threads (max is %u)", max_streams);
1284                iserror = 1;
1285                goto cleanup;
1286        }
1287
1288        /* Create the thread structures */
1289        per_thread = calloc(libtrace->perpkt_thread_count,
1290                                                                         sizeof(struct dag_per_thread_t));
1291        FORMAT_DATA->per_thread = per_thread;
1292
1293        /* Get the stream names from the uri */
1294        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
1295                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, "format uri doesn't "
1296                                          "specify the DAG streams");
1297                iserror = 1;
1298                goto cleanup;
1299        }
1300
1301        scan++;
1302 
1303        tok = strtok(scan, ",");
1304        while (tok != NULL) {
1305                /* Ensure we haven't specified too many streams */
1306                if (stream_count >= libtrace->perpkt_thread_count) {
1307                        trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, "format uri specifies too "
1308                                          "many streams. Max is %u", max_streams);
1309                        iserror = 1;
1310                        goto cleanup;
1311                }
1312
1313                /* Save the stream details */
1314                per_thread[stream_count].device = FORMAT_DATA->device;
1315                per_thread[stream_count++].stream = (uint16_t)atoi(tok);
1316
1317                tok = strtok(NULL, ",");
1318        }
1319
1320 cleanup:
1321        if (iserror) {
1322                /* Free the per_thread memory */
1323                free(per_thread);
1324               
1325                return -1;
1326        } else {
1327                return 0;
1328        }
1329}
1330
1331
1332
1333/* TODO: Fold this into dag_available */
1334static int dag_pavailable(libtrace_t *libtrace, libtrace_thread_t *t) {
1335        uint32_t diff = PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom;
1336
1337        /* If we've processed more than 4MB of data since we last called
1338         * dag_advance_stream, then we should call it again to allow the
1339         * space occupied by that 4MB to be released */
1340        if (diff >= dag_record_size && PERPKT_DATA(t)->processed < 4 * 1024 * 1024)
1341                return diff;
1342
1343        /* Update top and bottom pointers */
1344        PERPKT_DATA(t)->top = dag_advance_stream(PERPKT_DATA(t)->device->fd,
1345                                                                                         PERPKT_DATA(t)->stream,
1346                                                                                         &(PERPKT_DATA(t)->bottom));
1347
1348        if (PERPKT_DATA(t)->top == NULL) {
1349                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
1350                return -1;
1351        }
1352
1353        PERPKT_DATA(t)->processed = 0;
1354        diff = PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom;
1355        return diff;
1356}
1357
1358/* TODO: Fold this into dag_get_record */
1359static dag_record_t *dag_pget_record(libtrace_t *libtrace,
1360                                                                         libtrace_thread_t *t) {
1361        dag_record_t *erfptr = NULL;
1362        uint16_t size;
1363
1364        erfptr = (dag_record_t *)PERPKT_DATA(t)->bottom;
1365        if (!erfptr)
1366                return NULL;
1367
1368        /* Ensure we have a whole record */
1369        size = ntohs(erfptr->rlen);
1370        assert(size >= dag_record_size);
1371        if (size > (PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom))
1372                return NULL;
1373
1374        /* Advance the buffer pointers */
1375        PERPKT_DATA(t)->bottom += size;
1376        PERPKT_DATA(t)->processed += size;
1377       
1378        return erfptr;
1379}
1380
1381static int dag_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t,
1382                                                        libtrace_packet_t *packet) {
1383        dag_record_t *erfptr = NULL;
1384        int numbytes = 0;
1385        uint32_t flags = 0;
1386        struct timeval maxwait, pollwait;
1387
1388        pollwait.tv_sec = 0;
1389        pollwait.tv_usec = 10000;
1390        maxwait.tv_sec = 0;
1391        maxwait.tv_usec = 250000;
1392
1393        /* TODO: Support DUCK reporting */
1394
1395        /* Don't let anyone try to free our DAG memory hole! */
1396        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1397
1398        /* If the packet buffer is currently owned by libtrace, free it so
1399         * that we can set the packet to point into the DAG memory hole */
1400        if (packet->buf_control == TRACE_CTRL_PACKET) {
1401                free(packet->buffer);
1402                packet->buffer = 0;
1403        }
1404
1405        /* Configure DAG card stream polling */
1406        if (dag_set_stream_poll(PERPKT_DATA(t)->device->fd, PERPKT_DATA(t)->stream,
1407                                                        sizeof(dag_record_t), &maxwait, &pollwait) < 0) {
1408                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1409                return -1;
1410        }
1411
1412        /* Grab an ERF record */
1413        do {
1414                numbytes = dag_pavailable(libtrace, t);
1415                if (numbytes < 0)
1416                        return numbytes;
1417                if (numbytes < dag_record_size) {
1418                        if (libtrace_halt)
1419                                return 0;
1420
1421                        /* Check message queue to see if we should abort early */
1422                        if (libtrace_message_queue_count(&t->messages) > 0)
1423                                return -2;
1424
1425                        /* Keep trying until we see a packet */
1426                        continue;
1427                }
1428
1429                erfptr = dag_pget_record(libtrace, t);
1430        } while (erfptr == NULL);
1431
1432        /* Prepare the libtrace packet */
1433        if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
1434                                                   flags))
1435                return -1;
1436
1437        PERPKT_DATA(t)->pkt_count++;
1438
1439        return packet->payload ? htons(erfptr->rlen) :
1440                erf_get_framing_length(packet);
1441}
1442
1443static int dag_ppause_input(libtrace_t *libtrace) {
1444        int i, tot = libtrace->perpkt_thread_count;
1445        struct dag_per_thread_t *t_data;
1446
1447        /* Stop and detach all the streams */
1448        printf("Stopping and detaching all streams\n");
1449        for (i = 0; i < tot; i++) {
1450                t_data = &FORMAT_DATA->per_thread[i];
1451
1452                if (dag_stop_stream(t_data->device->fd,
1453                                                        t_data->stream) < 0) {
1454                        trace_set_err(libtrace, errno, "can't stop DAG stream #%u",
1455                                                  t_data->stream);
1456                        return -1;
1457                }
1458
1459                if (dag_detach_stream(t_data->device->fd,
1460                                                          t_data->stream) < 0) {
1461                        trace_set_err(libtrace, errno, "can't detach DAG stream #%u",
1462                                                  t_data->stream);
1463                        return -1;
1464                }
1465        }
1466
1467        /* Free up the per_thread array */
1468        free(FORMAT_DATA->per_thread);
1469        FORMAT_DATA->per_thread = NULL;
1470
1471        return 0;
1472}
1473
1474static int dag_pconfig_input(libtrace_t *libtrace,
1475                                                         trace_parallel_option_t option, void *value) {
1476
1477        /* We don't support any of these! Normally you configure the DAG card
1478         * externally. */
1479        switch(option) {
1480        case TRACE_OPTION_SET_HASHER:
1481        case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
1482        case TRACE_OPTION_TRACETIME:
1483        case TRACE_OPTION_TICK_INTERVAL:
1484        case TRACE_OPTION_GET_CONFIG:
1485        case TRACE_OPTION_SET_CONFIG:
1486                return -1;
1487        }
1488        /* We don't provide a default option to ensure that future options will
1489         * generate a compiler warning. */
1490
1491        return -1;
1492}
1493
1494static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1495                                                                bool reader) {
1496        /* XXX: This function gets run sequentially for all
1497         * threads. Should investigate making it parallel as draining the
1498         * memory could be needlessly time consuming.
1499         */
1500        uint8_t *top, *bottom;
1501        uint8_t diff = 0; /* XXX: Investigate this type, as I would assume the value
1502                                           * could be larger than 255 */
1503        struct timeval zero, nopoll;
1504
1505        top = bottom = NULL;
1506
1507        /* Minimum delay is 10mS */
1508        zero.tv_sec = 0;
1509        zero.tv_usec = 10000;
1510        nopoll = zero;
1511
1512        if (reader) {
1513                if (t->type == THREAD_PERPKT) {
1514                        /* Pass the per thread data to the thread */
1515                        t->format_data = &FORMAT_DATA->per_thread[t->perpkt_num];
1516
1517                        /* Attach and start the DAG stream */
1518                        printf("t%u: starting and attaching stream #%u\n", t->perpkt_num,
1519                                   PERPKT_DATA(t)->stream);
1520                        if (dag_attach_stream(PERPKT_DATA(t)->device->fd,
1521                                                                  PERPKT_DATA(t)->stream, 0, 0) < 0) {
1522                                trace_set_err(libtrace, errno, "can't attach DAG stream #%u",
1523                                                          PERPKT_DATA(t)->stream);
1524                                return -1;
1525                        }
1526                        if (dag_start_stream(PERPKT_DATA(t)->device->fd,
1527                                                                 PERPKT_DATA(t)->stream) < 0) {
1528                                trace_set_err(libtrace, errno, "can't start DAG stream #%u",
1529                                                          PERPKT_DATA(t)->stream);
1530                                return -1;
1531                        }
1532
1533                        /* Ensure that dag_advance_stream will return without blocking */
1534                        if(dag_set_stream_poll(PERPKT_DATA(t)->device->fd,
1535                                                                   PERPKT_DATA(t)->stream, 0, &zero,
1536                                                                   &nopoll) < 0) {
1537                                trace_set_err(libtrace, errno, "dag_set_stream_poll failed!");
1538                                return -1;
1539                        }
1540
1541                        /* Clear all the data from the memory hole */
1542                        do {
1543                                top = dag_advance_stream(PERPKT_DATA(t)->device->fd,
1544                                                                                 PERPKT_DATA(t)->stream,
1545                                                                                 &bottom);
1546
1547                                assert(top && bottom);
1548                                diff = top - bottom;
1549                                bottom -= diff;
1550                        } while (diff != 0);
1551
1552                        PERPKT_DATA(t)->top = NULL;
1553                        PERPKT_DATA(t)->bottom = NULL;
1554                        PERPKT_DATA(t)->pkt_count = 0;
1555                        PERPKT_DATA(t)->drops = 0;
1556                } else {
1557                        /* TODO: Figure out why we need this */
1558                        t->format_data = &FORMAT_DATA->per_thread[0];
1559                }
1560        }
1561
1562        fprintf(stderr, "t%u: registered thread\n", t->perpkt_num);
1563
1564        return 0;
1565}
1566
1567static struct libtrace_format_t dag = {
1568        "dag",
1569        "$Id$",
1570        TRACE_FORMAT_ERF,
1571        dag_probe_filename,             /* probe filename */
1572        NULL,                           /* probe magic */
1573        dag_init_input,                 /* init_input */
1574        dag_config_input,               /* config_input */
1575        dag_start_input,                /* start_input */
1576        dag_pause_input,                /* pause_input */
1577        dag_init_output,                /* init_output */
1578        NULL,                           /* config_output */
1579        dag_start_output,               /* start_output */
1580        dag_fin_input,                  /* fin_input */
1581        dag_fin_output,                 /* fin_output */
1582        dag_read_packet,                /* read_packet */
1583        dag_prepare_packet,             /* prepare_packet */
1584        NULL,                           /* fin_packet */
1585        dag_write_packet,               /* write_packet */
1586        erf_get_link_type,              /* get_link_type */
1587        erf_get_direction,              /* get_direction */
1588        erf_set_direction,              /* set_direction */
1589        erf_get_erf_timestamp,          /* get_erf_timestamp */
1590        NULL,                           /* get_timeval */
1591        NULL,                           /* get_seconds */
1592        NULL,                           /* get_timespec */
1593        NULL,                           /* seek_erf */
1594        NULL,                           /* seek_timeval */
1595        NULL,                           /* seek_seconds */
1596        erf_get_capture_length,         /* get_capture_length */
1597        erf_get_wire_length,            /* get_wire_length */
1598        erf_get_framing_length,         /* get_framing_length */
1599        erf_set_capture_length,         /* set_capture_length */
1600        NULL,                           /* get_received_packets */
1601        NULL,                           /* get_filtered_packets */
1602        dag_get_dropped_packets,        /* get_dropped_packets */
1603        NULL,                           /* get_captured_packets */
1604        NULL,                           /* get_fd */
1605        trace_event_dag,                /* trace_event */
1606        dag_help,                       /* help */
1607        NULL,                            /* next pointer */
1608                {true, 0}, /* live packet capture, thread limit TBD */
1609                dag_pstart_input,
1610                dag_pread_packet,
1611                dag_ppause_input,
1612                NULL,
1613                dag_pconfig_input,
1614                dag_pregister_thread,
1615                NULL
1616};
1617
1618void dag_constructor(void) {
1619        register_format(&dag);
1620}
Note: See TracBrowser for help on using the repository browser.