source: lib/format_dag25.c @ fe11d12

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since fe11d12 was fe11d12, checked in by Dan Collins <djc44@…>, 7 years ago

Started work on creating a parallel interface for DAG.

There are still a number of things to do including:

  • Support for DUCK reporting
  • Couting the number of dropped packets
  • Reducing code duplication
  • Implementing pconfig_input, pfin_input and possibly fixing ppause_input
  • Property mode set to 100644
File size: 46.1 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id$
31 *
32 */
33
34#define _GNU_SOURCE
35
36#include "config.h"
37#include "common.h"
38#include "libtrace.h"
39#include "libtrace_int.h"
40#include "format_helper.h"
41#include "format_erf.h"
42
43#include <assert.h>
44#include <errno.h>
45#include <fcntl.h>
46#include <stdio.h>
47#include <string.h>
48#include <stdlib.h>
49#include <sys/stat.h>
50
51#include <sys/mman.h>
52/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
53 * Windows anyway so we'll worry about this more later :] */
54#include <pthread.h>
55
56
57#ifdef WIN32
58#  include <io.h>
59#  include <share.h>
60#  define PATH_MAX _MAX_PATH
61#  define snprintf sprintf_s
62#else
63#  include <netdb.h>
64#  ifndef PATH_MAX
65#       define PATH_MAX 4096
66#  endif
67#  include <sys/ioctl.h>
68#endif
69
70/* This format deals with DAG cards that are using drivers from the 2.5 version
71 * onwards, including 3.X.
72 *
73 * DAG is a LIVE capture format.
74 *
75 * This format does support writing, provided the DAG card that you are using
76 * has transmit (Tx) support. Additionally, packets read using this format
77 * are in the ERF format, so can easily be written as ERF traces without
78 * losing any data.
79 */
80
81
82#define DATA(x) ((struct dag_format_data_t *)x->format_data)
83#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
84#define PERPKT_DATA(x) ((struct dag_per_thread_t *)(x->format_data))
85
86#define FORMAT_DATA DATA(libtrace)
87#define FORMAT_DATA_OUT DATA_OUT(libtrace)
88
89#define DUCK FORMAT_DATA->duck
90static struct libtrace_format_t dag;
91
92/* A DAG device - a DAG device can support multiple streams (and therefore
93 * multiple input traces) so each trace needs to refer to a device */
94struct dag_dev_t {
95        char * dev_name;                /* Device name */
96        int fd;                         /* File descriptor */
97        uint16_t ref_count;             /* Number of input / output traces
98                                           that are using this device */
99        struct dag_dev_t *prev;         /* Pointer to the previous device in
100                                           the device list */
101        struct dag_dev_t *next;         /* Pointer to the next device in the
102                                           device list */
103};
104
105/* "Global" data that is stored for each DAG output trace */
106struct dag_format_data_out_t {
107        /* The DAG device being used for writing */
108        struct dag_dev_t *device;
109        /* The DAG stream that is being written on */
110        unsigned int dagstream;
111        /* Boolean flag indicating whether the stream is currently attached */
112        int stream_attached;
113        /* The amount of data waiting to be transmitted, in bytes */
114        uint64_t waiting;
115        /* A buffer to hold the data to be transmittted */
116        uint8_t *txbuffer;
117};
118
119/* Data that is stored for each libtrace_thread_t */
120struct dag_per_thread_t {
121        struct dag_dev_t *device; /* DAG device */
122        uint16_t stream; /* Stream number */
123        uint8_t *top; /* Pointer to the last unread byte in the DAG memory */
124        uint8_t *bottom; /* Pointer to the first unread byte in the DAG memory */
125        uint32_t processed; /* Amount of data processed from the bottom pointer */
126};
127
128/* "Global" data that is stored for each DAG input trace */
129struct dag_format_data_t {
130
131        /* Data required for regular DUCK reporting */
132        struct {
133                /* Timestamp of the last DUCK report */
134                uint32_t last_duck;
135                /* The number of seconds between each DUCK report */
136                uint32_t duck_freq;
137                /* Timestamp of the last packet read from the DAG card */
138                uint32_t last_pkt;
139                /* Dummy trace to ensure DUCK packets are dealt with using the
140                 * DUCK format functions */
141                libtrace_t *dummy_duck;
142        } duck;
143
144        /* The DAG device that we are reading from */
145        struct dag_dev_t *device;
146        /* The DAG stream that we are reading from */
147        unsigned int dagstream;
148        /* Boolean flag indicating whether the stream is currently attached */
149        int stream_attached;
150        /* Pointer to the first unread byte in the DAG memory hole */
151        uint8_t *bottom;
152        /* Pointer to the last unread byte in the DAG memory hole */
153        uint8_t *top;
154        /* The amount of data processed thus far from the bottom pointer */
155        uint32_t processed;
156        /* The number of packets that have been dropped */
157        uint64_t drops;
158        /* When running in parallel mode this is malloc'd with an array of thread
159         * structures */
160        struct dag_per_thread_t *per_thread;
161};
162
163/* To be thread-safe, we're going to need a mutex for operating on the list
164 * of DAG devices */
165pthread_mutex_t open_dag_mutex;
166
167/* The list of DAG devices that have been opened by libtrace.
168 *
169 * We can only open each DAG device once, but we might want to read from
170 * multiple streams. Therefore, we need to maintain a list of devices that we
171 * have opened (with ref counts!) so that we don't try to open a device too
172 * many times or close a device that we're still using */
173struct dag_dev_t *open_dags = NULL;
174
175/* Returns the amount of padding between the ERF header and the start of the
176 * captured packet data */
177static int dag_get_padding(const libtrace_packet_t *packet)
178{
179        /* ERF Ethernet records have a 2 byte padding before the packet itself
180         * so that the IP header is aligned on a 32 bit boundary.
181         */
182        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
183                dag_record_t *erfptr = (dag_record_t *)packet->header;
184                switch(erfptr->type) {
185                        case TYPE_ETH:
186                        case TYPE_DSM_COLOR_ETH:
187                                return 2;
188                        default:                return 0;
189                }
190        }
191        else {
192                switch(trace_get_link_type(packet)) {
193                        case TRACE_TYPE_ETH:    return 2;
194                        default:                return 0;
195                }
196        }
197}
198
199/* Attempts to determine if the given filename refers to a DAG device */
200static int dag_probe_filename(const char *filename)
201{
202        struct stat statbuf;
203        /* Can we stat the file? */
204        if (stat(filename, &statbuf) != 0) {
205                return 0;
206        }
207        /* Is it a character device? */
208        if (!S_ISCHR(statbuf.st_mode)) {
209                return 0;
210        }
211        /* Yeah, it's probably us. */
212        return 1;
213}
214
215/* Initialises the DAG output data structure */
216static void dag_init_format_out_data(libtrace_out_t *libtrace) {
217        libtrace->format_data = (struct dag_format_data_out_t *) malloc(sizeof(struct dag_format_data_out_t));
218        // no DUCK on output
219        FORMAT_DATA_OUT->stream_attached = 0;
220        FORMAT_DATA_OUT->device = NULL;
221        FORMAT_DATA_OUT->dagstream = 0;
222        FORMAT_DATA_OUT->waiting = 0;
223
224}
225
226/* Initialises the DAG input data structure */
227static void dag_init_format_data(libtrace_t *libtrace) {
228        libtrace->format_data = (struct dag_format_data_t *)
229                malloc(sizeof(struct dag_format_data_t));
230        DUCK.last_duck = 0;
231        DUCK.duck_freq = 0;
232        DUCK.last_pkt = 0;
233        DUCK.dummy_duck = NULL;
234        FORMAT_DATA->stream_attached = 0;
235        FORMAT_DATA->drops = 0;
236        FORMAT_DATA->device = NULL;
237        FORMAT_DATA->dagstream = 0;
238        FORMAT_DATA->processed = 0;
239        FORMAT_DATA->bottom = NULL;
240        FORMAT_DATA->top = NULL;
241}
242
243/* Determines if there is already an entry for the given DAG device in the
244 * device list and increments the reference count for that device, if found.
245 *
246 * NOTE: This function assumes the open_dag_mutex is held by the caller */
247static struct dag_dev_t *dag_find_open_device(char *dev_name) {
248        struct dag_dev_t *dag_dev;
249
250        dag_dev = open_dags;
251
252        /* XXX: Not exactly zippy, but how often are we going to be dealing
253         * with multiple dag cards? */
254        while (dag_dev != NULL) {
255                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
256                        dag_dev->ref_count ++;
257                        return dag_dev;
258
259                }
260                dag_dev = dag_dev->next;
261        }
262        return NULL;
263
264
265}
266
267/* Closes a DAG device and removes it from the device list.
268 *
269 * Attempting to close a DAG device that has a non-zero reference count will
270 * cause an assertion failure!
271 *
272 * NOTE: This function assumes the open_dag_mutex is held by the caller */
273static void dag_close_device(struct dag_dev_t *dev) {
274        /* Need to remove from the device list */
275
276        assert(dev->ref_count == 0);
277
278        if (dev->prev == NULL) {
279                open_dags = dev->next;
280                if (dev->next)
281                        dev->next->prev = NULL;
282        } else {
283                dev->prev->next = dev->next;
284                if (dev->next)
285                        dev->next->prev = dev->prev;
286        }
287
288        dag_close(dev->fd);
289        if (dev->dev_name)
290        free(dev->dev_name);
291        free(dev);
292}
293
294
295/* Opens a new DAG device for writing and adds it to the DAG device list
296 *
297 * NOTE: this function should only be called when opening a DAG device for
298 * writing - there is little practical difference between this and the
299 * function below that covers the reading case, but we need the output trace
300 * object to report errors properly so the two functions take slightly
301 * different arguments. This is really lame and there should be a much better
302 * way of doing this.
303 *
304 * NOTE: This function assumes the open_dag_mutex is held by the caller
305 */
306static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace, char *dev_name) {
307        struct stat buf;
308        int fd;
309        struct dag_dev_t *new_dev;
310
311        /* Make sure the device exists */
312        if (stat(dev_name, &buf) == -1) {
313                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
314                return NULL;
315}
316
317        /* Make sure it is the appropriate type of device */
318        if (S_ISCHR(buf.st_mode)) {
319                /* Try opening the DAG device */
320                if((fd = dag_open(dev_name)) < 0) {
321                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
322                                        dev_name);
323                        return NULL;
324                }
325        } else {
326                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
327                                dev_name);
328                return NULL;
329        }
330
331        /* Add the device to our device list - it is just a doubly linked
332         * list with no inherent ordering; just tack the new one on the front
333         */
334        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
335        new_dev->fd = fd;
336        new_dev->dev_name = dev_name;
337        new_dev->ref_count = 1;
338
339        new_dev->prev = NULL;
340        new_dev->next = open_dags;
341        if (open_dags)
342                open_dags->prev = new_dev;
343
344        open_dags = new_dev;
345
346        return new_dev;
347}
348
349/* Opens a new DAG device for reading and adds it to the DAG device list
350 *
351 * NOTE: this function should only be called when opening a DAG device for
352 * reading - there is little practical difference between this and the
353 * function above that covers the writing case, but we need the input trace
354 * object to report errors properly so the two functions take slightly
355 * different arguments. This is really lame and there should be a much better
356 * way of doing this.
357 *
358 * NOTE: This function assumes the open_dag_mutex is held by the caller */
359static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
360        struct stat buf;
361        int fd;
362        struct dag_dev_t *new_dev;
363
364        /* Make sure the device exists */
365        if (stat(dev_name, &buf) == -1) {
366                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
367                return NULL;
368        }
369
370        /* Make sure it is the appropriate type of device */
371        if (S_ISCHR(buf.st_mode)) {
372                /* Try opening the DAG device */
373                if((fd = dag_open(dev_name)) < 0) {
374                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
375                                        dev_name);
376                        return NULL;
377                }
378        } else {
379                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
380                                dev_name);
381                return NULL;
382        }
383
384        /* Add the device to our device list - it is just a doubly linked
385         * list with no inherent ordering; just tack the new one on the front
386         */
387        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
388        new_dev->fd = fd;
389        new_dev->dev_name = dev_name;
390        new_dev->ref_count = 1;
391
392        new_dev->prev = NULL;
393        new_dev->next = open_dags;
394        if (open_dags)
395                open_dags->prev = new_dev;
396
397        open_dags = new_dev;
398
399        return new_dev;
400}
401
402/* Creates and initialises a DAG output trace */
403static int dag_init_output(libtrace_out_t *libtrace) {
404        char *dag_dev_name = NULL;
405        char *scan = NULL;
406        struct dag_dev_t *dag_device = NULL;
407        int stream = 1;
408       
409        /* XXX I don't know if this is important or not, but this function
410         * isn't present in all of the driver releases that this code is
411         * supposed to support! */
412        /*
413        unsigned long wake_time;
414        dagutil_sleep_get_wake_time(&wake_time,0);
415        */
416
417        dag_init_format_out_data(libtrace);
418        /* Grab the mutex while we're likely to be messing with the device
419         * list */
420        pthread_mutex_lock(&open_dag_mutex);
421       
422        /* Specific streams are signified using a comma in the libtrace URI,
423         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
424         *
425         * If no stream is specified, we will write using stream 1 */
426        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
427                dag_dev_name = strdup(libtrace->uridata);
428        } else {
429                dag_dev_name = (char *)strndup(libtrace->uridata,
430                                (size_t)(scan - libtrace->uridata));
431                stream = atoi(++scan);
432        }
433        FORMAT_DATA_OUT->dagstream = stream;
434
435        /* See if our DAG device is already open */
436        dag_device = dag_find_open_device(dag_dev_name);
437
438        if (dag_device == NULL) {
439                /* Device not yet opened - open it ourselves */
440                dag_device = dag_open_output_device(libtrace, dag_dev_name);
441        } else {
442                /* Otherwise, just use the existing one */
443                free(dag_dev_name);
444                dag_dev_name = NULL;
445        }
446
447        /* Make sure we have successfully opened a DAG device */
448        if (dag_device == NULL) {
449                if (dag_dev_name) {
450                        free(dag_dev_name);
451                }
452                pthread_mutex_unlock(&open_dag_mutex);
453                return -1;
454        }
455
456        FORMAT_DATA_OUT->device = dag_device;
457        pthread_mutex_unlock(&open_dag_mutex);
458        return 0;
459}
460
461/* Creates and initialises a DAG input trace */
462static int dag_init_input(libtrace_t *libtrace) {
463        char *dag_dev_name = NULL;
464        char *scan = NULL;
465        int stream = 0, thread_count = 1;
466        struct dag_dev_t *dag_device = NULL;
467
468        dag_init_format_data(libtrace);
469        /* Grab the mutex while we're likely to be messing with the device
470         * list */
471        pthread_mutex_lock(&open_dag_mutex);
472
473
474        /* DAG cards support multiple streams. In a single threaded capture,
475         * these are specified using a comma in the libtrace URI,
476         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
477         *
478         * If no stream is specified, we will read from stream 0 with one thread
479         */
480        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
481                dag_dev_name = strdup(libtrace->uridata);
482               
483        } else {
484                dag_dev_name = (char *)strndup(libtrace->uridata,
485                                                                           (size_t)(scan - libtrace->uridata));
486                stream = atoi(++scan);
487        }
488
489        FORMAT_DATA->dagstream = stream;
490
491        /* See if our DAG device is already open */
492        dag_device = dag_find_open_device(dag_dev_name);
493
494        if (dag_device == NULL) {
495                /* Device not yet opened - open it ourselves */
496                dag_device = dag_open_device(libtrace, dag_dev_name);
497        } else {
498                /* Otherwise, just use the existing one */
499                free(dag_dev_name);
500                dag_dev_name = NULL;
501        }
502
503        /* Make sure we have successfully opened a DAG device */
504        if (dag_device == NULL) {
505                if (dag_dev_name)
506                        free(dag_dev_name);
507                dag_dev_name = NULL;
508                pthread_mutex_unlock(&open_dag_mutex);
509                return -1;
510        }
511
512        FORMAT_DATA->device = dag_device;
513
514        /* See Config_Status_API_Programming_Guide.pdf from the Endace Dag Documentation */
515        /* Check kBooleanAttributeActive is true -- no point capturing on an interface that's disabled
516 
517        *  The symptom of the port being disabled is that libtrace will appear to hang.
518        */
519        /* Check kBooleanAttributeFault is false */
520        /* Check kBooleanAttributeLocalFault is false */
521        /* Check kBooleanAttributeLock is true ? */
522        /* Check kBooleanAttributePeerLink ? */
523
524        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based on libtrace promisc attribute?*/
525        /* Set kUint32AttributeSnapLength to the snaplength */
526
527        pthread_mutex_unlock(&open_dag_mutex);
528        return 0;
529}
530
531/* Configures a DAG input trace */
532static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
533                                void *data) {
534        char conf_str[4096];
535        switch(option) {
536                case TRACE_OPTION_META_FREQ:
537                        /* This option is used to specify the frequency of DUCK
538                         * updates */
539                        DUCK.duck_freq = *(int *)data;
540                        return 0;
541                case TRACE_OPTION_SNAPLEN:
542                        /* Tell the card our new snap length */
543                        snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
544                        if (dag_configure(FORMAT_DATA->device->fd,
545                                                conf_str) != 0) {
546                                trace_set_err(libtrace, errno, "Failed to configure snaplen on DAG card: %s", libtrace->uridata);
547                                return -1;
548                        }
549                        return 0;
550                case TRACE_OPTION_PROMISC:
551                        /* DAG already operates in a promisc fashion */
552                        return -1;
553                case TRACE_OPTION_FILTER:
554                        /* We don't yet support pushing filters into DAG
555                         * cards */
556                        return -1;
557                case TRACE_OPTION_EVENT_REALTIME:
558                        /* Live capture is always going to be realtime */
559                        return -1;
560        }
561        return -1;
562}
563
564/* Starts a DAG output trace */
565static int dag_start_output(libtrace_out_t *libtrace) {
566        struct timeval zero, nopoll;
567
568        zero.tv_sec = 0;
569        zero.tv_usec = 0;
570        nopoll = zero;
571
572        /* Attach and start the DAG stream */
573
574        if (dag_attach_stream(FORMAT_DATA_OUT->device->fd,
575                        FORMAT_DATA_OUT->dagstream, 0, 1048576) < 0) {
576                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
577                return -1;
578        }
579
580        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
581                        FORMAT_DATA_OUT->dagstream) < 0) {
582                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
583                return -1;
584        }
585        FORMAT_DATA_OUT->stream_attached = 1;
586
587        /* We don't want the dag card to do any sleeping */
588
589        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
590                        FORMAT_DATA_OUT->dagstream, 0, &zero,
591                        &nopoll);
592
593        return 0;
594}
595
596/* Starts a DAG input trace */
597static int dag_start_input(libtrace_t *libtrace) {
598        struct timeval zero, nopoll;
599        uint8_t *top, *bottom;
600        uint8_t diff = 0;
601        top = bottom = NULL;
602
603        zero.tv_sec = 0;
604        zero.tv_usec = 10000;
605        nopoll = zero;
606
607        /* Attach and start the DAG stream */
608        if (dag_attach_stream(FORMAT_DATA->device->fd,
609                                FORMAT_DATA->dagstream, 0, 0) < 0) {
610                trace_set_err(libtrace, errno, "Cannot attach DAG stream");
611                return -1;
612        }
613
614        if (dag_start_stream(FORMAT_DATA->device->fd,
615                                FORMAT_DATA->dagstream) < 0) {
616                trace_set_err(libtrace, errno, "Cannot start DAG stream");
617                return -1;
618        }
619        FORMAT_DATA->stream_attached = 1;
620       
621        /* We don't want the dag card to do any sleeping */
622        dag_set_stream_poll(FORMAT_DATA->device->fd,
623                                FORMAT_DATA->dagstream, 0, &zero,
624                                &nopoll);
625
626        /* Should probably flush the memory hole now */
627        do {
628                top = dag_advance_stream(FORMAT_DATA->device->fd,
629                                        FORMAT_DATA->dagstream,
630                                        &bottom);
631                assert(top && bottom);
632                diff = top - bottom;
633                bottom -= diff;
634        } while (diff != 0);
635        FORMAT_DATA->top = NULL;
636        FORMAT_DATA->bottom = NULL;
637        FORMAT_DATA->processed = 0;
638        FORMAT_DATA->drops = 0;
639
640        return 0;
641}
642
643/* Pauses a DAG output trace */
644static int dag_pause_output(libtrace_out_t *libtrace) {
645
646        /* Stop and detach the stream */
647        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
648                        FORMAT_DATA_OUT->dagstream) < 0) {
649                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
650                return -1;
651        }
652        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
653                        FORMAT_DATA_OUT->dagstream) < 0) {
654                trace_set_err_out(libtrace, errno, "Could not detach DAG stream");
655                return -1;
656        }
657        FORMAT_DATA_OUT->stream_attached = 0;
658        return 0;
659}
660
661/* Pauses a DAG input trace */
662static int dag_pause_input(libtrace_t *libtrace) {
663
664        /* Stop and detach the stream */
665        if (dag_stop_stream(FORMAT_DATA->device->fd,
666                                FORMAT_DATA->dagstream) < 0) {
667                trace_set_err(libtrace, errno, "Could not stop DAG stream");
668                return -1;
669        }
670        if (dag_detach_stream(FORMAT_DATA->device->fd,
671                                FORMAT_DATA->dagstream) < 0) {
672                trace_set_err(libtrace, errno, "Could not detach DAG stream");
673                return -1;
674        }
675        FORMAT_DATA->stream_attached = 0;
676        return 0;
677}
678
679/* Closes a DAG input trace */
680static int dag_fin_input(libtrace_t *libtrace) {
681        /* Need the lock, since we're going to be handling the device list */
682        pthread_mutex_lock(&open_dag_mutex);
683       
684        /* Detach the stream if we are not paused */
685        if (FORMAT_DATA->stream_attached)
686                dag_pause_input(libtrace);
687        FORMAT_DATA->device->ref_count --;
688
689        /* Close the DAG device if there are no more references to it */
690        if (FORMAT_DATA->device->ref_count == 0)
691                dag_close_device(FORMAT_DATA->device);
692        if (DUCK.dummy_duck)
693                trace_destroy_dead(DUCK.dummy_duck);
694        free(libtrace->format_data);
695        pthread_mutex_unlock(&open_dag_mutex);
696        return 0; /* success */
697}
698
699/* Closes a DAG output trace */
700static int dag_fin_output(libtrace_out_t *libtrace) {
701       
702        /* Commit any outstanding traffic in the txbuffer */
703        if (FORMAT_DATA_OUT->waiting) {
704                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
705                                FORMAT_DATA_OUT->waiting );
706        }
707
708        /* Wait until the buffer is nearly clear before exiting the program,
709         * as we will lose packets otherwise */
710        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
711                        FORMAT_DATA_OUT->dagstream,
712                        dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
713                                        FORMAT_DATA_OUT->dagstream) - 8
714                        );
715
716        /* Need the lock, since we're going to be handling the device list */
717        pthread_mutex_lock(&open_dag_mutex);
718
719        /* Detach the stream if we are not paused */
720        if (FORMAT_DATA_OUT->stream_attached)
721                dag_pause_output(libtrace);
722        FORMAT_DATA_OUT->device->ref_count --;
723
724        /* Close the DAG device if there are no more references to it */
725        if (FORMAT_DATA_OUT->device->ref_count == 0)
726                dag_close_device(FORMAT_DATA_OUT->device);
727        free(libtrace->format_data);
728        pthread_mutex_unlock(&open_dag_mutex);
729        return 0; /* success */
730}
731
732/* Extracts DUCK information from the DAG card and produces a DUCK packet */
733static int dag_get_duckinfo(libtrace_t *libtrace,
734                                libtrace_packet_t *packet) {
735
736        /* Allocate memory for the DUCK data */
737        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
738                        !packet->buffer) {
739                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
740                packet->buf_control = TRACE_CTRL_PACKET;
741                if (!packet->buffer) {
742                        trace_set_err(libtrace, errno,
743                                        "Cannot allocate packet buffer");
744                        return -1;
745                }
746        }
747
748        /* DUCK doesn't have a format header */
749        packet->header = 0;
750        packet->payload = packet->buffer;
751
752        /* No need to check if we can get DUCK or not - we're modern
753         * enough so just grab the DUCK info */
754        if ((ioctl(FORMAT_DATA->device->fd, DAGIOCDUCK,
755                                        (duckinf_t *)packet->payload) < 0)) {
756                trace_set_err(libtrace, errno, "Error using DAGIOCDUCK");
757                return -1;
758        }
759
760        packet->type = TRACE_RT_DUCK_2_5;
761
762        /* Set the packet's tracce to point at a DUCK trace, so that the
763         * DUCK format functions will be called on the packet rather than the
764         * DAG ones */
765        if (!DUCK.dummy_duck)
766                DUCK.dummy_duck = trace_create_dead("rt:localhost:3434");
767        packet->trace = DUCK.dummy_duck;
768        return sizeof(duckinf_t);
769}
770
771/* Determines the amount of data available to read from the DAG card */
772static int dag_available(libtrace_t *libtrace) {
773        uint32_t diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
774
775        /* If we've processed more than 4MB of data since we last called
776         * dag_advance_stream, then we should call it again to allow the
777         * space occupied by that 4MB to be released */
778        if (diff >= dag_record_size && FORMAT_DATA->processed < 4 * 1024 * 1024)
779                return diff;
780       
781        /* Update the top and bottom pointers */
782        FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd,
783                        FORMAT_DATA->dagstream,
784                        &(FORMAT_DATA->bottom));
785       
786        if (FORMAT_DATA->top == NULL) {
787                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
788                return -1;
789        }
790        FORMAT_DATA->processed = 0;
791        diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
792        return diff;
793}
794
795/* Returns a pointer to the start of the next complete ERF record */
796static dag_record_t *dag_get_record(libtrace_t *libtrace) {
797        dag_record_t *erfptr = NULL;
798        uint16_t size;
799        erfptr = (dag_record_t *)FORMAT_DATA->bottom;
800        if (!erfptr)
801                return NULL;
802        size = ntohs(erfptr->rlen);
803        assert( size >= dag_record_size );
804        /* Make certain we have the full packet available */
805        if (size > (FORMAT_DATA->top - FORMAT_DATA->bottom))
806                return NULL;
807        FORMAT_DATA->bottom += size;
808        FORMAT_DATA->processed += size;
809        return erfptr;
810}
811
812/* Converts a buffer containing a recently read DAG packet record into a
813 * libtrace packet */
814static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
815                void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
816
817        dag_record_t *erfptr;
818       
819        /* If the packet previously owned a buffer that is not the buffer
820         * that contains the new packet data, we're going to need to free the
821         * old one to avoid memory leaks */
822        if (packet->buffer != buffer &&
823                        packet->buf_control == TRACE_CTRL_PACKET) {
824                free(packet->buffer);
825        }
826
827        /* Set the buffer owner appropriately */
828        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
829                packet->buf_control = TRACE_CTRL_PACKET;
830        } else
831                packet->buf_control = TRACE_CTRL_EXTERNAL;
832
833        /* Update the packet pointers and type appropriately */
834        erfptr = (dag_record_t *)buffer;
835        packet->buffer = erfptr;
836        packet->header = erfptr;
837        packet->type = rt_type;
838
839        if (erfptr->flags.rxerror == 1) {
840                /* rxerror means the payload is corrupt - drop the payload
841                 * by tweaking rlen */
842                packet->payload = NULL;
843                erfptr->rlen = htons(erf_get_framing_length(packet));
844        } else {
845                packet->payload = (char*)packet->buffer
846                        + erf_get_framing_length(packet);
847        }
848
849        if (libtrace->format_data == NULL) {
850                dag_init_format_data(libtrace);
851        }
852
853        /* TODO: This isn't thread safe */
854#if 0
855        /* Update the dropped packets counter */
856
857        /* No loss counter for DSM coloured records - have to use
858         * some other API */
859        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
860                /* TODO */
861        } else {
862                /* Use the ERF loss counter */
863                DATA(libtrace)->drops += ntohs(erfptr->lctr);
864        }
865#endif
866
867        return 0;
868}
869
870/*
871 * dag_write_packet() at this stage attempts to improve tx performance
872 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
873 * has been met. I observed approximately 270% performance increase
874 * through this relatively naive tweak. No optimisation of buffer sizes
875 * was attempted.
876 */
877
878/* Pushes an ERF record onto the transmit stream */
879static int dag_dump_packet(libtrace_out_t *libtrace,
880                dag_record_t *erfptr, unsigned int pad, void *buffer) {
881        int size;
882
883        /*
884         * If we've got 0 bytes waiting in the txqueue, assume that we haven't
885         * requested any space yet, and request some, storing the pointer at
886         * FORMAT_DATA_OUT->txbuffer.
887         *
888         * The amount to request is slightly magical at the moment - it's
889         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
890         * the buffer and handle overruns.
891         */
892        if (FORMAT_DATA_OUT->waiting == 0) {
893                FORMAT_DATA_OUT->txbuffer = dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
894                                FORMAT_DATA_OUT->dagstream, 16908288);
895        }
896
897        /*
898         * Copy the header separately to the body, as we can't guarantee they
899         * are in contiguous memory
900         */
901        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,erfptr,(dag_record_size + pad));
902        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
903
904
905
906        /*
907         * Copy our incoming packet into the outgoing buffer, and increment
908         * our waiting count
909         */
910        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
911        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,buffer,size);
912        FORMAT_DATA_OUT->waiting += size;
913
914        /*
915         * If our output buffer has more than 16 Mebibytes in it, commit those
916         * bytes and reset the waiting count to 0.
917         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in
918         * case there is still data in the buffer at program exit.
919         */
920
921        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
922                FORMAT_DATA_OUT->txbuffer = dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
923                        FORMAT_DATA_OUT->waiting );
924                FORMAT_DATA_OUT->waiting = 0;
925        }
926
927        return size + pad + dag_record_size;
928
929}
930
931/* Attempts to determine a suitable ERF type for a given packet. Returns true
932 * if one is found, false otherwise */
933static bool find_compatible_linktype(libtrace_out_t *libtrace,
934                                libtrace_packet_t *packet, char *type)
935{
936         // Keep trying to simplify the packet until we can find
937         //something we can do with it
938
939        do {
940                *type=libtrace_to_erf_type(trace_get_link_type(packet));
941
942                // Success
943                if (*type != (char)-1)
944                        return true;
945
946                if (!demote_packet(packet)) {
947                        trace_set_err_out(libtrace,
948                                        TRACE_ERR_NO_CONVERSION,
949                                        "No erf type for packet (%i)",
950                                        trace_get_link_type(packet));
951                        return false;
952                }
953
954        } while(1);
955
956        return true;
957}
958
959/* Writes a packet to the provided DAG output trace */
960static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet) {
961        /*
962         * This is heavily borrowed from erf_write_packet(). Yes, CnP coding
963         * sucks, sorry about that.
964         */
965        unsigned int pad = 0;
966        int numbytes;
967        void *payload = packet->payload;
968        dag_record_t *header = (dag_record_t *)packet->header;
969        char erf_type = 0;
970
971        if(!packet->header) {
972                /* No header, probably an RT packet. Lifted from
973                 * erf_write_packet(). */
974                return -1;
975        }
976
977        if (trace_get_link_type(packet) == TRACE_TYPE_NONDATA)
978                return 0;
979
980        pad = dag_get_padding(packet);
981
982        /*
983         * If the payload is null, adjust the rlen. Discussion of this is
984         * attached to erf_write_packet()
985         */
986        if (payload == NULL) {
987                header->rlen = htons(dag_record_size + pad);
988        }
989
990        if (packet->type == TRACE_RT_DATA_ERF) {
991                numbytes = dag_dump_packet(libtrace,
992                                header,
993                                pad,
994                                payload
995                                );
996
997        } else {
998                /* Build up a new packet header from the existing header */
999
1000                /* Simplify the packet first - if we can't do this, break
1001                 * early */
1002                if (!find_compatible_linktype(libtrace,packet,&erf_type))
1003                        return -1;
1004
1005                dag_record_t erfhdr;
1006
1007                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
1008                payload=packet->payload;
1009                pad = dag_get_padding(packet);
1010
1011                /* Flags. Can't do this */
1012                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
1013                if (trace_get_direction(packet)!=(int)~0U)
1014                        erfhdr.flags.iface = trace_get_direction(packet);
1015
1016                erfhdr.type = erf_type;
1017
1018                /* Packet length (rlen includes format overhead) */
1019                assert(trace_get_capture_length(packet)>0
1020                                && trace_get_capture_length(packet)<=65536);
1021                assert(erf_get_framing_length(packet)>0
1022                                && trace_get_framing_length(packet)<=65536);
1023                assert(trace_get_capture_length(packet)+erf_get_framing_length(packet)>0
1024                      &&trace_get_capture_length(packet)+erf_get_framing_length(packet)<=65536);
1025
1026                erfhdr.rlen = htons(trace_get_capture_length(packet)
1027                        + erf_get_framing_length(packet));
1028
1029
1030                /* Loss counter. Can't do this */
1031                erfhdr.lctr = 0;
1032                /* Wire length, does not include padding! */
1033                erfhdr.wlen = htons(trace_get_wire_length(packet));
1034
1035                /* Write it out */
1036                numbytes = dag_dump_packet(libtrace,
1037                                &erfhdr,
1038                                pad,
1039                                payload);
1040
1041        }
1042
1043        return numbytes;
1044}
1045
1046/* Reads the next available packet from a DAG card, in a BLOCKING fashion
1047 *
1048 * If DUCK reporting is enabled, the packet returned may be a DUCK update
1049 */
1050static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1051        int size = 0;
1052        struct timeval tv;
1053        dag_record_t *erfptr = NULL;
1054        int numbytes = 0;
1055        uint32_t flags = 0;
1056        struct timeval maxwait;
1057        struct timeval pollwait;
1058
1059        pollwait.tv_sec = 0;
1060        pollwait.tv_usec = 10000;
1061        maxwait.tv_sec = 0;
1062        maxwait.tv_usec = 250000;
1063
1064        /* Check if we're due for a DUCK report */
1065        if (DUCK.last_pkt - DUCK.last_duck > DUCK.duck_freq &&
1066                        DUCK.duck_freq != 0) {
1067                size = dag_get_duckinfo(libtrace, packet);
1068                DUCK.last_duck = DUCK.last_pkt;
1069                if (size != 0) {
1070                        return size;
1071                }
1072                /* No DUCK support, so don't waste our time anymore */
1073                DUCK.duck_freq = 0;
1074        }
1075
1076        /* Don't let anyone try to free our DAG memory hole! */
1077        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1078
1079        /* If the packet buffer is currently owned by libtrace, free it so
1080         * that we can set the packet to point into the DAG memory hole */
1081        if (packet->buf_control == TRACE_CTRL_PACKET) {
1082                free(packet->buffer);
1083                packet->buffer = 0;
1084        }
1085       
1086        if (dag_set_stream_poll(FORMAT_DATA->device->fd, 
1087                        FORMAT_DATA->dagstream, sizeof(dag_record_t), &maxwait, 
1088                        &pollwait) == -1)
1089        {
1090                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1091                return -1;
1092        }
1093
1094
1095        /* Grab a full ERF record */
1096        do {
1097                numbytes = dag_available(libtrace);
1098                if (numbytes < 0)
1099                        return numbytes;
1100                if (numbytes < dag_record_size) {
1101                        if (libtrace_halt)
1102                                return 0;
1103                        /* Block until we see a packet */
1104                        continue;
1105                }
1106                erfptr = dag_get_record(libtrace);
1107        } while (erfptr == NULL);
1108
1109        /* Prepare the libtrace packet */
1110        if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
1111                                flags))
1112                return -1;
1113
1114        /* Update the DUCK timer */
1115        tv = trace_get_timeval(packet);
1116        DUCK.last_pkt = tv.tv_sec;
1117
1118        return packet->payload ? htons(erfptr->rlen) :
1119                                erf_get_framing_length(packet);
1120}
1121
1122/* Attempts to read a packet from a DAG card in a NON-BLOCKING fashion. If a
1123 * packet is available, we will return a packet event. Otherwise we will
1124 * return a SLEEP event (as we cannot select on the DAG file descriptor).
1125 */
1126static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
1127                                        libtrace_packet_t *packet) {
1128        libtrace_eventobj_t event = {0,0,0.0,0};
1129        dag_record_t *erfptr = NULL;
1130        int numbytes;
1131        uint32_t flags = 0;
1132        struct timeval minwait;
1133       
1134        minwait.tv_sec = 0;
1135        minwait.tv_usec = 10000;
1136       
1137        if (dag_set_stream_poll(FORMAT_DATA->device->fd, 
1138                        FORMAT_DATA->dagstream, 0, &minwait, 
1139                        &minwait) == -1)
1140        {
1141                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1142                event.type = TRACE_EVENT_TERMINATE;
1143                return event;
1144        }
1145
1146        do {
1147                erfptr = NULL;
1148                numbytes = 0;
1149       
1150                /* Need to call dag_available so that the top pointer will get
1151                 * updated, otherwise we'll never see any data! */
1152                numbytes = dag_available(libtrace);
1153
1154                /* May as well not bother calling dag_get_record if
1155                 * dag_available suggests that there's no data */
1156                if (numbytes != 0)
1157                        erfptr = dag_get_record(libtrace);
1158                if (erfptr == NULL) {
1159                        /* No packet available - sleep for a very short time */
1160                        if (libtrace_halt) {
1161                                event.type = TRACE_EVENT_TERMINATE;
1162                        } else {                       
1163                                event.type = TRACE_EVENT_SLEEP;
1164                                event.seconds = 0.0001;
1165                        }
1166                        break;
1167                }
1168                if (dag_prepare_packet(libtrace, packet, erfptr, 
1169                                        TRACE_RT_DATA_ERF, flags)) {
1170                        event.type = TRACE_EVENT_TERMINATE;
1171                        break;
1172                }
1173
1174
1175                event.size = trace_get_capture_length(packet) + 
1176                                trace_get_framing_length(packet);
1177               
1178                /* XXX trace_read_packet() normally applies the following
1179                 * config options for us, but this function is called via
1180                 * trace_event() so we have to do it ourselves */
1181
1182                if (libtrace->filter) {
1183                        int filtret = trace_apply_filter(libtrace->filter, 
1184                                        packet);
1185                        if (filtret == -1) {
1186                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
1187                                                "Bad BPF Filter");
1188                                event.type = TRACE_EVENT_TERMINATE;
1189                                break;
1190                        }
1191
1192                        if (filtret == 0) {
1193                                /* This packet isn't useful so we want to
1194                                 * immediately see if there is another suitable
1195                                 * one - we definitely DO NOT want to return
1196                                 * a sleep event in this case, like we used to
1197                                 * do! */
1198                                libtrace->filtered_packets ++;
1199                                trace_clear_cache(packet);
1200                                continue;
1201                        }
1202                               
1203                        event.type = TRACE_EVENT_PACKET;
1204                } else {
1205                        event.type = TRACE_EVENT_PACKET;
1206                }
1207
1208                if (libtrace->snaplen > 0) {
1209                        trace_set_capture_length(packet, libtrace->snaplen);
1210                }
1211                libtrace->accepted_packets ++;
1212                break;
1213        } while (1);
1214
1215        return event;
1216}
1217
1218/* Gets the number of dropped packets */
1219static uint64_t dag_get_dropped_packets(libtrace_t *trace) {
1220        if (trace->format_data == NULL)
1221                return (uint64_t)-1;
1222        return DATA(trace)->drops;
1223}
1224
1225/* Prints some semi-useful help text about the DAG format module */
1226static void dag_help(void) {
1227        printf("dag format module: $Revision: 1755 $\n");
1228        printf("Supported input URIs:\n");
1229        printf("\tdag:/dev/dagn\n");
1230        printf("\n");
1231        printf("\te.g.: dag:/dev/dag0\n");
1232        printf("\n");
1233        printf("Supported output URIs:\n");
1234        printf("\tnone\n");
1235        printf("\n");
1236}
1237
1238static int dag_pstart_input(libtrace_t *libtrace) {
1239        char *scan, *tok;
1240        uint16_t stream_count = 0, max_streams;
1241        /* We keep our own pointer to per_thread as the system will free
1242         * up FORMAT_DATA without freeing this if something goes wrong */
1243        struct dag_per_thread_t *per_thread = NULL;
1244        int iserror = 0;
1245
1246        /* Check we aren't trying to create more threads than the DAG card can
1247         * handle */
1248        max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd);
1249        if (libtrace->perpkt_thread_count > max_streams) {
1250                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, "trying to create too "
1251                                          "many threads (max is %u)", max_streams);
1252                iserror = 1;
1253                goto cleanup;
1254        }
1255
1256        /* Create the thread structures */
1257        per_thread = calloc(libtrace->perpkt_thread_count,
1258                                                                         sizeof(struct dag_per_thread_t));
1259        FORMAT_DATA->per_thread = per_thread;
1260
1261        /* Get the stream names from the uri */
1262        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
1263                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, "format uri doesn't "
1264                                          "specify the DAG streams");
1265                iserror = 1;
1266                goto cleanup;
1267        }
1268
1269        scan++;
1270 
1271        tok = strtok(scan, ",");
1272        while (tok != NULL) {
1273                /* Ensure we haven't specified too many streams */
1274                if (stream_count >= libtrace->perpkt_thread_count) {
1275                        trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, "format uri specifies too "
1276                                          "many streams. Max is %u", max_streams);
1277                        iserror = 1;
1278                        goto cleanup;
1279                }
1280
1281                /* Save the stream details */
1282                per_thread[stream_count].device = FORMAT_DATA->device;
1283                per_thread[stream_count++].stream = (uint16_t)atoi(tok);
1284
1285                tok = strtok(NULL, ",");
1286        }
1287
1288 cleanup:
1289        if (iserror) {
1290                /* Free the per_thread memory */
1291                free(per_thread);
1292               
1293                return -1;
1294        } else {
1295                return 0;
1296        }
1297}
1298
1299
1300
1301/* TODO: Fold this into dag_available */
1302static int dag_pavailable(libtrace_t *libtrace, libtrace_thread_t *t) {
1303        uint32_t diff = PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom;
1304
1305        /* If we've processed more than 4MB of data since we last called
1306         * dag_advance_stream, then we should call it again to allow the
1307         * space occupied by that 4MB to be released */
1308        if (diff >= dag_record_size && PERPKT_DATA(t)->processed < 4 * 1024 * 1024)
1309                return diff;
1310
1311        /* Update top and bottom pointers */
1312        PERPKT_DATA(t)->top = dag_advance_stream(PERPKT_DATA(t)->device->fd,
1313                                                                                         PERPKT_DATA(t)->stream,
1314                                                                                         &(PERPKT_DATA(t)->bottom));
1315
1316        if (PERPKT_DATA(t)->top == NULL) {
1317                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
1318                return -1;
1319        }
1320
1321        PERPKT_DATA(t)->processed = 0;
1322        diff = PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom;
1323        return diff;
1324}
1325
1326/* TODO: Fold this into dag_get_record */
1327static dag_record_t *dag_pget_record(libtrace_t *libtrace,
1328                                                                         libtrace_thread_t *t) {
1329        dag_record_t *erfptr = NULL;
1330        uint16_t size;
1331
1332        erfptr = (dag_record_t *)PERPKT_DATA(t)->bottom;
1333        if (!erfptr)
1334                return NULL;
1335
1336        /* Ensure we have a whole record */
1337        size = ntohs(erfptr->rlen);
1338        assert(size >= dag_record_size);
1339        if (size > (PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom))
1340                return NULL;
1341
1342        /* Advance the buffer pointers */
1343        PERPKT_DATA(t)->bottom += size;
1344        PERPKT_DATA(t)->processed += size;
1345       
1346        return erfptr;
1347}
1348
1349static int dag_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t,
1350                                                        libtrace_packet_t *packet) {
1351        dag_record_t *erfptr = NULL;
1352        int numbytes = 0;
1353        uint32_t flags = 0;
1354        struct timeval maxwait, pollwait;
1355
1356        pollwait.tv_sec = 0;
1357        pollwait.tv_usec = 10000;
1358        maxwait.tv_sec = 0;
1359        maxwait.tv_usec = 250000;
1360
1361        /* TODO: Support DUCK reporting */
1362
1363        /* Don't let anyone try to free our DAG memory hole! */
1364        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1365
1366        /* If the packet buffer is currently owned by libtrace, free it so
1367         * that we can set the packet to point into the DAG memory hole */
1368        if (packet->buf_control == TRACE_CTRL_PACKET) {
1369                free(packet->buffer);
1370                packet->buffer = 0;
1371        }
1372
1373        /* Configure DAG card stream polling */
1374        if (dag_set_stream_poll(PERPKT_DATA(t)->device->fd, PERPKT_DATA(t)->stream,
1375                                                        sizeof(dag_record_t), &maxwait, &pollwait) < 0) {
1376                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1377                return -1;
1378        }
1379
1380        /* Grab an ERF record */
1381        do {
1382                numbytes = dag_pavailable(libtrace, t);
1383                if (numbytes < 0)
1384                        return numbytes;
1385                if (numbytes < dag_record_size) {
1386                        if (libtrace_halt)
1387                                return 0;
1388                        /* Keep trying until we see a packet */
1389                        continue;
1390                }
1391
1392                erfptr = dag_pget_record(libtrace, t);
1393        } while (erfptr == NULL);
1394
1395        /* Prepare the libtrace packet */
1396        if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
1397                                                   flags))
1398                return -1;
1399
1400        return packet->payload ? htons(erfptr->rlen) :
1401                erf_get_framing_length(packet);
1402}
1403
1404static int dag_ppause_input(libtrace_t *libtrace) {
1405        fprintf(stderr, "Assuming threads will pause when unregistered!\n");
1406        return 0;
1407}
1408
1409static int dag_pfin_input(libtrace_t *libtrace) {
1410        assert(0 && "dag_pfin_input not implemented");
1411
1412        return -1;
1413}
1414
1415static int dag_pconfig_input(libtrace_t *libtrace,
1416                                                         trace_parallel_option_t option, void *value) {
1417        assert(0 && "dag_pconfig_input not implemented");
1418
1419        return -1;
1420}
1421
1422static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1423                                                                bool reader) {
1424        /* XXX: This function gets run sequentially for all
1425         * threads. Should investigate making it parallel as draining the
1426         * memory could be needlessly time consuming
1427         */
1428        uint8_t *top, *bottom;
1429        uint8_t diff = 0; /* XXX: Investigate this type, as I would assume the value
1430                                           * could be larger than 255 */
1431
1432        top = bottom = NULL;
1433
1434        fprintf(stderr, "t%u: registering thread\n", t->perpkt_num);
1435        if (reader) {
1436                if (t->type == THREAD_PERPKT) {
1437                        /* Pass the per thread data to the thread */
1438                        t->format_data = &FORMAT_DATA->per_thread[t->perpkt_num];
1439
1440                        /* Attach and start the DAG stream */
1441                        printf("t%u: starting and attaching stream #%u\n", t->perpkt_num,
1442                                   PERPKT_DATA(t)->stream);
1443                        if (dag_attach_stream(PERPKT_DATA(t)->device->fd,
1444                                                                  PERPKT_DATA(t)->stream, 0, 0) < 0) {
1445                                trace_set_err(libtrace, errno, "can't attach DAG stream #%u",
1446                                                          PERPKT_DATA(t)->stream);
1447                                return -1;
1448                        }
1449                        if (dag_start_stream(PERPKT_DATA(t)->device->fd,
1450                                                                 PERPKT_DATA(t)->stream) < 0) {
1451                                trace_set_err(libtrace, errno, "can't start DAG stream #%u",
1452                                                          PERPKT_DATA(t)->stream);
1453                                return -1;
1454                        }
1455
1456                        /* Clear all the data from the memory hole */
1457                        do {
1458                                top = dag_advance_stream(PERPKT_DATA(t)->device->fd,
1459                                                                                 PERPKT_DATA(t)->stream,
1460                                                                                 &bottom);
1461                                assert(top && bottom);
1462                                diff = top - bottom;
1463                                bottom -= diff;
1464                        } while (diff != 0);
1465
1466                        /* Clear the pointers */
1467                        PERPKT_DATA(t)->top = NULL;
1468                        PERPKT_DATA(t)->bottom = NULL;
1469                } else {
1470                        /* TODO: Figure out why we need this */
1471                        t->format_data = &FORMAT_DATA->per_thread[0];
1472                }
1473        }
1474
1475        return 0;
1476}
1477
1478static void dag_punregister_thread(libtrace_t *libtrace, libtrace_thread_t *t) {
1479        if (t->type == THREAD_PERPKT) {
1480                fprintf(stderr, "t%u: stopping and detaching stream %u\n",
1481                                t->perpkt_num, PERPKT_DATA(t)->stream);
1482
1483                /* Stop and detach the DAG stream */
1484                if (dag_stop_stream(PERPKT_DATA(t)->device->fd,
1485                                                        PERPKT_DATA(t)->stream) < 0) {
1486                        trace_set_err(libtrace, errno, "can't stop DAG stream #%u",
1487                                                  PERPKT_DATA(t)->stream);
1488                        return -1;
1489                }
1490
1491                if (dag_detach_stream(PERPKT_DATA(t)->device->fd,
1492                                                          PERPKT_DATA(t)->stream) < 0) {
1493                        trace_set_err(libtrace, errno, "can't detach DAG stream #%u",
1494                                                  PERPKT_DATA(t)->stream);
1495                        return -1;
1496                }
1497        }
1498
1499        return 0;
1500}
1501
1502static struct libtrace_format_t dag = {
1503        "dag",
1504        "$Id$",
1505        TRACE_FORMAT_ERF,
1506        dag_probe_filename,             /* probe filename */
1507        NULL,                           /* probe magic */
1508        dag_init_input,                 /* init_input */
1509        dag_config_input,               /* config_input */
1510        dag_start_input,                /* start_input */
1511        dag_pause_input,                /* pause_input */
1512        dag_init_output,                /* init_output */
1513        NULL,                           /* config_output */
1514        dag_start_output,               /* start_output */
1515        dag_fin_input,                  /* fin_input */
1516        dag_fin_output,                 /* fin_output */
1517        dag_read_packet,                /* read_packet */
1518        dag_prepare_packet,             /* prepare_packet */
1519        NULL,                           /* fin_packet */
1520        dag_write_packet,               /* write_packet */
1521        erf_get_link_type,              /* get_link_type */
1522        erf_get_direction,              /* get_direction */
1523        erf_set_direction,              /* set_direction */
1524        erf_get_erf_timestamp,          /* get_erf_timestamp */
1525        NULL,                           /* get_timeval */
1526        NULL,                           /* get_seconds */
1527        NULL,                           /* get_timespec */
1528        NULL,                           /* seek_erf */
1529        NULL,                           /* seek_timeval */
1530        NULL,                           /* seek_seconds */
1531        erf_get_capture_length,         /* get_capture_length */
1532        erf_get_wire_length,            /* get_wire_length */
1533        erf_get_framing_length,         /* get_framing_length */
1534        erf_set_capture_length,         /* set_capture_length */
1535        NULL,                           /* get_received_packets */
1536        NULL,                           /* get_filtered_packets */
1537        dag_get_dropped_packets,        /* get_dropped_packets */
1538        NULL,                           /* get_captured_packets */
1539        NULL,                           /* get_fd */
1540        trace_event_dag,                /* trace_event */
1541        dag_help,                       /* help */
1542        NULL,                            /* next pointer */
1543                {true, 0}, /* live packet capture, thread limit TBD */
1544                dag_pstart_input,
1545                dag_pread_packet,
1546                dag_ppause_input,
1547                dag_pfin_input,
1548                dag_pconfig_input,
1549                dag_pregister_thread,
1550                dag_punregister_thread
1551};
1552
1553void dag_constructor(void) {
1554        register_format(&dag);
1555}
Note: See TracBrowser for help on using the repository browser.