source: lib/format_dag25.c @ 9149564

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 9149564 was 9149564, checked in by Dan <dan@…>, 7 years ago

Cleaned up parallel support.

Need to consider reducing code duplication and also adding support for DUCK reporting.

  • Property mode set to 100644
File size: 46.9 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id$
31 *
32 */
33
34#define _GNU_SOURCE
35
36#include "config.h"
37#include "common.h"
38#include "libtrace.h"
39#include "libtrace_int.h"
40#include "format_helper.h"
41#include "format_erf.h"
42
43#include <assert.h>
44#include <errno.h>
45#include <fcntl.h>
46#include <stdio.h>
47#include <string.h>
48#include <stdlib.h>
49#include <sys/stat.h>
50
51#include <sys/mman.h>
52/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
53 * Windows anyway so we'll worry about this more later :] */
54#include <pthread.h>
55
56
57#ifdef WIN32
58#  include <io.h>
59#  include <share.h>
60#  define PATH_MAX _MAX_PATH
61#  define snprintf sprintf_s
62#else
63#  include <netdb.h>
64#  ifndef PATH_MAX
65#       define PATH_MAX 4096
66#  endif
67#  include <sys/ioctl.h>
68#endif
69
70/* This format deals with DAG cards that are using drivers from the 2.5 version
71 * onwards, including 3.X.
72 *
73 * DAG is a LIVE capture format.
74 *
75 * This format does support writing, provided the DAG card that you are using
76 * has transmit (Tx) support. Additionally, packets read using this format
77 * are in the ERF format, so can easily be written as ERF traces without
78 * losing any data.
79 */
80
81
82#define DATA(x) ((struct dag_format_data_t *)x->format_data)
83#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
84#define PERPKT_DATA(x) ((struct dag_per_thread_t *)(x->format_data))
85
86#define FORMAT_DATA DATA(libtrace)
87#define FORMAT_DATA_OUT DATA_OUT(libtrace)
88
89#define DUCK FORMAT_DATA->duck
90static struct libtrace_format_t dag;
91
92/* A DAG device - a DAG device can support multiple streams (and therefore
93 * multiple input traces) so each trace needs to refer to a device */
94struct dag_dev_t {
95        char * dev_name;                /* Device name */
96        int fd;                         /* File descriptor */
97        uint16_t ref_count;             /* Number of input / output traces
98                                           that are using this device */
99        struct dag_dev_t *prev;         /* Pointer to the previous device in
100                                           the device list */
101        struct dag_dev_t *next;         /* Pointer to the next device in the
102                                           device list */
103};
104
105/* "Global" data that is stored for each DAG output trace */
106struct dag_format_data_out_t {
107        /* The DAG device being used for writing */
108        struct dag_dev_t *device;
109        /* The DAG stream that is being written on */
110        unsigned int dagstream;
111        /* Boolean flag indicating whether the stream is currently attached */
112        int stream_attached;
113        /* The amount of data waiting to be transmitted, in bytes */
114        uint64_t waiting;
115        /* A buffer to hold the data to be transmittted */
116        uint8_t *txbuffer;
117};
118
119/* Data that is stored for each libtrace_thread_t */
120struct dag_per_thread_t {
121        struct dag_dev_t *device; /* DAG device */
122        uint16_t stream; /* Stream number */
123        uint8_t *top; /* Pointer to the last unread byte in the DAG memory */
124        uint8_t *bottom; /* Pointer to the first unread byte in the DAG memory */
125        uint32_t processed; /* Amount of data processed from the bottom pointer */
126        uint64_t pkt_count; /* Number of packets seen by the thread */
127};
128
129/* "Global" data that is stored for each DAG input trace */
130struct dag_format_data_t {
131
132        /* Data required for regular DUCK reporting */
133        struct {
134                /* Timestamp of the last DUCK report */
135                uint32_t last_duck;
136                /* The number of seconds between each DUCK report */
137                uint32_t duck_freq;
138                /* Timestamp of the last packet read from the DAG card */
139                uint32_t last_pkt;
140                /* Dummy trace to ensure DUCK packets are dealt with using the
141                 * DUCK format functions */
142                libtrace_t *dummy_duck;
143        } duck;
144
145        /* The DAG device that we are reading from */
146        struct dag_dev_t *device;
147        /* The DAG stream that we are reading from */
148        unsigned int dagstream;
149        /* Boolean flag indicating whether the stream is currently attached */
150        int stream_attached;
151        /* Pointer to the first unread byte in the DAG memory hole */
152        uint8_t *bottom;
153        /* Pointer to the last unread byte in the DAG memory hole */
154        uint8_t *top;
155        /* The amount of data processed thus far from the bottom pointer */
156        uint32_t processed;
157        /* The number of packets that have been dropped */
158        uint64_t drops;
159        /* When running in parallel mode this is malloc'd with an array of thread
160         * structures. Most of the stuff above doesn't get used in parallel mode. */
161        struct dag_per_thread_t *per_thread;
162};
163
164/* To be thread-safe, we're going to need a mutex for operating on the list
165 * of DAG devices */
166pthread_mutex_t open_dag_mutex;
167
168/* The list of DAG devices that have been opened by libtrace.
169 *
170 * We can only open each DAG device once, but we might want to read from
171 * multiple streams. Therefore, we need to maintain a list of devices that we
172 * have opened (with ref counts!) so that we don't try to open a device too
173 * many times or close a device that we're still using */
174struct dag_dev_t *open_dags = NULL;
175
176/* Returns the amount of padding between the ERF header and the start of the
177 * captured packet data */
178static int dag_get_padding(const libtrace_packet_t *packet)
179{
180        /* ERF Ethernet records have a 2 byte padding before the packet itself
181         * so that the IP header is aligned on a 32 bit boundary.
182         */
183        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
184                dag_record_t *erfptr = (dag_record_t *)packet->header;
185                switch(erfptr->type) {
186                        case TYPE_ETH:
187                        case TYPE_DSM_COLOR_ETH:
188                                return 2;
189                        default:                return 0;
190                }
191        }
192        else {
193                switch(trace_get_link_type(packet)) {
194                        case TRACE_TYPE_ETH:    return 2;
195                        default:                return 0;
196                }
197        }
198}
199
200/* Attempts to determine if the given filename refers to a DAG device */
201static int dag_probe_filename(const char *filename)
202{
203        struct stat statbuf;
204        /* Can we stat the file? */
205        if (stat(filename, &statbuf) != 0) {
206                return 0;
207        }
208        /* Is it a character device? */
209        if (!S_ISCHR(statbuf.st_mode)) {
210                return 0;
211        }
212        /* Yeah, it's probably us. */
213        return 1;
214}
215
216/* Initialises the DAG output data structure */
217static void dag_init_format_out_data(libtrace_out_t *libtrace) {
218        libtrace->format_data = (struct dag_format_data_out_t *) malloc(sizeof(struct dag_format_data_out_t));
219        // no DUCK on output
220        FORMAT_DATA_OUT->stream_attached = 0;
221        FORMAT_DATA_OUT->device = NULL;
222        FORMAT_DATA_OUT->dagstream = 0;
223        FORMAT_DATA_OUT->waiting = 0;
224
225}
226
227/* Initialises the DAG input data structure */
228static void dag_init_format_data(libtrace_t *libtrace) {
229        libtrace->format_data = (struct dag_format_data_t *)
230                malloc(sizeof(struct dag_format_data_t));
231        DUCK.last_duck = 0;
232        DUCK.duck_freq = 0;
233        DUCK.last_pkt = 0;
234        DUCK.dummy_duck = NULL;
235        FORMAT_DATA->stream_attached = 0;
236        FORMAT_DATA->drops = 0;
237        FORMAT_DATA->device = NULL;
238        FORMAT_DATA->dagstream = 0;
239        FORMAT_DATA->processed = 0;
240        FORMAT_DATA->bottom = NULL;
241        FORMAT_DATA->top = NULL;
242}
243
244/* Determines if there is already an entry for the given DAG device in the
245 * device list and increments the reference count for that device, if found.
246 *
247 * NOTE: This function assumes the open_dag_mutex is held by the caller */
248static struct dag_dev_t *dag_find_open_device(char *dev_name) {
249        struct dag_dev_t *dag_dev;
250
251        dag_dev = open_dags;
252
253        /* XXX: Not exactly zippy, but how often are we going to be dealing
254         * with multiple dag cards? */
255        while (dag_dev != NULL) {
256                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
257                        dag_dev->ref_count ++;
258                        return dag_dev;
259
260                }
261                dag_dev = dag_dev->next;
262        }
263        return NULL;
264
265
266}
267
268/* Closes a DAG device and removes it from the device list.
269 *
270 * Attempting to close a DAG device that has a non-zero reference count will
271 * cause an assertion failure!
272 *
273 * NOTE: This function assumes the open_dag_mutex is held by the caller */
274static void dag_close_device(struct dag_dev_t *dev) {
275        /* Need to remove from the device list */
276
277        assert(dev->ref_count == 0);
278
279        if (dev->prev == NULL) {
280                open_dags = dev->next;
281                if (dev->next)
282                        dev->next->prev = NULL;
283        } else {
284                dev->prev->next = dev->next;
285                if (dev->next)
286                        dev->next->prev = dev->prev;
287        }
288
289        dag_close(dev->fd);
290        if (dev->dev_name)
291        free(dev->dev_name);
292        free(dev);
293}
294
295
296/* Opens a new DAG device for writing and adds it to the DAG device list
297 *
298 * NOTE: this function should only be called when opening a DAG device for
299 * writing - there is little practical difference between this and the
300 * function below that covers the reading case, but we need the output trace
301 * object to report errors properly so the two functions take slightly
302 * different arguments. This is really lame and there should be a much better
303 * way of doing this.
304 *
305 * NOTE: This function assumes the open_dag_mutex is held by the caller
306 */
307static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace, char *dev_name) {
308        struct stat buf;
309        int fd;
310        struct dag_dev_t *new_dev;
311
312        /* Make sure the device exists */
313        if (stat(dev_name, &buf) == -1) {
314                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
315                return NULL;
316}
317
318        /* Make sure it is the appropriate type of device */
319        if (S_ISCHR(buf.st_mode)) {
320                /* Try opening the DAG device */
321                if((fd = dag_open(dev_name)) < 0) {
322                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
323                                        dev_name);
324                        return NULL;
325                }
326        } else {
327                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
328                                dev_name);
329                return NULL;
330        }
331
332        /* Add the device to our device list - it is just a doubly linked
333         * list with no inherent ordering; just tack the new one on the front
334         */
335        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
336        new_dev->fd = fd;
337        new_dev->dev_name = dev_name;
338        new_dev->ref_count = 1;
339
340        new_dev->prev = NULL;
341        new_dev->next = open_dags;
342        if (open_dags)
343                open_dags->prev = new_dev;
344
345        open_dags = new_dev;
346
347        return new_dev;
348}
349
350/* Opens a new DAG device for reading and adds it to the DAG device list
351 *
352 * NOTE: this function should only be called when opening a DAG device for
353 * reading - there is little practical difference between this and the
354 * function above that covers the writing case, but we need the input trace
355 * object to report errors properly so the two functions take slightly
356 * different arguments. This is really lame and there should be a much better
357 * way of doing this.
358 *
359 * NOTE: This function assumes the open_dag_mutex is held by the caller */
360static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
361        struct stat buf;
362        int fd;
363        struct dag_dev_t *new_dev;
364
365        /* Make sure the device exists */
366        if (stat(dev_name, &buf) == -1) {
367                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
368                return NULL;
369        }
370
371        /* Make sure it is the appropriate type of device */
372        if (S_ISCHR(buf.st_mode)) {
373                /* Try opening the DAG device */
374                if((fd = dag_open(dev_name)) < 0) {
375                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
376                                        dev_name);
377                        return NULL;
378                }
379        } else {
380                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
381                                dev_name);
382                return NULL;
383        }
384
385        /* Add the device to our device list - it is just a doubly linked
386         * list with no inherent ordering; just tack the new one on the front
387         */
388        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
389        new_dev->fd = fd;
390        new_dev->dev_name = dev_name;
391        new_dev->ref_count = 1;
392
393        new_dev->prev = NULL;
394        new_dev->next = open_dags;
395        if (open_dags)
396                open_dags->prev = new_dev;
397
398        open_dags = new_dev;
399
400        return new_dev;
401}
402
403/* Creates and initialises a DAG output trace */
404static int dag_init_output(libtrace_out_t *libtrace) {
405        char *dag_dev_name = NULL;
406        char *scan = NULL;
407        struct dag_dev_t *dag_device = NULL;
408        int stream = 1;
409       
410        /* XXX I don't know if this is important or not, but this function
411         * isn't present in all of the driver releases that this code is
412         * supposed to support! */
413        /*
414        unsigned long wake_time;
415        dagutil_sleep_get_wake_time(&wake_time,0);
416        */
417
418        dag_init_format_out_data(libtrace);
419        /* Grab the mutex while we're likely to be messing with the device
420         * list */
421        pthread_mutex_lock(&open_dag_mutex);
422       
423        /* Specific streams are signified using a comma in the libtrace URI,
424         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
425         *
426         * If no stream is specified, we will write using stream 1 */
427        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
428                dag_dev_name = strdup(libtrace->uridata);
429        } else {
430                dag_dev_name = (char *)strndup(libtrace->uridata,
431                                (size_t)(scan - libtrace->uridata));
432                stream = atoi(++scan);
433        }
434        FORMAT_DATA_OUT->dagstream = stream;
435
436        /* See if our DAG device is already open */
437        dag_device = dag_find_open_device(dag_dev_name);
438
439        if (dag_device == NULL) {
440                /* Device not yet opened - open it ourselves */
441                dag_device = dag_open_output_device(libtrace, dag_dev_name);
442        } else {
443                /* Otherwise, just use the existing one */
444                free(dag_dev_name);
445                dag_dev_name = NULL;
446        }
447
448        /* Make sure we have successfully opened a DAG device */
449        if (dag_device == NULL) {
450                if (dag_dev_name) {
451                        free(dag_dev_name);
452                }
453                pthread_mutex_unlock(&open_dag_mutex);
454                return -1;
455        }
456
457        FORMAT_DATA_OUT->device = dag_device;
458        pthread_mutex_unlock(&open_dag_mutex);
459        return 0;
460}
461
462/* Creates and initialises a DAG input trace */
463static int dag_init_input(libtrace_t *libtrace) {
464        char *dag_dev_name = NULL;
465        char *scan = NULL;
466        int stream = 0, thread_count = 1;
467        struct dag_dev_t *dag_device = NULL;
468
469        dag_init_format_data(libtrace);
470        /* Grab the mutex while we're likely to be messing with the device
471         * list */
472        pthread_mutex_lock(&open_dag_mutex);
473
474
475        /* DAG cards support multiple streams. In a single threaded capture,
476         * these are specified using a comma in the libtrace URI,
477         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
478         *
479         * If no stream is specified, we will read from stream 0 with one thread
480         */
481        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
482                dag_dev_name = strdup(libtrace->uridata);
483               
484        } else {
485                dag_dev_name = (char *)strndup(libtrace->uridata,
486                                                                           (size_t)(scan - libtrace->uridata));
487                stream = atoi(++scan);
488        }
489
490        FORMAT_DATA->dagstream = stream;
491
492        /* See if our DAG device is already open */
493        dag_device = dag_find_open_device(dag_dev_name);
494
495        if (dag_device == NULL) {
496                /* Device not yet opened - open it ourselves */
497                dag_device = dag_open_device(libtrace, dag_dev_name);
498        } else {
499                /* Otherwise, just use the existing one */
500                free(dag_dev_name);
501                dag_dev_name = NULL;
502        }
503
504        /* Make sure we have successfully opened a DAG device */
505        if (dag_device == NULL) {
506                if (dag_dev_name)
507                        free(dag_dev_name);
508                dag_dev_name = NULL;
509                pthread_mutex_unlock(&open_dag_mutex);
510                return -1;
511        }
512
513        FORMAT_DATA->device = dag_device;
514
515        /* See Config_Status_API_Programming_Guide.pdf from the Endace Dag Documentation */
516        /* Check kBooleanAttributeActive is true -- no point capturing on an interface that's disabled
517 
518        *  The symptom of the port being disabled is that libtrace will appear to hang.
519        */
520        /* Check kBooleanAttributeFault is false */
521        /* Check kBooleanAttributeLocalFault is false */
522        /* Check kBooleanAttributeLock is true ? */
523        /* Check kBooleanAttributePeerLink ? */
524
525        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based on libtrace promisc attribute?*/
526        /* Set kUint32AttributeSnapLength to the snaplength */
527
528        pthread_mutex_unlock(&open_dag_mutex);
529        return 0;
530}
531
532/* Configures a DAG input trace */
533static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
534                                void *data) {
535        char conf_str[4096];
536        switch(option) {
537                case TRACE_OPTION_META_FREQ:
538                        /* This option is used to specify the frequency of DUCK
539                         * updates */
540                        DUCK.duck_freq = *(int *)data;
541                        return 0;
542                case TRACE_OPTION_SNAPLEN:
543                        /* Tell the card our new snap length */
544                        snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
545                        if (dag_configure(FORMAT_DATA->device->fd,
546                                                conf_str) != 0) {
547                                trace_set_err(libtrace, errno, "Failed to configure snaplen on DAG card: %s", libtrace->uridata);
548                                return -1;
549                        }
550                        return 0;
551                case TRACE_OPTION_PROMISC:
552                        /* DAG already operates in a promisc fashion */
553                        return -1;
554                case TRACE_OPTION_FILTER:
555                        /* We don't yet support pushing filters into DAG
556                         * cards */
557                        return -1;
558                case TRACE_OPTION_EVENT_REALTIME:
559                        /* Live capture is always going to be realtime */
560                        return -1;
561        }
562        return -1;
563}
564
565/* Starts a DAG output trace */
566static int dag_start_output(libtrace_out_t *libtrace) {
567        struct timeval zero, nopoll;
568
569        zero.tv_sec = 0;
570        zero.tv_usec = 0;
571        nopoll = zero;
572
573        /* Attach and start the DAG stream */
574
575        if (dag_attach_stream(FORMAT_DATA_OUT->device->fd,
576                        FORMAT_DATA_OUT->dagstream, 0, 1048576) < 0) {
577                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
578                return -1;
579        }
580
581        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
582                        FORMAT_DATA_OUT->dagstream) < 0) {
583                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
584                return -1;
585        }
586        FORMAT_DATA_OUT->stream_attached = 1;
587
588        /* We don't want the dag card to do any sleeping */
589
590        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
591                        FORMAT_DATA_OUT->dagstream, 0, &zero,
592                        &nopoll);
593
594        return 0;
595}
596
597/* Starts a DAG input trace */
598static int dag_start_input(libtrace_t *libtrace) {
599        struct timeval zero, nopoll;
600        uint8_t *top, *bottom;
601        uint8_t diff = 0;
602        top = bottom = NULL;
603
604        zero.tv_sec = 0;
605        zero.tv_usec = 10000;
606        nopoll = zero;
607
608        /* Attach and start the DAG stream */
609        if (dag_attach_stream(FORMAT_DATA->device->fd,
610                                FORMAT_DATA->dagstream, 0, 0) < 0) {
611                trace_set_err(libtrace, errno, "Cannot attach DAG stream");
612                return -1;
613        }
614
615        if (dag_start_stream(FORMAT_DATA->device->fd,
616                                FORMAT_DATA->dagstream) < 0) {
617                trace_set_err(libtrace, errno, "Cannot start DAG stream");
618                return -1;
619        }
620        FORMAT_DATA->stream_attached = 1;
621       
622        /* We don't want the dag card to do any sleeping */
623        dag_set_stream_poll(FORMAT_DATA->device->fd,
624                                FORMAT_DATA->dagstream, 0, &zero,
625                                &nopoll);
626
627        /* Should probably flush the memory hole now */
628        do {
629                top = dag_advance_stream(FORMAT_DATA->device->fd,
630                                        FORMAT_DATA->dagstream,
631                                        &bottom);
632                assert(top && bottom);
633                diff = top - bottom;
634                bottom -= diff;
635        } while (diff != 0);
636        FORMAT_DATA->top = NULL;
637        FORMAT_DATA->bottom = NULL;
638        FORMAT_DATA->processed = 0;
639        FORMAT_DATA->drops = 0;
640
641        return 0;
642}
643
644/* Pauses a DAG output trace */
645static int dag_pause_output(libtrace_out_t *libtrace) {
646
647        /* Stop and detach the stream */
648        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
649                        FORMAT_DATA_OUT->dagstream) < 0) {
650                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
651                return -1;
652        }
653        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
654                        FORMAT_DATA_OUT->dagstream) < 0) {
655                trace_set_err_out(libtrace, errno, "Could not detach DAG stream");
656                return -1;
657        }
658        FORMAT_DATA_OUT->stream_attached = 0;
659        return 0;
660}
661
662/* Pauses a DAG input trace */
663static int dag_pause_input(libtrace_t *libtrace) {
664
665        /* Stop and detach the stream */
666        if (dag_stop_stream(FORMAT_DATA->device->fd,
667                                FORMAT_DATA->dagstream) < 0) {
668                trace_set_err(libtrace, errno, "Could not stop DAG stream");
669                return -1;
670        }
671        if (dag_detach_stream(FORMAT_DATA->device->fd,
672                                FORMAT_DATA->dagstream) < 0) {
673                trace_set_err(libtrace, errno, "Could not detach DAG stream");
674                return -1;
675        }
676        FORMAT_DATA->stream_attached = 0;
677        return 0;
678}
679
680/* Closes a DAG input trace */
681static int dag_fin_input(libtrace_t *libtrace) {
682        /* Need the lock, since we're going to be handling the device list */
683        pthread_mutex_lock(&open_dag_mutex);
684       
685        /* Detach the stream if we are not paused */
686        if (FORMAT_DATA->stream_attached)
687                dag_pause_input(libtrace);
688        FORMAT_DATA->device->ref_count --;
689
690        /* Close the DAG device if there are no more references to it */
691        if (FORMAT_DATA->device->ref_count == 0)
692                dag_close_device(FORMAT_DATA->device);
693        if (DUCK.dummy_duck)
694                trace_destroy_dead(DUCK.dummy_duck);
695        free(libtrace->format_data);
696        pthread_mutex_unlock(&open_dag_mutex);
697        return 0; /* success */
698}
699
700/* Closes a DAG output trace */
701static int dag_fin_output(libtrace_out_t *libtrace) {
702       
703        /* Commit any outstanding traffic in the txbuffer */
704        if (FORMAT_DATA_OUT->waiting) {
705                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
706                                FORMAT_DATA_OUT->waiting );
707        }
708
709        /* Wait until the buffer is nearly clear before exiting the program,
710         * as we will lose packets otherwise */
711        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
712                        FORMAT_DATA_OUT->dagstream,
713                        dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
714                                        FORMAT_DATA_OUT->dagstream) - 8
715                        );
716
717        /* Need the lock, since we're going to be handling the device list */
718        pthread_mutex_lock(&open_dag_mutex);
719
720        /* Detach the stream if we are not paused */
721        if (FORMAT_DATA_OUT->stream_attached)
722                dag_pause_output(libtrace);
723        FORMAT_DATA_OUT->device->ref_count --;
724
725        /* Close the DAG device if there are no more references to it */
726        if (FORMAT_DATA_OUT->device->ref_count == 0)
727                dag_close_device(FORMAT_DATA_OUT->device);
728        free(libtrace->format_data);
729        pthread_mutex_unlock(&open_dag_mutex);
730        return 0; /* success */
731}
732
733/* Extracts DUCK information from the DAG card and produces a DUCK packet */
734static int dag_get_duckinfo(libtrace_t *libtrace,
735                                libtrace_packet_t *packet) {
736
737        /* Allocate memory for the DUCK data */
738        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
739                        !packet->buffer) {
740                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
741                packet->buf_control = TRACE_CTRL_PACKET;
742                if (!packet->buffer) {
743                        trace_set_err(libtrace, errno,
744                                        "Cannot allocate packet buffer");
745                        return -1;
746                }
747        }
748
749        /* DUCK doesn't have a format header */
750        packet->header = 0;
751        packet->payload = packet->buffer;
752
753        /* No need to check if we can get DUCK or not - we're modern
754         * enough so just grab the DUCK info */
755        if ((ioctl(FORMAT_DATA->device->fd, DAGIOCDUCK,
756                                        (duckinf_t *)packet->payload) < 0)) {
757                trace_set_err(libtrace, errno, "Error using DAGIOCDUCK");
758                return -1;
759        }
760
761        packet->type = TRACE_RT_DUCK_2_5;
762
763        /* Set the packet's tracce to point at a DUCK trace, so that the
764         * DUCK format functions will be called on the packet rather than the
765         * DAG ones */
766        if (!DUCK.dummy_duck)
767                DUCK.dummy_duck = trace_create_dead("rt:localhost:3434");
768        packet->trace = DUCK.dummy_duck;
769        return sizeof(duckinf_t);
770}
771
772/* Determines the amount of data available to read from the DAG card */
773static int dag_available(libtrace_t *libtrace) {
774        uint32_t diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
775
776        /* If we've processed more than 4MB of data since we last called
777         * dag_advance_stream, then we should call it again to allow the
778         * space occupied by that 4MB to be released */
779        if (diff >= dag_record_size && FORMAT_DATA->processed < 4 * 1024 * 1024)
780                return diff;
781       
782        /* Update the top and bottom pointers */
783        FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd,
784                        FORMAT_DATA->dagstream,
785                        &(FORMAT_DATA->bottom));
786       
787        if (FORMAT_DATA->top == NULL) {
788                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
789                return -1;
790        }
791        FORMAT_DATA->processed = 0;
792        diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
793        return diff;
794}
795
796/* Returns a pointer to the start of the next complete ERF record */
797static dag_record_t *dag_get_record(libtrace_t *libtrace) {
798        dag_record_t *erfptr = NULL;
799        uint16_t size;
800        erfptr = (dag_record_t *)FORMAT_DATA->bottom;
801        if (!erfptr)
802                return NULL;
803        size = ntohs(erfptr->rlen);
804        assert( size >= dag_record_size );
805        /* Make certain we have the full packet available */
806        if (size > (FORMAT_DATA->top - FORMAT_DATA->bottom))
807                return NULL;
808        FORMAT_DATA->bottom += size;
809        FORMAT_DATA->processed += size;
810        return erfptr;
811}
812
813/* Converts a buffer containing a recently read DAG packet record into a
814 * libtrace packet */
815static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
816                void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
817
818        dag_record_t *erfptr;
819       
820        /* If the packet previously owned a buffer that is not the buffer
821         * that contains the new packet data, we're going to need to free the
822         * old one to avoid memory leaks */
823        if (packet->buffer != buffer &&
824                        packet->buf_control == TRACE_CTRL_PACKET) {
825                free(packet->buffer);
826        }
827
828        /* Set the buffer owner appropriately */
829        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
830                packet->buf_control = TRACE_CTRL_PACKET;
831        } else
832                packet->buf_control = TRACE_CTRL_EXTERNAL;
833
834        /* Update the packet pointers and type appropriately */
835        erfptr = (dag_record_t *)buffer;
836        packet->buffer = erfptr;
837        packet->header = erfptr;
838        packet->type = rt_type;
839
840        if (erfptr->flags.rxerror == 1) {
841                /* rxerror means the payload is corrupt - drop the payload
842                 * by tweaking rlen */
843                packet->payload = NULL;
844                erfptr->rlen = htons(erf_get_framing_length(packet));
845        } else {
846                packet->payload = (char*)packet->buffer
847                        + erf_get_framing_length(packet);
848        }
849
850        if (libtrace->format_data == NULL) {
851                dag_init_format_data(libtrace);
852        }
853
854        /* TODO: This isn't thread safe */
855#if 0
856        /* Update the dropped packets counter */
857
858        /* No loss counter for DSM coloured records - have to use
859         * some other API */
860        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
861                /* TODO */
862        } else {
863                /* Use the ERF loss counter */
864                DATA(libtrace)->drops += ntohs(erfptr->lctr);
865        }
866#endif
867
868        return 0;
869}
870
871/*
872 * dag_write_packet() at this stage attempts to improve tx performance
873 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
874 * has been met. I observed approximately 270% performance increase
875 * through this relatively naive tweak. No optimisation of buffer sizes
876 * was attempted.
877 */
878
879/* Pushes an ERF record onto the transmit stream */
880static int dag_dump_packet(libtrace_out_t *libtrace,
881                dag_record_t *erfptr, unsigned int pad, void *buffer) {
882        int size;
883
884        /*
885         * If we've got 0 bytes waiting in the txqueue, assume that we haven't
886         * requested any space yet, and request some, storing the pointer at
887         * FORMAT_DATA_OUT->txbuffer.
888         *
889         * The amount to request is slightly magical at the moment - it's
890         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
891         * the buffer and handle overruns.
892         */
893        if (FORMAT_DATA_OUT->waiting == 0) {
894                FORMAT_DATA_OUT->txbuffer = dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
895                                FORMAT_DATA_OUT->dagstream, 16908288);
896        }
897
898        /*
899         * Copy the header separately to the body, as we can't guarantee they
900         * are in contiguous memory
901         */
902        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,erfptr,(dag_record_size + pad));
903        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
904
905
906
907        /*
908         * Copy our incoming packet into the outgoing buffer, and increment
909         * our waiting count
910         */
911        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
912        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,buffer,size);
913        FORMAT_DATA_OUT->waiting += size;
914
915        /*
916         * If our output buffer has more than 16 Mebibytes in it, commit those
917         * bytes and reset the waiting count to 0.
918         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in
919         * case there is still data in the buffer at program exit.
920         */
921
922        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
923                FORMAT_DATA_OUT->txbuffer = dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
924                        FORMAT_DATA_OUT->waiting );
925                FORMAT_DATA_OUT->waiting = 0;
926        }
927
928        return size + pad + dag_record_size;
929
930}
931
932/* Attempts to determine a suitable ERF type for a given packet. Returns true
933 * if one is found, false otherwise */
934static bool find_compatible_linktype(libtrace_out_t *libtrace,
935                                libtrace_packet_t *packet, char *type)
936{
937         // Keep trying to simplify the packet until we can find
938         //something we can do with it
939
940        do {
941                *type=libtrace_to_erf_type(trace_get_link_type(packet));
942
943                // Success
944                if (*type != (char)-1)
945                        return true;
946
947                if (!demote_packet(packet)) {
948                        trace_set_err_out(libtrace,
949                                        TRACE_ERR_NO_CONVERSION,
950                                        "No erf type for packet (%i)",
951                                        trace_get_link_type(packet));
952                        return false;
953                }
954
955        } while(1);
956
957        return true;
958}
959
960/* Writes a packet to the provided DAG output trace */
961static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet) {
962        /*
963         * This is heavily borrowed from erf_write_packet(). Yes, CnP coding
964         * sucks, sorry about that.
965         */
966        unsigned int pad = 0;
967        int numbytes;
968        void *payload = packet->payload;
969        dag_record_t *header = (dag_record_t *)packet->header;
970        char erf_type = 0;
971
972        if(!packet->header) {
973                /* No header, probably an RT packet. Lifted from
974                 * erf_write_packet(). */
975                return -1;
976        }
977
978        if (trace_get_link_type(packet) == TRACE_TYPE_NONDATA)
979                return 0;
980
981        pad = dag_get_padding(packet);
982
983        /*
984         * If the payload is null, adjust the rlen. Discussion of this is
985         * attached to erf_write_packet()
986         */
987        if (payload == NULL) {
988                header->rlen = htons(dag_record_size + pad);
989        }
990
991        if (packet->type == TRACE_RT_DATA_ERF) {
992                numbytes = dag_dump_packet(libtrace,
993                                header,
994                                pad,
995                                payload
996                                );
997
998        } else {
999                /* Build up a new packet header from the existing header */
1000
1001                /* Simplify the packet first - if we can't do this, break
1002                 * early */
1003                if (!find_compatible_linktype(libtrace,packet,&erf_type))
1004                        return -1;
1005
1006                dag_record_t erfhdr;
1007
1008                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
1009                payload=packet->payload;
1010                pad = dag_get_padding(packet);
1011
1012                /* Flags. Can't do this */
1013                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
1014                if (trace_get_direction(packet)!=(int)~0U)
1015                        erfhdr.flags.iface = trace_get_direction(packet);
1016
1017                erfhdr.type = erf_type;
1018
1019                /* Packet length (rlen includes format overhead) */
1020                assert(trace_get_capture_length(packet)>0
1021                                && trace_get_capture_length(packet)<=65536);
1022                assert(erf_get_framing_length(packet)>0
1023                                && trace_get_framing_length(packet)<=65536);
1024                assert(trace_get_capture_length(packet)+erf_get_framing_length(packet)>0
1025                      &&trace_get_capture_length(packet)+erf_get_framing_length(packet)<=65536);
1026
1027                erfhdr.rlen = htons(trace_get_capture_length(packet)
1028                        + erf_get_framing_length(packet));
1029
1030
1031                /* Loss counter. Can't do this */
1032                erfhdr.lctr = 0;
1033                /* Wire length, does not include padding! */
1034                erfhdr.wlen = htons(trace_get_wire_length(packet));
1035
1036                /* Write it out */
1037                numbytes = dag_dump_packet(libtrace,
1038                                &erfhdr,
1039                                pad,
1040                                payload);
1041
1042        }
1043
1044        return numbytes;
1045}
1046
1047/* Reads the next available packet from a DAG card, in a BLOCKING fashion
1048 *
1049 * If DUCK reporting is enabled, the packet returned may be a DUCK update
1050 */
1051static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1052        int size = 0;
1053        struct timeval tv;
1054        dag_record_t *erfptr = NULL;
1055        int numbytes = 0;
1056        uint32_t flags = 0;
1057        struct timeval maxwait;
1058        struct timeval pollwait;
1059
1060        pollwait.tv_sec = 0;
1061        pollwait.tv_usec = 10000;
1062        maxwait.tv_sec = 0;
1063        maxwait.tv_usec = 250000;
1064
1065        /* Check if we're due for a DUCK report */
1066        if (DUCK.last_pkt - DUCK.last_duck > DUCK.duck_freq &&
1067                        DUCK.duck_freq != 0) {
1068                size = dag_get_duckinfo(libtrace, packet);
1069                DUCK.last_duck = DUCK.last_pkt;
1070                if (size != 0) {
1071                        return size;
1072                }
1073                /* No DUCK support, so don't waste our time anymore */
1074                DUCK.duck_freq = 0;
1075        }
1076
1077        /* Don't let anyone try to free our DAG memory hole! */
1078        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1079
1080        /* If the packet buffer is currently owned by libtrace, free it so
1081         * that we can set the packet to point into the DAG memory hole */
1082        if (packet->buf_control == TRACE_CTRL_PACKET) {
1083                free(packet->buffer);
1084                packet->buffer = 0;
1085        }
1086       
1087        if (dag_set_stream_poll(FORMAT_DATA->device->fd, 
1088                        FORMAT_DATA->dagstream, sizeof(dag_record_t), &maxwait, 
1089                        &pollwait) == -1)
1090        {
1091                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1092                return -1;
1093        }
1094
1095
1096        /* Grab a full ERF record */
1097        do {
1098                numbytes = dag_available(libtrace);
1099                if (numbytes < 0)
1100                        return numbytes;
1101                if (numbytes < dag_record_size) {
1102                        if (libtrace_halt)
1103                                return 0;
1104                        /* Block until we see a packet */
1105                        continue;
1106                }
1107                erfptr = dag_get_record(libtrace);
1108        } while (erfptr == NULL);
1109
1110        /* Prepare the libtrace packet */
1111        if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
1112                                flags))
1113                return -1;
1114
1115        /* Update the DUCK timer */
1116        tv = trace_get_timeval(packet);
1117        DUCK.last_pkt = tv.tv_sec;
1118
1119        return packet->payload ? htons(erfptr->rlen) :
1120                                erf_get_framing_length(packet);
1121}
1122
1123/* Attempts to read a packet from a DAG card in a NON-BLOCKING fashion. If a
1124 * packet is available, we will return a packet event. Otherwise we will
1125 * return a SLEEP event (as we cannot select on the DAG file descriptor).
1126 */
1127static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
1128                                        libtrace_packet_t *packet) {
1129        libtrace_eventobj_t event = {0,0,0.0,0};
1130        dag_record_t *erfptr = NULL;
1131        int numbytes;
1132        uint32_t flags = 0;
1133        struct timeval minwait;
1134       
1135        minwait.tv_sec = 0;
1136        minwait.tv_usec = 10000;
1137       
1138        if (dag_set_stream_poll(FORMAT_DATA->device->fd, 
1139                        FORMAT_DATA->dagstream, 0, &minwait, 
1140                        &minwait) == -1)
1141        {
1142                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1143                event.type = TRACE_EVENT_TERMINATE;
1144                return event;
1145        }
1146
1147        do {
1148                erfptr = NULL;
1149                numbytes = 0;
1150       
1151                /* Need to call dag_available so that the top pointer will get
1152                 * updated, otherwise we'll never see any data! */
1153                numbytes = dag_available(libtrace);
1154
1155                /* May as well not bother calling dag_get_record if
1156                 * dag_available suggests that there's no data */
1157                if (numbytes != 0)
1158                        erfptr = dag_get_record(libtrace);
1159                if (erfptr == NULL) {
1160                        /* No packet available - sleep for a very short time */
1161                        if (libtrace_halt) {
1162                                event.type = TRACE_EVENT_TERMINATE;
1163                        } else {                       
1164                                event.type = TRACE_EVENT_SLEEP;
1165                                event.seconds = 0.0001;
1166                        }
1167                        break;
1168                }
1169                if (dag_prepare_packet(libtrace, packet, erfptr, 
1170                                        TRACE_RT_DATA_ERF, flags)) {
1171                        event.type = TRACE_EVENT_TERMINATE;
1172                        break;
1173                }
1174
1175
1176                event.size = trace_get_capture_length(packet) + 
1177                                trace_get_framing_length(packet);
1178               
1179                /* XXX trace_read_packet() normally applies the following
1180                 * config options for us, but this function is called via
1181                 * trace_event() so we have to do it ourselves */
1182
1183                if (libtrace->filter) {
1184                        int filtret = trace_apply_filter(libtrace->filter, 
1185                                        packet);
1186                        if (filtret == -1) {
1187                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
1188                                                "Bad BPF Filter");
1189                                event.type = TRACE_EVENT_TERMINATE;
1190                                break;
1191                        }
1192
1193                        if (filtret == 0) {
1194                                /* This packet isn't useful so we want to
1195                                 * immediately see if there is another suitable
1196                                 * one - we definitely DO NOT want to return
1197                                 * a sleep event in this case, like we used to
1198                                 * do! */
1199                                libtrace->filtered_packets ++;
1200                                trace_clear_cache(packet);
1201                                continue;
1202                        }
1203                               
1204                        event.type = TRACE_EVENT_PACKET;
1205                } else {
1206                        event.type = TRACE_EVENT_PACKET;
1207                }
1208
1209                if (libtrace->snaplen > 0) {
1210                        trace_set_capture_length(packet, libtrace->snaplen);
1211                }
1212                libtrace->accepted_packets ++;
1213                break;
1214        } while (1);
1215
1216        return event;
1217}
1218
1219/* Gets the number of dropped packets */
1220static uint64_t dag_get_dropped_packets(libtrace_t *trace) {
1221        if (trace->format_data == NULL)
1222                return (uint64_t)-1;
1223        return DATA(trace)->drops;
1224}
1225
1226/* Prints some semi-useful help text about the DAG format module */
1227static void dag_help(void) {
1228        printf("dag format module: $Revision: 1755 $\n");
1229        printf("Supported input URIs:\n");
1230        printf("\tdag:/dev/dagn\n");
1231        printf("\n");
1232        printf("\te.g.: dag:/dev/dag0\n");
1233        printf("\n");
1234        printf("Supported output URIs:\n");
1235        printf("\tnone\n");
1236        printf("\n");
1237}
1238
1239static int dag_pstart_input(libtrace_t *libtrace) {
1240        char *scan, *tok;
1241        uint16_t stream_count = 0, max_streams;
1242        /* We keep our own pointer to per_thread as the system will free
1243         * up FORMAT_DATA without freeing this if something goes wrong */
1244        struct dag_per_thread_t *per_thread = NULL;
1245        int iserror = 0;
1246
1247        /* Check we aren't trying to create more threads than the DAG card can
1248         * handle */
1249        max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd);
1250        if (libtrace->perpkt_thread_count > max_streams) {
1251                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, "trying to create too "
1252                                          "many threads (max is %u)", max_streams);
1253                iserror = 1;
1254                goto cleanup;
1255        }
1256
1257        /* Create the thread structures */
1258        per_thread = calloc(libtrace->perpkt_thread_count,
1259                                                                         sizeof(struct dag_per_thread_t));
1260        FORMAT_DATA->per_thread = per_thread;
1261
1262        /* Get the stream names from the uri */
1263        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
1264                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, "format uri doesn't "
1265                                          "specify the DAG streams");
1266                iserror = 1;
1267                goto cleanup;
1268        }
1269
1270        scan++;
1271 
1272        tok = strtok(scan, ",");
1273        while (tok != NULL) {
1274                /* Ensure we haven't specified too many streams */
1275                if (stream_count >= libtrace->perpkt_thread_count) {
1276                        trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, "format uri specifies too "
1277                                          "many streams. Max is %u", max_streams);
1278                        iserror = 1;
1279                        goto cleanup;
1280                }
1281
1282                /* Save the stream details */
1283                per_thread[stream_count].device = FORMAT_DATA->device;
1284                per_thread[stream_count++].stream = (uint16_t)atoi(tok);
1285
1286                tok = strtok(NULL, ",");
1287        }
1288
1289 cleanup:
1290        if (iserror) {
1291                /* Free the per_thread memory */
1292                free(per_thread);
1293               
1294                return -1;
1295        } else {
1296                return 0;
1297        }
1298}
1299
1300
1301
1302/* TODO: Fold this into dag_available */
1303static int dag_pavailable(libtrace_t *libtrace, libtrace_thread_t *t) {
1304        uint32_t diff = PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom;
1305
1306        /* If we've processed more than 4MB of data since we last called
1307         * dag_advance_stream, then we should call it again to allow the
1308         * space occupied by that 4MB to be released */
1309        if (diff >= dag_record_size && PERPKT_DATA(t)->processed < 4 * 1024 * 1024)
1310                return diff;
1311
1312        /* Update top and bottom pointers */
1313        PERPKT_DATA(t)->top = dag_advance_stream(PERPKT_DATA(t)->device->fd,
1314                                                                                         PERPKT_DATA(t)->stream,
1315                                                                                         &(PERPKT_DATA(t)->bottom));
1316
1317        if (PERPKT_DATA(t)->top == NULL) {
1318                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
1319                return -1;
1320        }
1321
1322        PERPKT_DATA(t)->processed = 0;
1323        diff = PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom;
1324        return diff;
1325}
1326
1327/* TODO: Fold this into dag_get_record */
1328static dag_record_t *dag_pget_record(libtrace_t *libtrace,
1329                                                                         libtrace_thread_t *t) {
1330        dag_record_t *erfptr = NULL;
1331        uint16_t size;
1332
1333        erfptr = (dag_record_t *)PERPKT_DATA(t)->bottom;
1334        if (!erfptr)
1335                return NULL;
1336
1337        /* Ensure we have a whole record */
1338        size = ntohs(erfptr->rlen);
1339        assert(size >= dag_record_size);
1340        if (size > (PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom))
1341                return NULL;
1342
1343        /* Advance the buffer pointers */
1344        PERPKT_DATA(t)->bottom += size;
1345        PERPKT_DATA(t)->processed += size;
1346       
1347        return erfptr;
1348}
1349
1350static int dag_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t,
1351                                                        libtrace_packet_t *packet) {
1352        dag_record_t *erfptr = NULL;
1353        int numbytes = 0;
1354        uint32_t flags = 0;
1355        struct timeval maxwait, pollwait;
1356
1357        pollwait.tv_sec = 0;
1358        pollwait.tv_usec = 10000;
1359        maxwait.tv_sec = 0;
1360        maxwait.tv_usec = 250000;
1361
1362        /* TODO: Support DUCK reporting */
1363
1364        /* Don't let anyone try to free our DAG memory hole! */
1365        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1366
1367        /* If the packet buffer is currently owned by libtrace, free it so
1368         * that we can set the packet to point into the DAG memory hole */
1369        if (packet->buf_control == TRACE_CTRL_PACKET) {
1370                free(packet->buffer);
1371                packet->buffer = 0;
1372        }
1373
1374        /* Configure DAG card stream polling */
1375        if (dag_set_stream_poll(PERPKT_DATA(t)->device->fd, PERPKT_DATA(t)->stream,
1376                                                        sizeof(dag_record_t), &maxwait, &pollwait) < 0) {
1377                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1378                return -1;
1379        }
1380
1381        /* Grab an ERF record */
1382        do {
1383                numbytes = dag_pavailable(libtrace, t);
1384                if (numbytes < 0)
1385                        return numbytes;
1386                if (numbytes < dag_record_size) {
1387                        if (libtrace_halt)
1388                                return 0;
1389
1390                        /* Check message queue to see if we should abort early */
1391                        if (libtrace_message_queue_count(&t->messages) > 0)
1392                                return -2;
1393
1394                        /* Keep trying until we see a packet */
1395                        continue;
1396                }
1397
1398                erfptr = dag_pget_record(libtrace, t);
1399        } while (erfptr == NULL);
1400
1401        /* Prepare the libtrace packet */
1402        if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
1403                                                   flags))
1404                return -1;
1405
1406        PERPKT_DATA(t)->pkt_count++;
1407
1408        return packet->payload ? htons(erfptr->rlen) :
1409                erf_get_framing_length(packet);
1410}
1411
1412static int dag_ppause_input(libtrace_t *libtrace) {
1413        int i, tot = libtrace->perpkt_thread_count;
1414        struct dag_per_thread_t *t_data;
1415
1416        /* Stop and detach all the streams */
1417        printf("Stopping and detaching all streams\n");
1418        for (i = 0; i < tot; i++) {
1419                t_data = &FORMAT_DATA->per_thread[i];
1420
1421                if (dag_stop_stream(t_data->device->fd,
1422                                                        t_data->stream) < 0) {
1423                        trace_set_err(libtrace, errno, "can't stop DAG stream #%u",
1424                                                  t_data->stream);
1425                        return -1;
1426                }
1427
1428                if (dag_detach_stream(t_data->device->fd,
1429                                                          t_data->stream) < 0) {
1430                        trace_set_err(libtrace, errno, "can't detach DAG stream #%u",
1431                                                  t_data->stream);
1432                        return -1;
1433                }
1434        }
1435
1436        /* Free up the per_thread array */
1437        free(FORMAT_DATA->per_thread);
1438        FORMAT_DATA->per_thread = NULL;
1439
1440        return 0;
1441}
1442
1443static int dag_pconfig_input(libtrace_t *libtrace,
1444                                                         trace_parallel_option_t option, void *value) {
1445
1446        /* We don't support any of these! Normally you configure the DAG card
1447         * externally. */
1448        switch(option) {
1449        case TRACE_OPTION_SET_HASHER:
1450        case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
1451        case TRACE_OPTION_TRACETIME:
1452        case TRACE_OPTION_TICK_INTERVAL:
1453        case TRACE_OPTION_GET_CONFIG:
1454        case TRACE_OPTION_SET_CONFIG:
1455                return -1;
1456        }
1457        /* We don't provide a default option to ensure that future options will
1458         * generate a compiler warning. */
1459
1460        return -1;
1461}
1462
1463static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1464                                                                bool reader) {
1465        /* XXX: This function gets run sequentially for all
1466         * threads. Should investigate making it parallel as draining the
1467         * memory could be needlessly time consuming.
1468         */
1469        uint8_t *top, *bottom;
1470        uint8_t diff = 0; /* XXX: Investigate this type, as I would assume the value
1471                                           * could be larger than 255 */
1472        struct timeval zero, nopoll;
1473
1474        top = bottom = NULL;
1475
1476        /* Minimum delay is 10mS */
1477        zero.tv_sec = 0;
1478        zero.tv_usec = 10000;
1479        nopoll = zero;
1480
1481        if (reader) {
1482                if (t->type == THREAD_PERPKT) {
1483                        /* Pass the per thread data to the thread */
1484                        t->format_data = &FORMAT_DATA->per_thread[t->perpkt_num];
1485
1486                        /* Attach and start the DAG stream */
1487                        printf("t%u: starting and attaching stream #%u\n", t->perpkt_num,
1488                                   PERPKT_DATA(t)->stream);
1489                        if (dag_attach_stream(PERPKT_DATA(t)->device->fd,
1490                                                                  PERPKT_DATA(t)->stream, 0, 0) < 0) {
1491                                trace_set_err(libtrace, errno, "can't attach DAG stream #%u",
1492                                                          PERPKT_DATA(t)->stream);
1493                                return -1;
1494                        }
1495                        if (dag_start_stream(PERPKT_DATA(t)->device->fd,
1496                                                                 PERPKT_DATA(t)->stream) < 0) {
1497                                trace_set_err(libtrace, errno, "can't start DAG stream #%u",
1498                                                          PERPKT_DATA(t)->stream);
1499                                return -1;
1500                        }
1501
1502                        /* Ensure that dag_advance_stream will return without blocking */
1503                        if(dag_set_stream_poll(PERPKT_DATA(t)->device->fd,
1504                                                                   PERPKT_DATA(t)->stream, 0, &zero,
1505                                                                   &nopoll) < 0) {
1506                                trace_set_err(libtrace, errno, "dag_set_stream_poll failed!");
1507                                return -1;
1508                        }
1509
1510                        /* Clear all the data from the memory hole */
1511                        do {
1512                                top = dag_advance_stream(PERPKT_DATA(t)->device->fd,
1513                                                                                 PERPKT_DATA(t)->stream,
1514                                                                                 &bottom);
1515
1516                                assert(top && bottom);
1517                                diff = top - bottom;
1518                                bottom -= diff;
1519                        } while (diff != 0);
1520
1521                        PERPKT_DATA(t)->top = NULL;
1522                        PERPKT_DATA(t)->bottom = NULL;
1523                        PERPKT_DATA(t)->pkt_count = 0;
1524                } else {
1525                        /* TODO: Figure out why we need this */
1526                        t->format_data = &FORMAT_DATA->per_thread[0];
1527                }
1528        }
1529
1530        fprintf(stderr, "t%u: registered thread\n", t->perpkt_num);
1531
1532        return 0;
1533}
1534
1535static struct libtrace_format_t dag = {
1536        "dag",
1537        "$Id$",
1538        TRACE_FORMAT_ERF,
1539        dag_probe_filename,             /* probe filename */
1540        NULL,                           /* probe magic */
1541        dag_init_input,                 /* init_input */
1542        dag_config_input,               /* config_input */
1543        dag_start_input,                /* start_input */
1544        dag_pause_input,                /* pause_input */
1545        dag_init_output,                /* init_output */
1546        NULL,                           /* config_output */
1547        dag_start_output,               /* start_output */
1548        dag_fin_input,                  /* fin_input */
1549        dag_fin_output,                 /* fin_output */
1550        dag_read_packet,                /* read_packet */
1551        dag_prepare_packet,             /* prepare_packet */
1552        NULL,                           /* fin_packet */
1553        dag_write_packet,               /* write_packet */
1554        erf_get_link_type,              /* get_link_type */
1555        erf_get_direction,              /* get_direction */
1556        erf_set_direction,              /* set_direction */
1557        erf_get_erf_timestamp,          /* get_erf_timestamp */
1558        NULL,                           /* get_timeval */
1559        NULL,                           /* get_seconds */
1560        NULL,                           /* get_timespec */
1561        NULL,                           /* seek_erf */
1562        NULL,                           /* seek_timeval */
1563        NULL,                           /* seek_seconds */
1564        erf_get_capture_length,         /* get_capture_length */
1565        erf_get_wire_length,            /* get_wire_length */
1566        erf_get_framing_length,         /* get_framing_length */
1567        erf_set_capture_length,         /* set_capture_length */
1568        NULL,                           /* get_received_packets */
1569        NULL,                           /* get_filtered_packets */
1570        dag_get_dropped_packets,        /* get_dropped_packets */
1571        NULL,                           /* get_captured_packets */
1572        NULL,                           /* get_fd */
1573        trace_event_dag,                /* trace_event */
1574        dag_help,                       /* help */
1575        NULL,                            /* next pointer */
1576                {true, 0}, /* live packet capture, thread limit TBD */
1577                dag_pstart_input,
1578                dag_pread_packet,
1579                dag_ppause_input,
1580                NULL,
1581                dag_pconfig_input,
1582                dag_pregister_thread,
1583                NULL
1584};
1585
1586void dag_constructor(void) {
1587        register_format(&dag);
1588}
Note: See TracBrowser for help on using the repository browser.