source: lib/format_dag25.c @ 0a1d2d0

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 0a1d2d0 was c66a465, checked in by Dan <dan@…>, 7 years ago

Added support for drop count in DAG format.

This does not support DSM, as there are some issues with the DAG card for that.
Someone decided that when ERF packets are DSM coloured that the drop counter should
disappear..!

  • Property mode set to 100644
File size: 47.5 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id$
31 *
32 */
33
34#define _GNU_SOURCE
35
36#include "config.h"
37#include "common.h"
38#include "libtrace.h"
39#include "libtrace_int.h"
40#include "format_helper.h"
41#include "format_erf.h"
42
43#include <assert.h>
44#include <errno.h>
45#include <fcntl.h>
46#include <stdio.h>
47#include <string.h>
48#include <stdlib.h>
49#include <sys/stat.h>
50
51#include <sys/mman.h>
52/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
53 * Windows anyway so we'll worry about this more later :] */
54#include <pthread.h>
55
56
57#ifdef WIN32
58#  include <io.h>
59#  include <share.h>
60#  define PATH_MAX _MAX_PATH
61#  define snprintf sprintf_s
62#else
63#  include <netdb.h>
64#  ifndef PATH_MAX
65#       define PATH_MAX 4096
66#  endif
67#  include <sys/ioctl.h>
68#endif
69
70/* This format deals with DAG cards that are using drivers from the 2.5 version
71 * onwards, including 3.X.
72 *
73 * DAG is a LIVE capture format.
74 *
75 * This format does support writing, provided the DAG card that you are using
76 * has transmit (Tx) support. Additionally, packets read using this format
77 * are in the ERF format, so can easily be written as ERF traces without
78 * losing any data.
79 */
80
81
82#define DATA(x) ((struct dag_format_data_t *)x->format_data)
83#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
84#define PERPKT_DATA(x) ((struct dag_per_thread_t *)(x->format_data))
85
86#define FORMAT_DATA DATA(libtrace)
87#define FORMAT_DATA_OUT DATA_OUT(libtrace)
88
89#define DUCK FORMAT_DATA->duck
90static struct libtrace_format_t dag;
91
92/* A DAG device - a DAG device can support multiple streams (and therefore
93 * multiple input traces) so each trace needs to refer to a device */
94struct dag_dev_t {
95        char * dev_name;                /* Device name */
96        int fd;                         /* File descriptor */
97        uint16_t ref_count;             /* Number of input / output traces
98                                           that are using this device */
99        struct dag_dev_t *prev;         /* Pointer to the previous device in
100                                           the device list */
101        struct dag_dev_t *next;         /* Pointer to the next device in the
102                                           device list */
103};
104
105/* "Global" data that is stored for each DAG output trace */
106struct dag_format_data_out_t {
107        /* The DAG device being used for writing */
108        struct dag_dev_t *device;
109        /* The DAG stream that is being written on */
110        unsigned int dagstream;
111        /* Boolean flag indicating whether the stream is currently attached */
112        int stream_attached;
113        /* The amount of data waiting to be transmitted, in bytes */
114        uint64_t waiting;
115        /* A buffer to hold the data to be transmittted */
116        uint8_t *txbuffer;
117};
118
119/* Data that is stored for each libtrace_thread_t */
120struct dag_per_thread_t {
121        struct dag_dev_t *device; /* DAG device */
122        uint16_t stream; /* Stream number */
123        uint8_t *top; /* Pointer to the last unread byte in the DAG memory */
124        uint8_t *bottom; /* Pointer to the first unread byte in the DAG memory */
125        uint32_t processed; /* Amount of data processed from the bottom pointer */
126        uint64_t pkt_count; /* Number of packets seen by the thread */
127        uint64_t drops;
128};
129
130/* "Global" data that is stored for each DAG input trace */
131struct dag_format_data_t {
132
133        /* Data required for regular DUCK reporting */
134        struct {
135                /* Timestamp of the last DUCK report */
136                uint32_t last_duck;
137                /* The number of seconds between each DUCK report */
138                uint32_t duck_freq;
139                /* Timestamp of the last packet read from the DAG card */
140                uint32_t last_pkt;
141                /* Dummy trace to ensure DUCK packets are dealt with using the
142                 * DUCK format functions */
143                libtrace_t *dummy_duck;
144        } duck;
145
146        /* The DAG device that we are reading from */
147        struct dag_dev_t *device;
148        /* The DAG stream that we are reading from */
149        unsigned int dagstream;
150        /* Boolean flag indicating whether the stream is currently attached */
151        int stream_attached;
152        /* Pointer to the first unread byte in the DAG memory hole */
153        uint8_t *bottom;
154        /* Pointer to the last unread byte in the DAG memory hole */
155        uint8_t *top;
156        /* The amount of data processed thus far from the bottom pointer */
157        uint32_t processed;
158        /* The number of packets that have been dropped */
159        uint64_t drops;
160        /* When running in parallel mode this is malloc'd with an array of thread
161         * structures. Most of the stuff above doesn't get used in parallel mode. */
162        struct dag_per_thread_t *per_thread;
163};
164
165/* To be thread-safe, we're going to need a mutex for operating on the list
166 * of DAG devices */
167pthread_mutex_t open_dag_mutex;
168
169/* The list of DAG devices that have been opened by libtrace.
170 *
171 * We can only open each DAG device once, but we might want to read from
172 * multiple streams. Therefore, we need to maintain a list of devices that we
173 * have opened (with ref counts!) so that we don't try to open a device too
174 * many times or close a device that we're still using */
175struct dag_dev_t *open_dags = NULL;
176
177/* Returns the amount of padding between the ERF header and the start of the
178 * captured packet data */
179static int dag_get_padding(const libtrace_packet_t *packet)
180{
181        /* ERF Ethernet records have a 2 byte padding before the packet itself
182         * so that the IP header is aligned on a 32 bit boundary.
183         */
184        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
185                dag_record_t *erfptr = (dag_record_t *)packet->header;
186                switch(erfptr->type) {
187                        case TYPE_ETH:
188                        case TYPE_DSM_COLOR_ETH:
189                                return 2;
190                        default:                return 0;
191                }
192        }
193        else {
194                switch(trace_get_link_type(packet)) {
195                        case TRACE_TYPE_ETH:    return 2;
196                        default:                return 0;
197                }
198        }
199}
200
201/* Attempts to determine if the given filename refers to a DAG device */
202static int dag_probe_filename(const char *filename)
203{
204        struct stat statbuf;
205        /* Can we stat the file? */
206        if (stat(filename, &statbuf) != 0) {
207                return 0;
208        }
209        /* Is it a character device? */
210        if (!S_ISCHR(statbuf.st_mode)) {
211                return 0;
212        }
213        /* Yeah, it's probably us. */
214        return 1;
215}
216
217/* Initialises the DAG output data structure */
218static void dag_init_format_out_data(libtrace_out_t *libtrace) {
219        libtrace->format_data = (struct dag_format_data_out_t *) malloc(sizeof(struct dag_format_data_out_t));
220        // no DUCK on output
221        FORMAT_DATA_OUT->stream_attached = 0;
222        FORMAT_DATA_OUT->device = NULL;
223        FORMAT_DATA_OUT->dagstream = 0;
224        FORMAT_DATA_OUT->waiting = 0;
225
226}
227
228/* Initialises the DAG input data structure */
229static void dag_init_format_data(libtrace_t *libtrace) {
230        libtrace->format_data = (struct dag_format_data_t *)
231                malloc(sizeof(struct dag_format_data_t));
232        DUCK.last_duck = 0;
233        DUCK.duck_freq = 0;
234        DUCK.last_pkt = 0;
235        DUCK.dummy_duck = NULL;
236        FORMAT_DATA->stream_attached = 0;
237        FORMAT_DATA->drops = 0;
238        FORMAT_DATA->device = NULL;
239        FORMAT_DATA->dagstream = 0;
240        FORMAT_DATA->processed = 0;
241        FORMAT_DATA->bottom = NULL;
242        FORMAT_DATA->top = NULL;
243}
244
245/* Determines if there is already an entry for the given DAG device in the
246 * device list and increments the reference count for that device, if found.
247 *
248 * NOTE: This function assumes the open_dag_mutex is held by the caller */
249static struct dag_dev_t *dag_find_open_device(char *dev_name) {
250        struct dag_dev_t *dag_dev;
251
252        dag_dev = open_dags;
253
254        /* XXX: Not exactly zippy, but how often are we going to be dealing
255         * with multiple dag cards? */
256        while (dag_dev != NULL) {
257                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
258                        dag_dev->ref_count ++;
259                        return dag_dev;
260
261                }
262                dag_dev = dag_dev->next;
263        }
264        return NULL;
265
266
267}
268
269/* Closes a DAG device and removes it from the device list.
270 *
271 * Attempting to close a DAG device that has a non-zero reference count will
272 * cause an assertion failure!
273 *
274 * NOTE: This function assumes the open_dag_mutex is held by the caller */
275static void dag_close_device(struct dag_dev_t *dev) {
276        /* Need to remove from the device list */
277
278        assert(dev->ref_count == 0);
279
280        if (dev->prev == NULL) {
281                open_dags = dev->next;
282                if (dev->next)
283                        dev->next->prev = NULL;
284        } else {
285                dev->prev->next = dev->next;
286                if (dev->next)
287                        dev->next->prev = dev->prev;
288        }
289
290        dag_close(dev->fd);
291        if (dev->dev_name)
292        free(dev->dev_name);
293        free(dev);
294}
295
296
297/* Opens a new DAG device for writing and adds it to the DAG device list
298 *
299 * NOTE: this function should only be called when opening a DAG device for
300 * writing - there is little practical difference between this and the
301 * function below that covers the reading case, but we need the output trace
302 * object to report errors properly so the two functions take slightly
303 * different arguments. This is really lame and there should be a much better
304 * way of doing this.
305 *
306 * NOTE: This function assumes the open_dag_mutex is held by the caller
307 */
308static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace, char *dev_name) {
309        struct stat buf;
310        int fd;
311        struct dag_dev_t *new_dev;
312
313        /* Make sure the device exists */
314        if (stat(dev_name, &buf) == -1) {
315                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
316                return NULL;
317}
318
319        /* Make sure it is the appropriate type of device */
320        if (S_ISCHR(buf.st_mode)) {
321                /* Try opening the DAG device */
322                if((fd = dag_open(dev_name)) < 0) {
323                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
324                                        dev_name);
325                        return NULL;
326                }
327        } else {
328                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
329                                dev_name);
330                return NULL;
331        }
332
333        /* Add the device to our device list - it is just a doubly linked
334         * list with no inherent ordering; just tack the new one on the front
335         */
336        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
337        new_dev->fd = fd;
338        new_dev->dev_name = dev_name;
339        new_dev->ref_count = 1;
340
341        new_dev->prev = NULL;
342        new_dev->next = open_dags;
343        if (open_dags)
344                open_dags->prev = new_dev;
345
346        open_dags = new_dev;
347
348        return new_dev;
349}
350
351/* Opens a new DAG device for reading and adds it to the DAG device list
352 *
353 * NOTE: this function should only be called when opening a DAG device for
354 * reading - there is little practical difference between this and the
355 * function above that covers the writing case, but we need the input trace
356 * object to report errors properly so the two functions take slightly
357 * different arguments. This is really lame and there should be a much better
358 * way of doing this.
359 *
360 * NOTE: This function assumes the open_dag_mutex is held by the caller */
361static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
362        struct stat buf;
363        int fd;
364        struct dag_dev_t *new_dev;
365
366        /* Make sure the device exists */
367        if (stat(dev_name, &buf) == -1) {
368                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
369                return NULL;
370        }
371
372        /* Make sure it is the appropriate type of device */
373        if (S_ISCHR(buf.st_mode)) {
374                /* Try opening the DAG device */
375                if((fd = dag_open(dev_name)) < 0) {
376                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
377                                        dev_name);
378                        return NULL;
379                }
380        } else {
381                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
382                                dev_name);
383                return NULL;
384        }
385
386        /* Add the device to our device list - it is just a doubly linked
387         * list with no inherent ordering; just tack the new one on the front
388         */
389        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
390        new_dev->fd = fd;
391        new_dev->dev_name = dev_name;
392        new_dev->ref_count = 1;
393
394        new_dev->prev = NULL;
395        new_dev->next = open_dags;
396        if (open_dags)
397                open_dags->prev = new_dev;
398
399        open_dags = new_dev;
400
401        return new_dev;
402}
403
404/* Creates and initialises a DAG output trace */
405static int dag_init_output(libtrace_out_t *libtrace) {
406        char *dag_dev_name = NULL;
407        char *scan = NULL;
408        struct dag_dev_t *dag_device = NULL;
409        int stream = 1;
410       
411        /* XXX I don't know if this is important or not, but this function
412         * isn't present in all of the driver releases that this code is
413         * supposed to support! */
414        /*
415        unsigned long wake_time;
416        dagutil_sleep_get_wake_time(&wake_time,0);
417        */
418
419        dag_init_format_out_data(libtrace);
420        /* Grab the mutex while we're likely to be messing with the device
421         * list */
422        pthread_mutex_lock(&open_dag_mutex);
423       
424        /* Specific streams are signified using a comma in the libtrace URI,
425         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
426         *
427         * If no stream is specified, we will write using stream 1 */
428        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
429                dag_dev_name = strdup(libtrace->uridata);
430        } else {
431                dag_dev_name = (char *)strndup(libtrace->uridata,
432                                (size_t)(scan - libtrace->uridata));
433                stream = atoi(++scan);
434        }
435        FORMAT_DATA_OUT->dagstream = stream;
436
437        /* See if our DAG device is already open */
438        dag_device = dag_find_open_device(dag_dev_name);
439
440        if (dag_device == NULL) {
441                /* Device not yet opened - open it ourselves */
442                dag_device = dag_open_output_device(libtrace, dag_dev_name);
443        } else {
444                /* Otherwise, just use the existing one */
445                free(dag_dev_name);
446                dag_dev_name = NULL;
447        }
448
449        /* Make sure we have successfully opened a DAG device */
450        if (dag_device == NULL) {
451                if (dag_dev_name) {
452                        free(dag_dev_name);
453                }
454                pthread_mutex_unlock(&open_dag_mutex);
455                return -1;
456        }
457
458        FORMAT_DATA_OUT->device = dag_device;
459        pthread_mutex_unlock(&open_dag_mutex);
460        return 0;
461}
462
463/* Creates and initialises a DAG input trace */
464static int dag_init_input(libtrace_t *libtrace) {
465        char *dag_dev_name = NULL;
466        char *scan = NULL;
467        int stream = 0, thread_count = 1;
468        struct dag_dev_t *dag_device = NULL;
469
470        dag_init_format_data(libtrace);
471        /* Grab the mutex while we're likely to be messing with the device
472         * list */
473        pthread_mutex_lock(&open_dag_mutex);
474
475
476        /* DAG cards support multiple streams. In a single threaded capture,
477         * these are specified using a comma in the libtrace URI,
478         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
479         *
480         * If no stream is specified, we will read from stream 0 with one thread
481         */
482        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
483                dag_dev_name = strdup(libtrace->uridata);
484               
485        } else {
486                dag_dev_name = (char *)strndup(libtrace->uridata,
487                                                                           (size_t)(scan - libtrace->uridata));
488                stream = atoi(++scan);
489        }
490
491        FORMAT_DATA->dagstream = stream;
492
493        /* See if our DAG device is already open */
494        dag_device = dag_find_open_device(dag_dev_name);
495
496        if (dag_device == NULL) {
497                /* Device not yet opened - open it ourselves */
498                dag_device = dag_open_device(libtrace, dag_dev_name);
499        } else {
500                /* Otherwise, just use the existing one */
501                free(dag_dev_name);
502                dag_dev_name = NULL;
503        }
504
505        /* Make sure we have successfully opened a DAG device */
506        if (dag_device == NULL) {
507                if (dag_dev_name)
508                        free(dag_dev_name);
509                dag_dev_name = NULL;
510                pthread_mutex_unlock(&open_dag_mutex);
511                return -1;
512        }
513
514        FORMAT_DATA->device = dag_device;
515
516        /* See Config_Status_API_Programming_Guide.pdf from the Endace Dag Documentation */
517        /* Check kBooleanAttributeActive is true -- no point capturing on an interface that's disabled
518 
519        *  The symptom of the port being disabled is that libtrace will appear to hang.
520        */
521        /* Check kBooleanAttributeFault is false */
522        /* Check kBooleanAttributeLocalFault is false */
523        /* Check kBooleanAttributeLock is true ? */
524        /* Check kBooleanAttributePeerLink ? */
525
526        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based on libtrace promisc attribute?*/
527        /* Set kUint32AttributeSnapLength to the snaplength */
528
529        pthread_mutex_unlock(&open_dag_mutex);
530        return 0;
531}
532
533/* Configures a DAG input trace */
534static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
535                                void *data) {
536        char conf_str[4096];
537        switch(option) {
538                case TRACE_OPTION_META_FREQ:
539                        /* This option is used to specify the frequency of DUCK
540                         * updates */
541                        DUCK.duck_freq = *(int *)data;
542                        return 0;
543                case TRACE_OPTION_SNAPLEN:
544                        /* Tell the card our new snap length */
545                        snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
546                        if (dag_configure(FORMAT_DATA->device->fd,
547                                                conf_str) != 0) {
548                                trace_set_err(libtrace, errno, "Failed to configure snaplen on DAG card: %s", libtrace->uridata);
549                                return -1;
550                        }
551                        return 0;
552                case TRACE_OPTION_PROMISC:
553                        /* DAG already operates in a promisc fashion */
554                        return -1;
555                case TRACE_OPTION_FILTER:
556                        /* We don't yet support pushing filters into DAG
557                         * cards */
558                        return -1;
559                case TRACE_OPTION_EVENT_REALTIME:
560                        /* Live capture is always going to be realtime */
561                        return -1;
562        }
563        return -1;
564}
565
566/* Starts a DAG output trace */
567static int dag_start_output(libtrace_out_t *libtrace) {
568        struct timeval zero, nopoll;
569
570        zero.tv_sec = 0;
571        zero.tv_usec = 0;
572        nopoll = zero;
573
574        /* Attach and start the DAG stream */
575
576        if (dag_attach_stream(FORMAT_DATA_OUT->device->fd,
577                        FORMAT_DATA_OUT->dagstream, 0, 1048576) < 0) {
578                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
579                return -1;
580        }
581
582        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
583                        FORMAT_DATA_OUT->dagstream) < 0) {
584                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
585                return -1;
586        }
587        FORMAT_DATA_OUT->stream_attached = 1;
588
589        /* We don't want the dag card to do any sleeping */
590
591        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
592                        FORMAT_DATA_OUT->dagstream, 0, &zero,
593                        &nopoll);
594
595        return 0;
596}
597
598/* Starts a DAG input trace */
599static int dag_start_input(libtrace_t *libtrace) {
600        struct timeval zero, nopoll;
601        uint8_t *top, *bottom;
602        uint8_t diff = 0;
603        top = bottom = NULL;
604
605        zero.tv_sec = 0;
606        zero.tv_usec = 10000;
607        nopoll = zero;
608
609        /* Attach and start the DAG stream */
610        if (dag_attach_stream(FORMAT_DATA->device->fd,
611                                FORMAT_DATA->dagstream, 0, 0) < 0) {
612                trace_set_err(libtrace, errno, "Cannot attach DAG stream");
613                return -1;
614        }
615
616        if (dag_start_stream(FORMAT_DATA->device->fd,
617                                FORMAT_DATA->dagstream) < 0) {
618                trace_set_err(libtrace, errno, "Cannot start DAG stream");
619                return -1;
620        }
621        FORMAT_DATA->stream_attached = 1;
622       
623        /* We don't want the dag card to do any sleeping */
624        dag_set_stream_poll(FORMAT_DATA->device->fd,
625                                FORMAT_DATA->dagstream, 0, &zero,
626                                &nopoll);
627
628        /* Should probably flush the memory hole now */
629        do {
630                top = dag_advance_stream(FORMAT_DATA->device->fd,
631                                        FORMAT_DATA->dagstream,
632                                        &bottom);
633                assert(top && bottom);
634                diff = top - bottom;
635                bottom -= diff;
636        } while (diff != 0);
637        FORMAT_DATA->top = NULL;
638        FORMAT_DATA->bottom = NULL;
639        FORMAT_DATA->processed = 0;
640        FORMAT_DATA->drops = 0;
641
642        return 0;
643}
644
645/* Pauses a DAG output trace */
646static int dag_pause_output(libtrace_out_t *libtrace) {
647
648        /* Stop and detach the stream */
649        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
650                        FORMAT_DATA_OUT->dagstream) < 0) {
651                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
652                return -1;
653        }
654        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
655                        FORMAT_DATA_OUT->dagstream) < 0) {
656                trace_set_err_out(libtrace, errno, "Could not detach DAG stream");
657                return -1;
658        }
659        FORMAT_DATA_OUT->stream_attached = 0;
660        return 0;
661}
662
663/* Pauses a DAG input trace */
664static int dag_pause_input(libtrace_t *libtrace) {
665
666        /* Stop and detach the stream */
667        if (dag_stop_stream(FORMAT_DATA->device->fd,
668                                FORMAT_DATA->dagstream) < 0) {
669                trace_set_err(libtrace, errno, "Could not stop DAG stream");
670                return -1;
671        }
672        if (dag_detach_stream(FORMAT_DATA->device->fd,
673                                FORMAT_DATA->dagstream) < 0) {
674                trace_set_err(libtrace, errno, "Could not detach DAG stream");
675                return -1;
676        }
677        FORMAT_DATA->stream_attached = 0;
678        return 0;
679}
680
681/* Closes a DAG input trace */
682static int dag_fin_input(libtrace_t *libtrace) {
683        /* Need the lock, since we're going to be handling the device list */
684        pthread_mutex_lock(&open_dag_mutex);
685       
686        /* Detach the stream if we are not paused */
687        if (FORMAT_DATA->stream_attached)
688                dag_pause_input(libtrace);
689        FORMAT_DATA->device->ref_count --;
690
691        /* Close the DAG device if there are no more references to it */
692        if (FORMAT_DATA->device->ref_count == 0)
693                dag_close_device(FORMAT_DATA->device);
694        if (DUCK.dummy_duck)
695                trace_destroy_dead(DUCK.dummy_duck);
696        free(libtrace->format_data);
697        pthread_mutex_unlock(&open_dag_mutex);
698        return 0; /* success */
699}
700
701/* Closes a DAG output trace */
702static int dag_fin_output(libtrace_out_t *libtrace) {
703       
704        /* Commit any outstanding traffic in the txbuffer */
705        if (FORMAT_DATA_OUT->waiting) {
706                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
707                                FORMAT_DATA_OUT->waiting );
708        }
709
710        /* Wait until the buffer is nearly clear before exiting the program,
711         * as we will lose packets otherwise */
712        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
713                        FORMAT_DATA_OUT->dagstream,
714                        dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
715                                        FORMAT_DATA_OUT->dagstream) - 8
716                        );
717
718        /* Need the lock, since we're going to be handling the device list */
719        pthread_mutex_lock(&open_dag_mutex);
720
721        /* Detach the stream if we are not paused */
722        if (FORMAT_DATA_OUT->stream_attached)
723                dag_pause_output(libtrace);
724        FORMAT_DATA_OUT->device->ref_count --;
725
726        /* Close the DAG device if there are no more references to it */
727        if (FORMAT_DATA_OUT->device->ref_count == 0)
728                dag_close_device(FORMAT_DATA_OUT->device);
729        free(libtrace->format_data);
730        pthread_mutex_unlock(&open_dag_mutex);
731        return 0; /* success */
732}
733
734/* Extracts DUCK information from the DAG card and produces a DUCK packet */
735static int dag_get_duckinfo(libtrace_t *libtrace,
736                                libtrace_packet_t *packet) {
737
738        /* Allocate memory for the DUCK data */
739        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
740                        !packet->buffer) {
741                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
742                packet->buf_control = TRACE_CTRL_PACKET;
743                if (!packet->buffer) {
744                        trace_set_err(libtrace, errno,
745                                        "Cannot allocate packet buffer");
746                        return -1;
747                }
748        }
749
750        /* DUCK doesn't have a format header */
751        packet->header = 0;
752        packet->payload = packet->buffer;
753
754        /* No need to check if we can get DUCK or not - we're modern
755         * enough so just grab the DUCK info */
756        if ((ioctl(FORMAT_DATA->device->fd, DAGIOCDUCK,
757                                        (duckinf_t *)packet->payload) < 0)) {
758                trace_set_err(libtrace, errno, "Error using DAGIOCDUCK");
759                return -1;
760        }
761
762        packet->type = TRACE_RT_DUCK_2_5;
763
764        /* Set the packet's tracce to point at a DUCK trace, so that the
765         * DUCK format functions will be called on the packet rather than the
766         * DAG ones */
767        if (!DUCK.dummy_duck)
768                DUCK.dummy_duck = trace_create_dead("rt:localhost:3434");
769        packet->trace = DUCK.dummy_duck;
770        return sizeof(duckinf_t);
771}
772
773/* Determines the amount of data available to read from the DAG card */
774static int dag_available(libtrace_t *libtrace) {
775        uint32_t diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
776
777        /* If we've processed more than 4MB of data since we last called
778         * dag_advance_stream, then we should call it again to allow the
779         * space occupied by that 4MB to be released */
780        if (diff >= dag_record_size && FORMAT_DATA->processed < 4 * 1024 * 1024)
781                return diff;
782       
783        /* Update the top and bottom pointers */
784        FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd,
785                        FORMAT_DATA->dagstream,
786                        &(FORMAT_DATA->bottom));
787       
788        if (FORMAT_DATA->top == NULL) {
789                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
790                return -1;
791        }
792        FORMAT_DATA->processed = 0;
793        diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
794        return diff;
795}
796
797/* Returns a pointer to the start of the next complete ERF record */
798static dag_record_t *dag_get_record(libtrace_t *libtrace) {
799        dag_record_t *erfptr = NULL;
800        uint16_t size;
801        erfptr = (dag_record_t *)FORMAT_DATA->bottom;
802        if (!erfptr)
803                return NULL;
804        size = ntohs(erfptr->rlen);
805        assert( size >= dag_record_size );
806        /* Make certain we have the full packet available */
807        if (size > (FORMAT_DATA->top - FORMAT_DATA->bottom))
808                return NULL;
809        FORMAT_DATA->bottom += size;
810        FORMAT_DATA->processed += size;
811        return erfptr;
812}
813
814/* Converts a buffer containing a recently read DAG packet record into a
815 * libtrace packet */
816static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
817                void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
818
819        dag_record_t *erfptr;
820        libtrace_thread_t *t;
821       
822        /* If the packet previously owned a buffer that is not the buffer
823         * that contains the new packet data, we're going to need to free the
824         * old one to avoid memory leaks */
825        if (packet->buffer != buffer &&
826                        packet->buf_control == TRACE_CTRL_PACKET) {
827                free(packet->buffer);
828        }
829
830        /* Set the buffer owner appropriately */
831        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
832                packet->buf_control = TRACE_CTRL_PACKET;
833        } else
834                packet->buf_control = TRACE_CTRL_EXTERNAL;
835
836        /* Update the packet pointers and type appropriately */
837        erfptr = (dag_record_t *)buffer;
838        packet->buffer = erfptr;
839        packet->header = erfptr;
840        packet->type = rt_type;
841
842        if (erfptr->flags.rxerror == 1) {
843                /* rxerror means the payload is corrupt - drop the payload
844                 * by tweaking rlen */
845                packet->payload = NULL;
846                erfptr->rlen = htons(erf_get_framing_length(packet));
847        } else {
848                packet->payload = (char*)packet->buffer
849                        + erf_get_framing_length(packet);
850        }
851
852        if (libtrace->format_data == NULL) {
853                dag_init_format_data(libtrace);
854        }
855
856        /* Update the dropped packets counter */
857        /* No loss counter for DSM coloured records - have to use
858         * some other API */
859        /* Adding multithread support for this isn't actually that useful for the
860         * DAG7.5G2, as there's no way to use multiple receive streams without DSM */
861        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
862                /* TODO */
863        } else {
864                /* Use the ERF loss counter */
865                if (DATA(libtrace)->per_thread) {
866                        t = get_thread_table(libtrace);
867                        PERPKT_DATA(t)->drops += ntohs(erfptr->lctr);
868                } else {
869                        printf("DROP!\n");
870                        DATA(libtrace)->drops += ntohs(erfptr->lctr);
871                }
872        }
873
874
875        return 0;
876}
877
878/*
879 * dag_write_packet() at this stage attempts to improve tx performance
880 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
881 * has been met. I observed approximately 270% performance increase
882 * through this relatively naive tweak. No optimisation of buffer sizes
883 * was attempted.
884 */
885
886/* Pushes an ERF record onto the transmit stream */
887static int dag_dump_packet(libtrace_out_t *libtrace,
888                dag_record_t *erfptr, unsigned int pad, void *buffer) {
889        int size;
890
891        /*
892         * If we've got 0 bytes waiting in the txqueue, assume that we haven't
893         * requested any space yet, and request some, storing the pointer at
894         * FORMAT_DATA_OUT->txbuffer.
895         *
896         * The amount to request is slightly magical at the moment - it's
897         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
898         * the buffer and handle overruns.
899         */
900        if (FORMAT_DATA_OUT->waiting == 0) {
901                FORMAT_DATA_OUT->txbuffer = dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
902                                FORMAT_DATA_OUT->dagstream, 16908288);
903        }
904
905        /*
906         * Copy the header separately to the body, as we can't guarantee they
907         * are in contiguous memory
908         */
909        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,erfptr,(dag_record_size + pad));
910        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
911
912
913
914        /*
915         * Copy our incoming packet into the outgoing buffer, and increment
916         * our waiting count
917         */
918        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
919        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,buffer,size);
920        FORMAT_DATA_OUT->waiting += size;
921
922        /*
923         * If our output buffer has more than 16 Mebibytes in it, commit those
924         * bytes and reset the waiting count to 0.
925         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in
926         * case there is still data in the buffer at program exit.
927         */
928
929        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
930                FORMAT_DATA_OUT->txbuffer = dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
931                        FORMAT_DATA_OUT->waiting );
932                FORMAT_DATA_OUT->waiting = 0;
933        }
934
935        return size + pad + dag_record_size;
936
937}
938
939/* Attempts to determine a suitable ERF type for a given packet. Returns true
940 * if one is found, false otherwise */
941static bool find_compatible_linktype(libtrace_out_t *libtrace,
942                                libtrace_packet_t *packet, char *type)
943{
944         // Keep trying to simplify the packet until we can find
945         //something we can do with it
946
947        do {
948                *type=libtrace_to_erf_type(trace_get_link_type(packet));
949
950                // Success
951                if (*type != (char)-1)
952                        return true;
953
954                if (!demote_packet(packet)) {
955                        trace_set_err_out(libtrace,
956                                        TRACE_ERR_NO_CONVERSION,
957                                        "No erf type for packet (%i)",
958                                        trace_get_link_type(packet));
959                        return false;
960                }
961
962        } while(1);
963
964        return true;
965}
966
967/* Writes a packet to the provided DAG output trace */
968static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet) {
969        /*
970         * This is heavily borrowed from erf_write_packet(). Yes, CnP coding
971         * sucks, sorry about that.
972         */
973        unsigned int pad = 0;
974        int numbytes;
975        void *payload = packet->payload;
976        dag_record_t *header = (dag_record_t *)packet->header;
977        char erf_type = 0;
978
979        if(!packet->header) {
980                /* No header, probably an RT packet. Lifted from
981                 * erf_write_packet(). */
982                return -1;
983        }
984
985        if (trace_get_link_type(packet) == TRACE_TYPE_NONDATA)
986                return 0;
987
988        pad = dag_get_padding(packet);
989
990        /*
991         * If the payload is null, adjust the rlen. Discussion of this is
992         * attached to erf_write_packet()
993         */
994        if (payload == NULL) {
995                header->rlen = htons(dag_record_size + pad);
996        }
997
998        if (packet->type == TRACE_RT_DATA_ERF) {
999                numbytes = dag_dump_packet(libtrace,
1000                                header,
1001                                pad,
1002                                payload
1003                                );
1004
1005        } else {
1006                /* Build up a new packet header from the existing header */
1007
1008                /* Simplify the packet first - if we can't do this, break
1009                 * early */
1010                if (!find_compatible_linktype(libtrace,packet,&erf_type))
1011                        return -1;
1012
1013                dag_record_t erfhdr;
1014
1015                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
1016                payload=packet->payload;
1017                pad = dag_get_padding(packet);
1018
1019                /* Flags. Can't do this */
1020                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
1021                if (trace_get_direction(packet)!=(int)~0U)
1022                        erfhdr.flags.iface = trace_get_direction(packet);
1023
1024                erfhdr.type = erf_type;
1025
1026                /* Packet length (rlen includes format overhead) */
1027                assert(trace_get_capture_length(packet)>0
1028                                && trace_get_capture_length(packet)<=65536);
1029                assert(erf_get_framing_length(packet)>0
1030                                && trace_get_framing_length(packet)<=65536);
1031                assert(trace_get_capture_length(packet)+erf_get_framing_length(packet)>0
1032                      &&trace_get_capture_length(packet)+erf_get_framing_length(packet)<=65536);
1033
1034                erfhdr.rlen = htons(trace_get_capture_length(packet)
1035                        + erf_get_framing_length(packet));
1036
1037
1038                /* Loss counter. Can't do this */
1039                erfhdr.lctr = 0;
1040                /* Wire length, does not include padding! */
1041                erfhdr.wlen = htons(trace_get_wire_length(packet));
1042
1043                /* Write it out */
1044                numbytes = dag_dump_packet(libtrace,
1045                                &erfhdr,
1046                                pad,
1047                                payload);
1048
1049        }
1050
1051        return numbytes;
1052}
1053
1054/* Reads the next available packet from a DAG card, in a BLOCKING fashion
1055 *
1056 * If DUCK reporting is enabled, the packet returned may be a DUCK update
1057 */
1058static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1059        int size = 0;
1060        struct timeval tv;
1061        dag_record_t *erfptr = NULL;
1062        int numbytes = 0;
1063        uint32_t flags = 0;
1064        struct timeval maxwait;
1065        struct timeval pollwait;
1066
1067        pollwait.tv_sec = 0;
1068        pollwait.tv_usec = 10000;
1069        maxwait.tv_sec = 0;
1070        maxwait.tv_usec = 250000;
1071
1072        /* Check if we're due for a DUCK report */
1073        if (DUCK.last_pkt - DUCK.last_duck > DUCK.duck_freq &&
1074                        DUCK.duck_freq != 0) {
1075                size = dag_get_duckinfo(libtrace, packet);
1076                DUCK.last_duck = DUCK.last_pkt;
1077                if (size != 0) {
1078                        return size;
1079                }
1080                /* No DUCK support, so don't waste our time anymore */
1081                DUCK.duck_freq = 0;
1082        }
1083
1084        /* Don't let anyone try to free our DAG memory hole! */
1085        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1086
1087        /* If the packet buffer is currently owned by libtrace, free it so
1088         * that we can set the packet to point into the DAG memory hole */
1089        if (packet->buf_control == TRACE_CTRL_PACKET) {
1090                free(packet->buffer);
1091                packet->buffer = 0;
1092        }
1093       
1094        if (dag_set_stream_poll(FORMAT_DATA->device->fd, 
1095                        FORMAT_DATA->dagstream, sizeof(dag_record_t), &maxwait, 
1096                        &pollwait) == -1)
1097        {
1098                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1099                return -1;
1100        }
1101
1102
1103        /* Grab a full ERF record */
1104        do {
1105                numbytes = dag_available(libtrace);
1106                if (numbytes < 0)
1107                        return numbytes;
1108                if (numbytes < dag_record_size) {
1109                        if (libtrace_halt)
1110                                return 0;
1111                        /* Block until we see a packet */
1112                        continue;
1113                }
1114                erfptr = dag_get_record(libtrace);
1115        } while (erfptr == NULL);
1116
1117        /* Prepare the libtrace packet */
1118        if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
1119                                flags))
1120                return -1;
1121
1122        /* Update the DUCK timer */
1123        tv = trace_get_timeval(packet);
1124        DUCK.last_pkt = tv.tv_sec;
1125
1126        return packet->payload ? htons(erfptr->rlen) :
1127                                erf_get_framing_length(packet);
1128}
1129
1130/* Attempts to read a packet from a DAG card in a NON-BLOCKING fashion. If a
1131 * packet is available, we will return a packet event. Otherwise we will
1132 * return a SLEEP event (as we cannot select on the DAG file descriptor).
1133 */
1134static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
1135                                        libtrace_packet_t *packet) {
1136        libtrace_eventobj_t event = {0,0,0.0,0};
1137        dag_record_t *erfptr = NULL;
1138        int numbytes;
1139        uint32_t flags = 0;
1140        struct timeval minwait;
1141       
1142        minwait.tv_sec = 0;
1143        minwait.tv_usec = 10000;
1144       
1145        if (dag_set_stream_poll(FORMAT_DATA->device->fd, 
1146                        FORMAT_DATA->dagstream, 0, &minwait, 
1147                        &minwait) == -1)
1148        {
1149                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1150                event.type = TRACE_EVENT_TERMINATE;
1151                return event;
1152        }
1153
1154        do {
1155                erfptr = NULL;
1156                numbytes = 0;
1157       
1158                /* Need to call dag_available so that the top pointer will get
1159                 * updated, otherwise we'll never see any data! */
1160                numbytes = dag_available(libtrace);
1161
1162                /* May as well not bother calling dag_get_record if
1163                 * dag_available suggests that there's no data */
1164                if (numbytes != 0)
1165                        erfptr = dag_get_record(libtrace);
1166                if (erfptr == NULL) {
1167                        /* No packet available - sleep for a very short time */
1168                        if (libtrace_halt) {
1169                                event.type = TRACE_EVENT_TERMINATE;
1170                        } else {                       
1171                                event.type = TRACE_EVENT_SLEEP;
1172                                event.seconds = 0.0001;
1173                        }
1174                        break;
1175                }
1176                if (dag_prepare_packet(libtrace, packet, erfptr, 
1177                                        TRACE_RT_DATA_ERF, flags)) {
1178                        event.type = TRACE_EVENT_TERMINATE;
1179                        break;
1180                }
1181
1182
1183                event.size = trace_get_capture_length(packet) + 
1184                                trace_get_framing_length(packet);
1185               
1186                /* XXX trace_read_packet() normally applies the following
1187                 * config options for us, but this function is called via
1188                 * trace_event() so we have to do it ourselves */
1189
1190                if (libtrace->filter) {
1191                        int filtret = trace_apply_filter(libtrace->filter, 
1192                                        packet);
1193                        if (filtret == -1) {
1194                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
1195                                                "Bad BPF Filter");
1196                                event.type = TRACE_EVENT_TERMINATE;
1197                                break;
1198                        }
1199
1200                        if (filtret == 0) {
1201                                /* This packet isn't useful so we want to
1202                                 * immediately see if there is another suitable
1203                                 * one - we definitely DO NOT want to return
1204                                 * a sleep event in this case, like we used to
1205                                 * do! */
1206                                libtrace->filtered_packets ++;
1207                                trace_clear_cache(packet);
1208                                continue;
1209                        }
1210                               
1211                        event.type = TRACE_EVENT_PACKET;
1212                } else {
1213                        event.type = TRACE_EVENT_PACKET;
1214                }
1215
1216                if (libtrace->snaplen > 0) {
1217                        trace_set_capture_length(packet, libtrace->snaplen);
1218                }
1219                libtrace->accepted_packets ++;
1220                break;
1221        } while (1);
1222
1223        return event;
1224}
1225
1226/* Gets the number of dropped packets */
1227static uint64_t dag_get_dropped_packets(libtrace_t *trace) {
1228        uint64_t sum = 0;
1229        int i, tot;
1230
1231        if (trace->format_data == NULL)
1232                return (uint64_t)-1;
1233
1234        if (DATA(trace)->per_thread) {
1235                tot = trace->perpkt_thread_count;
1236
1237                for (i = 0; i < tot; i++) {
1238                        printf("t%d: drops %" PRIu64 "\n",
1239                                   DATA(trace)->per_thread[i].drops);
1240                        sum += DATA(trace)->per_thread[i].drops;
1241                }
1242        }
1243
1244        sum += DATA(trace)->drops;
1245
1246        return sum;
1247}
1248
1249/* Prints some semi-useful help text about the DAG format module */
1250static void dag_help(void) {
1251        printf("dag format module: $Revision: 1755 $\n");
1252        printf("Supported input URIs:\n");
1253        printf("\tdag:/dev/dagn\n");
1254        printf("\n");
1255        printf("\te.g.: dag:/dev/dag0\n");
1256        printf("\n");
1257        printf("Supported output URIs:\n");
1258        printf("\tnone\n");
1259        printf("\n");
1260}
1261
1262static int dag_pstart_input(libtrace_t *libtrace) {
1263        char *scan, *tok;
1264        uint16_t stream_count = 0, max_streams;
1265        /* We keep our own pointer to per_thread as the system will free
1266         * up FORMAT_DATA without freeing this if something goes wrong */
1267        struct dag_per_thread_t *per_thread = NULL;
1268        int iserror = 0;
1269
1270        /* Check we aren't trying to create more threads than the DAG card can
1271         * handle */
1272        max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd);
1273        if (libtrace->perpkt_thread_count > max_streams) {
1274                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, "trying to create too "
1275                                          "many threads (max is %u)", max_streams);
1276                iserror = 1;
1277                goto cleanup;
1278        }
1279
1280        /* Create the thread structures */
1281        per_thread = calloc(libtrace->perpkt_thread_count,
1282                                                                         sizeof(struct dag_per_thread_t));
1283        FORMAT_DATA->per_thread = per_thread;
1284
1285        /* Get the stream names from the uri */
1286        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
1287                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, "format uri doesn't "
1288                                          "specify the DAG streams");
1289                iserror = 1;
1290                goto cleanup;
1291        }
1292
1293        scan++;
1294 
1295        tok = strtok(scan, ",");
1296        while (tok != NULL) {
1297                /* Ensure we haven't specified too many streams */
1298                if (stream_count >= libtrace->perpkt_thread_count) {
1299                        trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, "format uri specifies too "
1300                                          "many streams. Max is %u", max_streams);
1301                        iserror = 1;
1302                        goto cleanup;
1303                }
1304
1305                /* Save the stream details */
1306                per_thread[stream_count].device = FORMAT_DATA->device;
1307                per_thread[stream_count++].stream = (uint16_t)atoi(tok);
1308
1309                tok = strtok(NULL, ",");
1310        }
1311
1312 cleanup:
1313        if (iserror) {
1314                /* Free the per_thread memory */
1315                free(per_thread);
1316               
1317                return -1;
1318        } else {
1319                return 0;
1320        }
1321}
1322
1323
1324
1325/* TODO: Fold this into dag_available */
1326static int dag_pavailable(libtrace_t *libtrace, libtrace_thread_t *t) {
1327        uint32_t diff = PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom;
1328
1329        /* If we've processed more than 4MB of data since we last called
1330         * dag_advance_stream, then we should call it again to allow the
1331         * space occupied by that 4MB to be released */
1332        if (diff >= dag_record_size && PERPKT_DATA(t)->processed < 4 * 1024 * 1024)
1333                return diff;
1334
1335        /* Update top and bottom pointers */
1336        PERPKT_DATA(t)->top = dag_advance_stream(PERPKT_DATA(t)->device->fd,
1337                                                                                         PERPKT_DATA(t)->stream,
1338                                                                                         &(PERPKT_DATA(t)->bottom));
1339
1340        if (PERPKT_DATA(t)->top == NULL) {
1341                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
1342                return -1;
1343        }
1344
1345        PERPKT_DATA(t)->processed = 0;
1346        diff = PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom;
1347        return diff;
1348}
1349
1350/* TODO: Fold this into dag_get_record */
1351static dag_record_t *dag_pget_record(libtrace_t *libtrace,
1352                                                                         libtrace_thread_t *t) {
1353        dag_record_t *erfptr = NULL;
1354        uint16_t size;
1355
1356        erfptr = (dag_record_t *)PERPKT_DATA(t)->bottom;
1357        if (!erfptr)
1358                return NULL;
1359
1360        /* Ensure we have a whole record */
1361        size = ntohs(erfptr->rlen);
1362        assert(size >= dag_record_size);
1363        if (size > (PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom))
1364                return NULL;
1365
1366        /* Advance the buffer pointers */
1367        PERPKT_DATA(t)->bottom += size;
1368        PERPKT_DATA(t)->processed += size;
1369       
1370        return erfptr;
1371}
1372
1373static int dag_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t,
1374                                                        libtrace_packet_t *packet) {
1375        dag_record_t *erfptr = NULL;
1376        int numbytes = 0;
1377        uint32_t flags = 0;
1378        struct timeval maxwait, pollwait;
1379
1380        pollwait.tv_sec = 0;
1381        pollwait.tv_usec = 10000;
1382        maxwait.tv_sec = 0;
1383        maxwait.tv_usec = 250000;
1384
1385        /* TODO: Support DUCK reporting */
1386
1387        /* Don't let anyone try to free our DAG memory hole! */
1388        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1389
1390        /* If the packet buffer is currently owned by libtrace, free it so
1391         * that we can set the packet to point into the DAG memory hole */
1392        if (packet->buf_control == TRACE_CTRL_PACKET) {
1393                free(packet->buffer);
1394                packet->buffer = 0;
1395        }
1396
1397        /* Configure DAG card stream polling */
1398        if (dag_set_stream_poll(PERPKT_DATA(t)->device->fd, PERPKT_DATA(t)->stream,
1399                                                        sizeof(dag_record_t), &maxwait, &pollwait) < 0) {
1400                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1401                return -1;
1402        }
1403
1404        /* Grab an ERF record */
1405        do {
1406                numbytes = dag_pavailable(libtrace, t);
1407                if (numbytes < 0)
1408                        return numbytes;
1409                if (numbytes < dag_record_size) {
1410                        if (libtrace_halt)
1411                                return 0;
1412
1413                        /* Check message queue to see if we should abort early */
1414                        if (libtrace_message_queue_count(&t->messages) > 0)
1415                                return -2;
1416
1417                        /* Keep trying until we see a packet */
1418                        continue;
1419                }
1420
1421                erfptr = dag_pget_record(libtrace, t);
1422        } while (erfptr == NULL);
1423
1424        /* Prepare the libtrace packet */
1425        if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
1426                                                   flags))
1427                return -1;
1428
1429        PERPKT_DATA(t)->pkt_count++;
1430
1431        return packet->payload ? htons(erfptr->rlen) :
1432                erf_get_framing_length(packet);
1433}
1434
1435static int dag_ppause_input(libtrace_t *libtrace) {
1436        int i, tot = libtrace->perpkt_thread_count;
1437        struct dag_per_thread_t *t_data;
1438
1439        /* Stop and detach all the streams */
1440        printf("Stopping and detaching all streams\n");
1441        for (i = 0; i < tot; i++) {
1442                t_data = &FORMAT_DATA->per_thread[i];
1443
1444                if (dag_stop_stream(t_data->device->fd,
1445                                                        t_data->stream) < 0) {
1446                        trace_set_err(libtrace, errno, "can't stop DAG stream #%u",
1447                                                  t_data->stream);
1448                        return -1;
1449                }
1450
1451                if (dag_detach_stream(t_data->device->fd,
1452                                                          t_data->stream) < 0) {
1453                        trace_set_err(libtrace, errno, "can't detach DAG stream #%u",
1454                                                  t_data->stream);
1455                        return -1;
1456                }
1457        }
1458
1459        /* Free up the per_thread array */
1460        free(FORMAT_DATA->per_thread);
1461        FORMAT_DATA->per_thread = NULL;
1462
1463        return 0;
1464}
1465
1466static int dag_pconfig_input(libtrace_t *libtrace,
1467                                                         trace_parallel_option_t option, void *value) {
1468
1469        /* We don't support any of these! Normally you configure the DAG card
1470         * externally. */
1471        switch(option) {
1472        case TRACE_OPTION_SET_HASHER:
1473        case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
1474        case TRACE_OPTION_TRACETIME:
1475        case TRACE_OPTION_TICK_INTERVAL:
1476        case TRACE_OPTION_GET_CONFIG:
1477        case TRACE_OPTION_SET_CONFIG:
1478                return -1;
1479        }
1480        /* We don't provide a default option to ensure that future options will
1481         * generate a compiler warning. */
1482
1483        return -1;
1484}
1485
1486static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1487                                                                bool reader) {
1488        /* XXX: This function gets run sequentially for all
1489         * threads. Should investigate making it parallel as draining the
1490         * memory could be needlessly time consuming.
1491         */
1492        uint8_t *top, *bottom;
1493        uint8_t diff = 0; /* XXX: Investigate this type, as I would assume the value
1494                                           * could be larger than 255 */
1495        struct timeval zero, nopoll;
1496
1497        top = bottom = NULL;
1498
1499        /* Minimum delay is 10mS */
1500        zero.tv_sec = 0;
1501        zero.tv_usec = 10000;
1502        nopoll = zero;
1503
1504        if (reader) {
1505                if (t->type == THREAD_PERPKT) {
1506                        /* Pass the per thread data to the thread */
1507                        t->format_data = &FORMAT_DATA->per_thread[t->perpkt_num];
1508
1509                        /* Attach and start the DAG stream */
1510                        printf("t%u: starting and attaching stream #%u\n", t->perpkt_num,
1511                                   PERPKT_DATA(t)->stream);
1512                        if (dag_attach_stream(PERPKT_DATA(t)->device->fd,
1513                                                                  PERPKT_DATA(t)->stream, 0, 0) < 0) {
1514                                trace_set_err(libtrace, errno, "can't attach DAG stream #%u",
1515                                                          PERPKT_DATA(t)->stream);
1516                                return -1;
1517                        }
1518                        if (dag_start_stream(PERPKT_DATA(t)->device->fd,
1519                                                                 PERPKT_DATA(t)->stream) < 0) {
1520                                trace_set_err(libtrace, errno, "can't start DAG stream #%u",
1521                                                          PERPKT_DATA(t)->stream);
1522                                return -1;
1523                        }
1524
1525                        /* Ensure that dag_advance_stream will return without blocking */
1526                        if(dag_set_stream_poll(PERPKT_DATA(t)->device->fd,
1527                                                                   PERPKT_DATA(t)->stream, 0, &zero,
1528                                                                   &nopoll) < 0) {
1529                                trace_set_err(libtrace, errno, "dag_set_stream_poll failed!");
1530                                return -1;
1531                        }
1532
1533                        /* Clear all the data from the memory hole */
1534                        do {
1535                                top = dag_advance_stream(PERPKT_DATA(t)->device->fd,
1536                                                                                 PERPKT_DATA(t)->stream,
1537                                                                                 &bottom);
1538
1539                                assert(top && bottom);
1540                                diff = top - bottom;
1541                                bottom -= diff;
1542                        } while (diff != 0);
1543
1544                        PERPKT_DATA(t)->top = NULL;
1545                        PERPKT_DATA(t)->bottom = NULL;
1546                        PERPKT_DATA(t)->pkt_count = 0;
1547                        PERPKT_DATA(t)->drops = 0;
1548                } else {
1549                        /* TODO: Figure out why we need this */
1550                        t->format_data = &FORMAT_DATA->per_thread[0];
1551                }
1552        }
1553
1554        fprintf(stderr, "t%u: registered thread\n", t->perpkt_num);
1555
1556        return 0;
1557}
1558
1559static struct libtrace_format_t dag = {
1560        "dag",
1561        "$Id$",
1562        TRACE_FORMAT_ERF,
1563        dag_probe_filename,             /* probe filename */
1564        NULL,                           /* probe magic */
1565        dag_init_input,                 /* init_input */
1566        dag_config_input,               /* config_input */
1567        dag_start_input,                /* start_input */
1568        dag_pause_input,                /* pause_input */
1569        dag_init_output,                /* init_output */
1570        NULL,                           /* config_output */
1571        dag_start_output,               /* start_output */
1572        dag_fin_input,                  /* fin_input */
1573        dag_fin_output,                 /* fin_output */
1574        dag_read_packet,                /* read_packet */
1575        dag_prepare_packet,             /* prepare_packet */
1576        NULL,                           /* fin_packet */
1577        dag_write_packet,               /* write_packet */
1578        erf_get_link_type,              /* get_link_type */
1579        erf_get_direction,              /* get_direction */
1580        erf_set_direction,              /* set_direction */
1581        erf_get_erf_timestamp,          /* get_erf_timestamp */
1582        NULL,                           /* get_timeval */
1583        NULL,                           /* get_seconds */
1584        NULL,                           /* get_timespec */
1585        NULL,                           /* seek_erf */
1586        NULL,                           /* seek_timeval */
1587        NULL,                           /* seek_seconds */
1588        erf_get_capture_length,         /* get_capture_length */
1589        erf_get_wire_length,            /* get_wire_length */
1590        erf_get_framing_length,         /* get_framing_length */
1591        erf_set_capture_length,         /* set_capture_length */
1592        NULL,                           /* get_received_packets */
1593        NULL,                           /* get_filtered_packets */
1594        dag_get_dropped_packets,        /* get_dropped_packets */
1595        NULL,                           /* get_captured_packets */
1596        NULL,                           /* get_fd */
1597        trace_event_dag,                /* trace_event */
1598        dag_help,                       /* help */
1599        NULL,                            /* next pointer */
1600                {true, 0}, /* live packet capture, thread limit TBD */
1601                dag_pstart_input,
1602                dag_pread_packet,
1603                dag_ppause_input,
1604                NULL,
1605                dag_pconfig_input,
1606                dag_pregister_thread,
1607                NULL
1608};
1609
1610void dag_constructor(void) {
1611        register_format(&dag);
1612}
Note: See TracBrowser for help on using the repository browser.