source: lib/format_dag25.c @ 5df059b

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 5df059b was 5df059b, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Update DAG25 to the new statistic counters
Improve error handling of an incorrect parallel stream configuration.

  • Property mode set to 100644
File size: 44.6 KB
RevLine 
[5e85c23]1/*
2 * This file is part of libtrace
3 *
[43c00e5]4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
[5e85c23]6 *
[43c00e5]7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
[5e85c23]11 * All rights reserved.
12 *
[43c00e5]13 * This code has been developed by the University of Waikato WAND
[5e85c23]14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
[293999b]30 * $Id$
[5e85c23]31 *
32 */
[43c00e5]33
[5e85c23]34#define _GNU_SOURCE
35
36#include "config.h"
37#include "common.h"
38#include "libtrace.h"
39#include "libtrace_int.h"
40#include "format_helper.h"
41#include "format_erf.h"
42
43#include <assert.h>
44#include <errno.h>
45#include <fcntl.h>
46#include <stdio.h>
47#include <string.h>
48#include <stdlib.h>
[44028d4]49#include <sys/stat.h>
[5e85c23]50
51#include <sys/mman.h>
[2faa57e]52/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
53 * Windows anyway so we'll worry about this more later :] */
54#include <pthread.h>
[5e85c23]55
[121b7e2]56
[5e85c23]57#ifdef WIN32
58#  include <io.h>
59#  include <share.h>
60#  define PATH_MAX _MAX_PATH
61#  define snprintf sprintf_s
62#else
63#  include <netdb.h>
64#  ifndef PATH_MAX
65#       define PATH_MAX 4096
66#  endif
67#  include <sys/ioctl.h>
68#endif
69
[43c00e5]70/* This format deals with DAG cards that are using drivers from the 2.5 version
71 * onwards, including 3.X.
72 *
73 * DAG is a LIVE capture format.
74 *
75 * This format does support writing, provided the DAG card that you are using
76 * has transmit (Tx) support. Additionally, packets read using this format
77 * are in the ERF format, so can easily be written as ERF traces without
78 * losing any data.
79 */
80
/* Accessors for the format-specific data attached to a trace or list node.
 * The macro argument is parenthesised so that any pointer-valued expression
 * (not just a plain identifier) can be passed safely. */
#define DATA(x) ((struct dag_format_data_t *)(x)->format_data)
#define DATA_OUT(x) ((struct dag_format_data_out_t *)(x)->format_data)
#define STREAM_DATA(x) ((struct dag_per_stream_t *)(x)->data)

/* Shorthand forms; these require a variable named 'libtrace' to be in scope */
#define FORMAT_DATA DATA(libtrace)
#define FORMAT_DATA_OUT DATA_OUT(libtrace)

/* DUCK state of the current input trace */
#define DUCK (FORMAT_DATA->duck)

/* First node of the per-stream list, and the stream data stored in it */
#define FORMAT_DATA_HEAD (FORMAT_DATA->per_stream->head)
#define FORMAT_DATA_FIRST ((struct dag_per_stream_t *)FORMAT_DATA_HEAD->data)
92
[5e85c23]93static struct libtrace_format_t dag;
94
/* Bookkeeping for one opened DAG card.
 *
 * A single card can serve several streams (and therefore several traces), so
 * traces share these entries, which are chained into a doubly linked list. */
struct dag_dev_t {
	/* Name of the device, as handed to dag_open() */
	char *dev_name;
	/* Descriptor returned by dag_open() */
	int fd;
	/* How many input / output traces are currently using this device */
	uint16_t ref_count;
	/* Neighbouring entries in the list of open devices */
	struct dag_dev_t *prev;
	struct dag_dev_t *next;
};
107
/* State kept for every DAG output trace */
struct dag_format_data_out_t {
	/* DAG device that packets are written to */
	struct dag_dev_t *device;
	/* Number of the DAG stream used for writing */
	unsigned int dagstream;
	/* Non-zero while the stream is attached */
	int stream_attached;
	/* Number of bytes queued up and not yet transmitted */
	uint64_t waiting;
	/* Buffer holding the data that is to be transmitted */
	uint8_t *txbuffer;
};
[121b7e2]121
/* State kept for each individual DAG input stream */
struct dag_per_stream_t {
	/* Number of the DAG stream */
	uint16_t dagstream;
	/* Last unread byte in the DAG memory */
	uint8_t *top;
	/* First unread byte in the DAG memory */
	uint8_t *bottom;
	/* How much data, counted from 'bottom', has been processed */
	uint32_t processed;
	/* Packets seen on this stream */
	uint64_t pkt_count;
	/* Packets dropped on this stream */
	uint64_t drops;
	/* One flag per interface, set once that interface has been seen.
	 * Four slots are enough to support all current DAG cards */
	uint8_t seeninterface[4];
};
141
[43c00e5]142/* "Global" data that is stored for each DAG input trace */
[5e85c23]143struct dag_format_data_t {
[43c00e5]144        /* Data required for regular DUCK reporting */
[cb39d35]145        /* TODO: This doesn't work with the 10X2S card! I don't know how
146         * DUCK stuff works and don't know how to fix it */
[5e85c23]147        struct {
[43c00e5]148                /* Timestamp of the last DUCK report */
[5e85c23]149                uint32_t last_duck;
[43c00e5]150                /* The number of seconds between each DUCK report */
[d97778c]151                uint32_t duck_freq;
[43c00e5]152                /* Timestamp of the last packet read from the DAG card */
[d97778c]153                uint32_t last_pkt;
[43c00e5]154                /* Dummy trace to ensure DUCK packets are dealt with using the
155                 * DUCK format functions */
[d97778c]156                libtrace_t *dummy_duck;
157        } duck;
[92020b8]158        /* DAG device */
159        struct dag_dev_t *device;
[cb39d35]160        /* Boolean flag indicating whether the trace is currently attached */
[52656ed]161        int stream_attached;
[cb39d35]162        /* Data stored against each DAG input stream */
163        libtrace_list_t *per_stream;
[5e85c23]164};
165
/* To be thread-safe, every operation on the list of DAG devices must be done
 * while holding this mutex. It is statically initialised so that it is a
 * valid mutex before the first lock attempt. */
pthread_mutex_t open_dag_mutex = PTHREAD_MUTEX_INITIALIZER;

/* The list of DAG devices that have been opened by libtrace.
 *
 * We can only open each DAG device once, but we might want to read from
 * multiple streams. Therefore, we need to maintain a list of devices that we
 * have opened (with ref counts!) so that we don't try to open a device too
 * many times or close a device that we're still using */
struct dag_dev_t *open_dags = NULL;
[5e85c23]177
[43c00e5]178/* Returns the amount of padding between the ERF header and the start of the
179 * captured packet data */
[5865c72]180static int dag_get_padding(const libtrace_packet_t *packet)
181{
[43c00e5]182        /* ERF Ethernet records have a 2 byte padding before the packet itself
183         * so that the IP header is aligned on a 32 bit boundary.
184         */
[5865c72]185        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
186                dag_record_t *erfptr = (dag_record_t *)packet->header;
187                switch(erfptr->type) {
188                        case TYPE_ETH:
189                        case TYPE_DSM_COLOR_ETH:
190                                return 2;
191                        default:                return 0;
192                }
193        }
194        else {
195                switch(trace_get_link_type(packet)) {
196                        case TRACE_TYPE_ETH:    return 2;
197                        default:                return 0;
198                }
199        }
200}
201
/* Guesses whether the given filename refers to a DAG device.
 *
 * Returns 1 when the path can be stat()ed and is a character device, and 0
 * in every other case. */
static int dag_probe_filename(const char *filename)
{
	struct stat info;

	/* A path we cannot stat, or one that is not a character device, is
	 * not ours; anything else probably is. */
	return stat(filename, &info) == 0 && S_ISCHR(info.st_mode);
}
217
[43c00e5]218/* Initialises the DAG output data structure */
[d97778c]219static void dag_init_format_out_data(libtrace_out_t *libtrace)
220{
221        libtrace->format_data = (struct dag_format_data_out_t *)
222                malloc(sizeof(struct dag_format_data_out_t));
[f0b87a7]223        // no DUCK on output
224        FORMAT_DATA_OUT->stream_attached = 0;
225        FORMAT_DATA_OUT->device = NULL;
226        FORMAT_DATA_OUT->dagstream = 0;
[5865c72]227        FORMAT_DATA_OUT->waiting = 0;
[f0b87a7]228
229}
230
[43c00e5]231/* Initialises the DAG input data structure */
[d97778c]232static void dag_init_format_data(libtrace_t *libtrace)
233{
[cb39d35]234        struct dag_per_stream_t stream_data;
235
[f0fb38f]236        libtrace->format_data = (struct dag_format_data_t *)
[f0b87a7]237                malloc(sizeof(struct dag_format_data_t));
[f0fb38f]238        DUCK.last_duck = 0;
[d97778c]239        DUCK.duck_freq = 0;
240        DUCK.last_pkt = 0;
241        DUCK.dummy_duck = NULL;
[cb39d35]242
243        FORMAT_DATA->per_stream =
244                libtrace_list_init(sizeof(stream_data));
245        assert(FORMAT_DATA->per_stream != NULL);
246
247        /* We'll start with just one instance of stream_data, and we'll
248         * add more later if we need them */
249        memset(&stream_data, 0, sizeof(stream_data));
250        libtrace_list_push_back(FORMAT_DATA->per_stream, &stream_data);
[f0fb38f]251}
252
[43c00e5]253/* Determines if there is already an entry for the given DAG device in the
254 * device list and increments the reference count for that device, if found.
255 *
256 * NOTE: This function assumes the open_dag_mutex is held by the caller */
[d97778c]257static struct dag_dev_t *dag_find_open_device(char *dev_name)
258{
[121b7e2]259        struct dag_dev_t *dag_dev;
[f0b87a7]260
[121b7e2]261        dag_dev = open_dags;
262
263        /* XXX: Not exactly zippy, but how often are we going to be dealing
264         * with multiple dag cards? */
265        while (dag_dev != NULL) {
266                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
[a9a91d1]267                        dag_dev->ref_count ++;
[121b7e2]268                        return dag_dev;
269                }
270                dag_dev = dag_dev->next;
271        }
272        return NULL;
273}
[5e85c23]274
[43c00e5]275/* Closes a DAG device and removes it from the device list.
276 *
277 * Attempting to close a DAG device that has a non-zero reference count will
278 * cause an assertion failure!
279 *
280 * NOTE: This function assumes the open_dag_mutex is held by the caller */
[d97778c]281static void dag_close_device(struct dag_dev_t *dev)
282{
[121b7e2]283        /* Need to remove from the device list */
[a9a91d1]284        assert(dev->ref_count == 0);
[f0b87a7]285
[a9a91d1]286        if (dev->prev == NULL) {
287                open_dags = dev->next;
288                if (dev->next)
289                        dev->next->prev = NULL;
290        } else {
291                dev->prev->next = dev->next;
292                if (dev->next)
293                        dev->next->prev = dev->prev;
[121b7e2]294        }
295
[a9a91d1]296        dag_close(dev->fd);
[f0b87a7]297        if (dev->dev_name)
[70bf39a]298                free(dev->dev_name);
[a9a91d1]299        free(dev);
[f0b87a7]300}
301
[43c00e5]302
303/* Opens a new DAG device for writing and adds it to the DAG device list
304 *
305 * NOTE: this function should only be called when opening a DAG device for
[d97778c]306 * writing - there is little practical difference between this and the
[43c00e5]307 * function below that covers the reading case, but we need the output trace
[d97778c]308 * object to report errors properly so the two functions take slightly
[43c00e5]309 * different arguments. This is really lame and there should be a much better
310 * way of doing this.
311 *
[d97778c]312 * NOTE: This function assumes the open_dag_mutex is held by the caller
[43c00e5]313 */
[d97778c]314static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace,
315                                                char *dev_name)
316{
[f0b87a7]317        struct stat buf;
318        int fd;
319        struct dag_dev_t *new_dev;
320
[43c00e5]321        /* Make sure the device exists */
[f0b87a7]322        if (stat(dev_name, &buf) == -1) {
323                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
324                return NULL;
[d97778c]325        }
[f0b87a7]326
[43c00e5]327        /* Make sure it is the appropriate type of device */
[f0b87a7]328        if (S_ISCHR(buf.st_mode)) {
[43c00e5]329                /* Try opening the DAG device */
[f0b87a7]330                if((fd = dag_open(dev_name)) < 0) {
331                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
332                                        dev_name);
333                        return NULL;
334                }
335        } else {
336                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
337                                dev_name);
338                return NULL;
339        }
340
[43c00e5]341        /* Add the device to our device list - it is just a doubly linked
342         * list with no inherent ordering; just tack the new one on the front
343         */
[f0b87a7]344        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
345        new_dev->fd = fd;
346        new_dev->dev_name = dev_name;
347        new_dev->ref_count = 1;
348
349        new_dev->prev = NULL;
350        new_dev->next = open_dags;
351        if (open_dags)
352                open_dags->prev = new_dev;
353
354        open_dags = new_dev;
355
356        return new_dev;
[121b7e2]357}
358
[43c00e5]359/* Opens a new DAG device for reading and adds it to the DAG device list
360 *
361 * NOTE: this function should only be called when opening a DAG device for
[d97778c]362 * reading - there is little practical difference between this and the
[43c00e5]363 * function above that covers the writing case, but we need the input trace
[d97778c]364 * object to report errors properly so the two functions take slightly
[43c00e5]365 * different arguments. This is really lame and there should be a much better
366 * way of doing this.
367 *
368 * NOTE: This function assumes the open_dag_mutex is held by the caller */
[121b7e2]369static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
370        struct stat buf;
371        int fd;
372        struct dag_dev_t *new_dev;
[f0b87a7]373
[43c00e5]374        /* Make sure the device exists */
[d97778c]375        if (stat(dev_name, &buf) == -1) {
376                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
377                return NULL;
378        }
[f0b87a7]379
[43c00e5]380        /* Make sure it is the appropriate type of device */
[5e85c23]381        if (S_ISCHR(buf.st_mode)) {
[43c00e5]382                /* Try opening the DAG device */
[121b7e2]383                if((fd = dag_open(dev_name)) < 0) {
[d97778c]384                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
385                                      dev_name);
386                        return NULL;
387                }
[5e85c23]388        } else {
389                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
[d97778c]390                              dev_name);
391                return NULL;
392        }
[f0b87a7]393
[43c00e5]394        /* Add the device to our device list - it is just a doubly linked
395         * list with no inherent ordering; just tack the new one on the front
396         */
[121b7e2]397        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
398        new_dev->fd = fd;
399        new_dev->dev_name = dev_name;
[a9a91d1]400        new_dev->ref_count = 1;
[f0b87a7]401
[a9a91d1]402        new_dev->prev = NULL;
[121b7e2]403        new_dev->next = open_dags;
[a9a91d1]404        if (open_dags)
405                open_dags->prev = new_dev;
[f0b87a7]406
[121b7e2]407        open_dags = new_dev;
[f0b87a7]408
[121b7e2]409        return new_dev;
410}
[f0b87a7]411
[43c00e5]412/* Creates and initialises a DAG output trace */
[d97778c]413static int dag_init_output(libtrace_out_t *libtrace)
414{
[16840ea]415        /* Upon successful creation, the device name is stored against the
416         * device and free when it is free()d */
417        char *dag_dev_name = NULL;
[f0b87a7]418        char *scan = NULL;
419        struct dag_dev_t *dag_device = NULL;
420        int stream = 1;
[d97778c]421
[cce868c]422        /* XXX I don't know if this is important or not, but this function
423         * isn't present in all of the driver releases that this code is
424         * supposed to support! */
425        /*
[5865c72]426        unsigned long wake_time;
427        dagutil_sleep_get_wake_time(&wake_time,0);
[cce868c]428        */
[f0b87a7]429
430        dag_init_format_out_data(libtrace);
[d97778c]431        /* Grab the mutex while we're likely to be messing with the device
[43c00e5]432         * list */
[f0b87a7]433        pthread_mutex_lock(&open_dag_mutex);
[d97778c]434
[43c00e5]435        /* Specific streams are signified using a comma in the libtrace URI,
436         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
437         *
438         * If no stream is specified, we will write using stream 1 */
[f0b87a7]439        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
[16840ea]440                dag_dev_name = strdup(libtrace->uridata);
[f0b87a7]441        } else {
[16840ea]442                dag_dev_name = (char *)strndup(libtrace->uridata,
[f0b87a7]443                                (size_t)(scan - libtrace->uridata));
444                stream = atoi(++scan);
445        }
[35c5a72]446        FORMAT_DATA_OUT->dagstream = stream;
[f0b87a7]447
[43c00e5]448        /* See if our DAG device is already open */
[16840ea]449        dag_device = dag_find_open_device(dag_dev_name);
[f0b87a7]450
451        if (dag_device == NULL) {
452                /* Device not yet opened - open it ourselves */
[16840ea]453                dag_device = dag_open_output_device(libtrace, dag_dev_name);
454        } else {
455                /* Otherwise, just use the existing one */
456                free(dag_dev_name);
457                dag_dev_name = NULL;
[f0b87a7]458        }
459
[43c00e5]460        /* Make sure we have successfully opened a DAG device */
[f0b87a7]461        if (dag_device == NULL) {
[16840ea]462                if (dag_dev_name) {
463                        free(dag_dev_name);
[35c5a72]464                }
[43c00e5]465                pthread_mutex_unlock(&open_dag_mutex);
[f0b87a7]466                return -1;
467        }
468
[35c5a72]469        FORMAT_DATA_OUT->device = dag_device;
[f0b87a7]470        pthread_mutex_unlock(&open_dag_mutex);
471        return 0;
472}
[121b7e2]473
[43c00e5]474/* Creates and initialises a DAG input trace */
[121b7e2]475static int dag_init_input(libtrace_t *libtrace) {
[16840ea]476        /* Upon successful creation, the device name is stored against the
477         * device and free when it is free()d */
478        char *dag_dev_name = NULL;
[121b7e2]479        char *scan = NULL;
[cb39d35]480        int stream = 0;
[121b7e2]481        struct dag_dev_t *dag_device = NULL;
[f0b87a7]482
[f0fb38f]483        dag_init_format_data(libtrace);
[d97778c]484        /* Grab the mutex while we're likely to be messing with the device
[43c00e5]485         * list */
[a9a91d1]486        pthread_mutex_lock(&open_dag_mutex);
[fe11d12]487
488
489        /* DAG cards support multiple streams. In a single threaded capture,
490         * these are specified using a comma in the libtrace URI,
[43c00e5]491         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
492         *
[d97778c]493         * If no stream is specified, we will read from stream 0 with
494         * one thread
[fe11d12]495         */
[121b7e2]496        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
[16840ea]497                dag_dev_name = strdup(libtrace->uridata);
[121b7e2]498        } else {
[16840ea]499                dag_dev_name = (char *)strndup(libtrace->uridata,
[121b7e2]500                                (size_t)(scan - libtrace->uridata));
501                stream = atoi(++scan);
502        }
503
[cb39d35]504        FORMAT_DATA_FIRST->dagstream = stream;
[121b7e2]505
[43c00e5]506        /* See if our DAG device is already open */
[16840ea]507        dag_device = dag_find_open_device(dag_dev_name);
[121b7e2]508
509        if (dag_device == NULL) {
510                /* Device not yet opened - open it ourselves */
[16840ea]511                dag_device = dag_open_device(libtrace, dag_dev_name);
512        } else {
513                /* Otherwise, just use the existing one */
514                free(dag_dev_name);
515                dag_dev_name = NULL;
[121b7e2]516        }
517
[43c00e5]518        /* Make sure we have successfully opened a DAG device */
[121b7e2]519        if (dag_device == NULL) {
[16840ea]520                if (dag_dev_name)
521                        free(dag_dev_name);
522                dag_dev_name = NULL;
[43c00e5]523                pthread_mutex_unlock(&open_dag_mutex);
[121b7e2]524                return -1;
525        }
526
[92020b8]527        FORMAT_DATA->device = dag_device;
[f0b87a7]528
[d97778c]529        /* See Config_Status_API_Programming_Guide.pdf from the Endace
530           Dag Documentation */
531        /* Check kBooleanAttributeActive is true -- no point capturing
532         * on an interface that's disabled
533         *
534         * The symptom of the port being disabled is that libtrace
535         * will appear to hang. */
[81c2da8]536        /* Check kBooleanAttributeFault is false */
537        /* Check kBooleanAttributeLocalFault is false */
538        /* Check kBooleanAttributeLock is true ? */
539        /* Check kBooleanAttributePeerLink ? */
540
[d97778c]541        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based
542           on libtrace promisc attribute?*/
[81c2da8]543        /* Set kUint32AttributeSnapLength to the snaplength */
544
[a9a91d1]545        pthread_mutex_unlock(&open_dag_mutex);
[d97778c]546        return 0;
[5e85c23]547}
[f0b87a7]548
[43c00e5]549/* Configures a DAG input trace */
[5e85c23]550static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
[d97778c]551                            void *data)
552{
553        char conf_str[4096];
[52656ed]554        switch(option) {
[d97778c]555        case TRACE_OPTION_META_FREQ:
556                /* This option is used to specify the frequency of DUCK
557                 * updates */
558                DUCK.duck_freq = *(int *)data;
559                return 0;
560        case TRACE_OPTION_SNAPLEN:
561                /* Tell the card our new snap length */
562                snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
[92020b8]563                if (dag_configure(FORMAT_DATA->device->fd,
[d97778c]564                                  conf_str) != 0) {
565                        trace_set_err(libtrace, errno, "Failed to configure "
566                                      "snaplen on DAG card: %s",
567                                      libtrace->uridata);
[708f9ae]568                        return -1;
[d97778c]569                }
570                return 0;
571        case TRACE_OPTION_PROMISC:
572                /* DAG already operates in a promisc fashion */
573                return -1;
574        case TRACE_OPTION_FILTER:
575                /* We don't yet support pushing filters into DAG
576                 * cards */
577                return -1;
578        case TRACE_OPTION_EVENT_REALTIME:
579                /* Live capture is always going to be realtime */
580                return -1;
581        }
[708f9ae]582        return -1;
[5e85c23]583}
[43c00e5]584
585/* Starts a DAG output trace */
[d97778c]586static int dag_start_output(libtrace_out_t *libtrace)
587{
[f0b87a7]588        struct timeval zero, nopoll;
589
590        zero.tv_sec = 0;
591        zero.tv_usec = 0;
592        nopoll = zero;
593
[43c00e5]594        /* Attach and start the DAG stream */
[35c5a72]595        if (dag_attach_stream(FORMAT_DATA_OUT->device->fd,
[70bf39a]596                        FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) {
[f0b87a7]597                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
598                return -1;
599        }
600
[35c5a72]601        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
602                        FORMAT_DATA_OUT->dagstream) < 0) {
[f0b87a7]603                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
604                return -1;
605        }
[35c5a72]606        FORMAT_DATA_OUT->stream_attached = 1;
[f0b87a7]607
608        /* We don't want the dag card to do any sleeping */
[5865c72]609        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
610                        FORMAT_DATA_OUT->dagstream, 0, &zero,
[f0b87a7]611                        &nopoll);
[5865c72]612
[f0b87a7]613        return 0;
614}
[5e85c23]615
[92020b8]616static int dag_start_input_stream(libtrace_t *libtrace,
617                                  struct dag_per_stream_t * stream) {
[d97778c]618        struct timeval zero, nopoll;
619        uint8_t *top, *bottom, *starttop;
[5e85c23]620        top = bottom = NULL;
621
622        zero.tv_sec = 0;
[d97778c]623        zero.tv_usec = 10000;
624        nopoll = zero;
[5e85c23]625
[43c00e5]626        /* Attach and start the DAG stream */
[92020b8]627        if (dag_attach_stream(FORMAT_DATA->device->fd,
628                              stream->dagstream, 0, 0) < 0) {
629                trace_set_err(libtrace, errno, "Cannot attach DAG stream #%u",
630                              stream->dagstream);
[d97778c]631                return -1;
632        }
[5e85c23]633
[92020b8]634        if (dag_start_stream(FORMAT_DATA->device->fd,
635                             stream->dagstream) < 0) {
636                trace_set_err(libtrace, errno, "Cannot start DAG stream #%u",
637                              stream->dagstream);
[d97778c]638                return -1;
639        }
[52656ed]640        FORMAT_DATA->stream_attached = 1;
[d97778c]641
[5e85c23]642        /* We don't want the dag card to do any sleeping */
[92020b8]643        if (dag_set_stream_poll(FORMAT_DATA->device->fd,
644                            stream->dagstream, 0, &zero,
645                            &nopoll) < 0) {
646                trace_set_err(libtrace, errno,
647                              "dag_set_stream_poll failed!");
648                return -1;
649        }
[f0b87a7]650
[92020b8]651        starttop = dag_advance_stream(FORMAT_DATA->device->fd,
652                                      stream->dagstream,
[d97778c]653                                      &bottom);
[70bf39a]654
[5e85c23]655        /* Should probably flush the memory hole now */
[68d3308]656        top = starttop;
657        while (starttop - bottom > 0) {
[70bf39a]658                bottom += (starttop - bottom);
[92020b8]659                top = dag_advance_stream(FORMAT_DATA->device->fd,
660                                         stream->dagstream,
[d97778c]661                                         &bottom);
[70bf39a]662        }
[92020b8]663        stream->top = top;
664        stream->bottom = bottom;
665        stream->processed = 0;
666        stream->drops = 0;
[f0b87a7]667
668        return 0;
[92020b8]669
670}
671
672/* Starts a DAG input trace */
673static int dag_start_input(libtrace_t *libtrace)
674{
675        return dag_start_input_stream(libtrace, FORMAT_DATA_FIRST);
[f0b87a7]676}
677
[cb39d35]678static int dag_pstart_input(libtrace_t *libtrace)
679{
680        char *scan, *tok;
681        uint16_t stream_count = 0, max_streams;
682        int iserror = 0;
683        struct dag_per_stream_t stream_data;
684
685        /* Check we aren't trying to create more threads than the DAG card can
686         * handle */
[92020b8]687        max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd);
[cb39d35]688        if (libtrace->perpkt_thread_count > max_streams) {
689                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
690                              "trying to create too many threads (max is %u)",
691                              max_streams);
692                iserror = 1;
693                goto cleanup;
694        }
695
696        /* Get the stream names from the uri */
697        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
698                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
699                              "format uri doesn't specify the DAG streams");
700                iserror = 1;
701                goto cleanup;
702        }
703
704        scan++;
705
706        tok = strtok(scan, ",");
707        while (tok != NULL) {
708                /* Ensure we haven't specified too many streams */
709                if (stream_count >= libtrace->perpkt_thread_count) {
710                        trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
711                                      "format uri specifies too many streams. "
712                                      "Max is %u", max_streams);
713                        iserror = 1;
714                        goto cleanup;
715                }
716
717                /* Save the stream details */
718                if (stream_count == 0) {
719                        /* Special case where we update the existing stream
720                         * data structure */
721                        FORMAT_DATA_FIRST->dagstream = (uint16_t)atoi(tok);
722                } else {
723                        memset(&stream_data, 0, sizeof(stream_data));
724                        stream_data.dagstream = (uint16_t)atoi(tok);
725                        libtrace_list_push_back(FORMAT_DATA->per_stream,
726                                                &stream_data);
727                }
728
729                stream_count++;
730                tok = strtok(NULL, ",");
731        }
732
733        FORMAT_DATA->stream_attached = 1;
734
735 cleanup:
736        if (iserror) {
737                return -1;
738        } else {
739                return 0;
740        }
741}
742
[43c00e5]743/* Pauses a DAG output trace */
[d97778c]744static int dag_pause_output(libtrace_out_t *libtrace)
745{
[43c00e5]746        /* Stop and detach the stream */
[35c5a72]747        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
[d97778c]748                            FORMAT_DATA_OUT->dagstream) < 0) {
[f0b87a7]749                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
750                return -1;
751        }
[35c5a72]752        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
[d97778c]753                              FORMAT_DATA_OUT->dagstream) < 0) {
754                trace_set_err_out(libtrace, errno,
755                                  "Could not detach DAG stream");
[f0b87a7]756                return -1;
757        }
[35c5a72]758        FORMAT_DATA_OUT->stream_attached = 0;
[5e85c23]759        return 0;
760}
761
[43c00e5]762/* Pauses a DAG input trace */
[d97778c]763static int dag_pause_input(libtrace_t *libtrace)
764{
[cb39d35]765        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
766
767        /* Stop and detach each stream */
768        while (tmp != NULL) {
[92020b8]769                if (dag_stop_stream(FORMAT_DATA->device->fd,
[cb39d35]770                                    STREAM_DATA(tmp)->dagstream) < 0) {
771                        trace_set_err(libtrace, errno,
772                                      "Could not stop DAG stream");
773                        printf("Count not stop DAG stream\n");
774                        return -1;
775                }
[92020b8]776                if (dag_detach_stream(FORMAT_DATA->device->fd,
[cb39d35]777                                      STREAM_DATA(tmp)->dagstream) < 0) {
778                        trace_set_err(libtrace, errno,
779                                      "Could not detach DAG stream");
780                        printf("Count not detach DAG stream\n");
781                        return -1;
782                }
783
784                tmp = tmp->next;
[5e85c23]785        }
[cb39d35]786
[52656ed]787        FORMAT_DATA->stream_attached = 0;
[5e85c23]788        return 0;
789}
790
[cb39d35]791
792
[43c00e5]793/* Closes a DAG input trace */
[d97778c]794static int dag_fin_input(libtrace_t *libtrace)
795{
[43c00e5]796        /* Need the lock, since we're going to be handling the device list */
[a9a91d1]797        pthread_mutex_lock(&open_dag_mutex);
[d97778c]798
[43c00e5]799        /* Detach the stream if we are not paused */
[52656ed]800        if (FORMAT_DATA->stream_attached)
801                dag_pause_input(libtrace);
[92020b8]802        FORMAT_DATA->device->ref_count--;
[f0b87a7]803
[92020b8]804        /* Close the DAG device if there are no more references to it */
805        if (FORMAT_DATA->device->ref_count == 0)
806                dag_close_device(FORMAT_DATA->device);
[cb39d35]807
[5e85c23]808        if (DUCK.dummy_duck)
809                trace_destroy_dead(DUCK.dummy_duck);
[cb39d35]810
811        /* Clear the list */
812        libtrace_list_deinit(FORMAT_DATA->per_stream);
[5e85c23]813        free(libtrace->format_data);
[a9a91d1]814        pthread_mutex_unlock(&open_dag_mutex);
[d97778c]815        return 0; /* success */
[5e85c23]816}
817
[43c00e5]818/* Closes a DAG output trace */
[d97778c]819static int dag_fin_output(libtrace_out_t *libtrace)
820{
821
[43c00e5]822        /* Commit any outstanding traffic in the txbuffer */
[5865c72]823        if (FORMAT_DATA_OUT->waiting) {
[d97778c]824                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
825                                           FORMAT_DATA_OUT->dagstream,
826                                           FORMAT_DATA_OUT->waiting );
[5865c72]827        }
828
[d97778c]829        /* Wait until the buffer is nearly clear before exiting the program,
[43c00e5]830         * as we will lose packets otherwise */
[d97778c]831        dag_tx_get_stream_space
832                (FORMAT_DATA_OUT->device->fd,
833                 FORMAT_DATA_OUT->dagstream,
834                 dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
835                                            FORMAT_DATA_OUT->dagstream) - 8);
[5865c72]836
[43c00e5]837        /* Need the lock, since we're going to be handling the device list */
[f0b87a7]838        pthread_mutex_lock(&open_dag_mutex);
[43c00e5]839
840        /* Detach the stream if we are not paused */
[35c5a72]841        if (FORMAT_DATA_OUT->stream_attached)
[f0b87a7]842                dag_pause_output(libtrace);
[35c5a72]843        FORMAT_DATA_OUT->device->ref_count --;
[f0b87a7]844
[43c00e5]845        /* Close the DAG device if there are no more references to it */
[35c5a72]846        if (FORMAT_DATA_OUT->device->ref_count == 0)
847                dag_close_device(FORMAT_DATA_OUT->device);
[f0b87a7]848        free(libtrace->format_data);
849        pthread_mutex_unlock(&open_dag_mutex);
850        return 0; /* success */
851}
852
[c5ac872]853#ifdef DAGIOC_CARD_DUCK
854#define LIBTRACE_DUCK_IOCTL DAGIOC_CARD_DUCK
855#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_5_0
856#else
857#ifdef DAGIOCDUCK
858#define LIBTRACE_DUCK_IOCTL DAGIOCDUCK
859#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_2_5
860#else
861#warning "DAG appears to be missing DUCK support"
862#endif
863#endif
864
[43c00e5]865/* Extracts DUCK information from the DAG card and produces a DUCK packet */
[5e85c23]866static int dag_get_duckinfo(libtrace_t *libtrace,
867                                libtrace_packet_t *packet) {
[43c00e5]868
[c5ac872]869        if (DUCK.duck_freq == 0)
870                return 0;
871
872#ifndef LIBTRACE_DUCK_IOCTL
873        trace_set_err(libtrace, errno, 
874                "Requested DUCK information but unable to determine the correct ioctl for DUCK");
875        DUCK.duck_freq = 0;
876        return -1;
877#endif
878
879        if (DUCK.last_pkt - DUCK.last_duck < DUCK.duck_freq)
880                return 0;
881
[43c00e5]882        /* Allocate memory for the DUCK data */
[d97778c]883        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
884            !packet->buffer) {
885                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
886                packet->buf_control = TRACE_CTRL_PACKET;
887                if (!packet->buffer) {
888                        trace_set_err(libtrace, errno,
889                                      "Cannot allocate packet buffer");
890                        return -1;
891                }
892        }
[5e85c23]893
[43c00e5]894        /* DUCK doesn't have a format header */
[de74f88]895        packet->header = 0;
896        packet->payload = packet->buffer;
897
898        /* No need to check if we can get DUCK or not - we're modern
899         * enough so just grab the DUCK info */
[92020b8]900        if ((ioctl(FORMAT_DATA->device->fd, LIBTRACE_DUCK_IOCTL,
[de74f88]901                   (duckinf_t *)packet->payload) < 0)) {
902                trace_set_err(libtrace, errno, "Error using DUCK ioctl");
[c5ac872]903                DUCK.duck_freq = 0;
[de74f88]904                return -1;
905        }
[5e85c23]906
[de74f88]907        packet->type = LIBTRACE_DUCK_VERSION;
[43c00e5]908
[c5ac872]909        /* Set the packet's trace to point at a DUCK trace, so that the
[43c00e5]910         * DUCK format functions will be called on the packet rather than the
911         * DAG ones */
[de74f88]912        if (!DUCK.dummy_duck)
913                DUCK.dummy_duck = trace_create_dead("duck:dummy");
914        packet->trace = DUCK.dummy_duck;
915        DUCK.last_duck = DUCK.last_pkt;
916        return sizeof(duckinf_t);
[5e85c23]917}
918
[43c00e5]919/* Determines the amount of data available to read from the DAG card */
[cb39d35]920static int dag_available(libtrace_t *libtrace,
921                         struct dag_per_stream_t *stream_data)
[d97778c]922{
[cb39d35]923        uint32_t diff = stream_data->top - stream_data->bottom;
[43c00e5]924
[5798dc6]925        /* If we've processed more than 4MB of data since we last called
[f0b87a7]926         * dag_advance_stream, then we should call it again to allow the
[5798dc6]927         * space occupied by that 4MB to be released */
[cb39d35]928        if (diff >= dag_record_size && stream_data->processed < 4 * 1024 * 1024)
[52656ed]929                return diff;
[d97778c]930
[43c00e5]931        /* Update the top and bottom pointers */
[92020b8]932        stream_data->top = dag_advance_stream(FORMAT_DATA->device->fd,
[cb39d35]933                                              stream_data->dagstream,
934                                              &(stream_data->bottom));
[d97778c]935
[cb39d35]936        if (stream_data->top == NULL) {
[52656ed]937                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
938                return -1;
939        }
[cb39d35]940        stream_data->processed = 0;
941        diff = stream_data->top - stream_data->bottom;
[52656ed]942        return diff;
943}
944
[43c00e5]945/* Returns a pointer to the start of the next complete ERF record */
[cb39d35]946static dag_record_t *dag_get_record(struct dag_per_stream_t *stream_data)
[d97778c]947{
948        dag_record_t *erfptr = NULL;
949        uint16_t size;
950
[cb39d35]951        erfptr = (dag_record_t *)stream_data->bottom;
[5e85c23]952        if (!erfptr)
[d97778c]953                return NULL;
954
955        size = ntohs(erfptr->rlen);
956        assert( size >= dag_record_size );
957
[52656ed]958        /* Make certain we have the full packet available */
[cb39d35]959        if (size > (stream_data->top - stream_data->bottom))
[52656ed]960                return NULL;
[d97778c]961
[cb39d35]962        stream_data->bottom += size;
963        stream_data->processed += size;
[5e85c23]964        return erfptr;
965}
966
[43c00e5]967/* Converts a buffer containing a recently read DAG packet record into a
968 * libtrace packet */
[cb39d35]969static int dag_prepare_packet_real(libtrace_t *libtrace,
970                                   struct dag_per_stream_t *stream_data,
971                                   libtrace_packet_t *packet,
972                                   void *buffer, libtrace_rt_types_t rt_type,
973                                   uint32_t flags)
[d97778c]974{
[f0fb38f]975        dag_record_t *erfptr;
[d97778c]976
[43c00e5]977        /* If the packet previously owned a buffer that is not the buffer
[d97778c]978         * that contains the new packet data, we're going to need to free the
979         * old one to avoid memory leaks */
[f0b87a7]980        if (packet->buffer != buffer &&
[cb39d35]981            packet->buf_control == TRACE_CTRL_PACKET) {
[f0fb38f]982                free(packet->buffer);
983        }
[f0b87a7]984
[43c00e5]985        /* Set the buffer owner appropriately */
[f0fb38f]986        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
987                packet->buf_control = TRACE_CTRL_PACKET;
988        } else
989                packet->buf_control = TRACE_CTRL_EXTERNAL;
[f0b87a7]990
[43c00e5]991        /* Update the packet pointers and type appropriately */
[f52bcdd]992        erfptr = (dag_record_t *)buffer;
[f0fb38f]993        packet->buffer = erfptr;
[d97778c]994        packet->header = erfptr;
995        packet->type = rt_type;
[f0b87a7]996
[f0fb38f]997        if (erfptr->flags.rxerror == 1) {
[d97778c]998                /* rxerror means the payload is corrupt - drop the payload
999                 * by tweaking rlen */
1000                packet->payload = NULL;
1001                erfptr->rlen = htons(erf_get_framing_length(packet));
1002        } else {
1003                packet->payload = (char*)packet->buffer
1004                        + erf_get_framing_length(packet);
1005        }
[f0b87a7]1006
[f0fb38f]1007        if (libtrace->format_data == NULL) {
[f0b87a7]1008                dag_init_format_data(libtrace);
[f0fb38f]1009        }
[f0b87a7]1010
[43c00e5]1011        /* Update the dropped packets counter */
[d97778c]1012        /* No loss counter for DSM coloured records - have to use some
1013         * other API */
[f0fb38f]1014        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
[43c00e5]1015                /* TODO */
[f0fb38f]1016        } else {
[43c00e5]1017                /* Use the ERF loss counter */
[cb39d35]1018                if (stream_data->seeninterface[erfptr->flags.iface]
1019                    == 0) {
1020                        stream_data->seeninterface[erfptr->flags.iface]
1021                                = 1;
[c66a465]1022                } else {
[cb39d35]1023                        stream_data->drops += ntohs(erfptr->lctr);
[c66a465]1024                }
[f0fb38f]1025        }
[c66a465]1026
[cb39d35]1027        packet->error = 1;
1028
[f0fb38f]1029        return 0;
[5e85c23]1030}
1031
[cb39d35]1032static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
1033                              void *buffer, libtrace_rt_types_t rt_type,
1034                              uint32_t flags)
1035{
1036        return dag_prepare_packet_real(libtrace, FORMAT_DATA_FIRST, packet,
1037                                       buffer, rt_type, flags);
1038}
1039
[5865c72]1040/*
1041 * dag_write_packet() at this stage attempts to improve tx performance
1042 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
1043 * has been met. I observed approximately 270% performance increase
1044 * through this relatively naive tweak. No optimisation of buffer sizes
1045 * was attempted.
1046 */
1047
[43c00e5]1048/* Pushes an ERF record onto the transmit stream */
[5865c72]1049static int dag_dump_packet(libtrace_out_t *libtrace,
[d97778c]1050                           dag_record_t *erfptr, unsigned int pad,
1051                           void *buffer)
1052{
[5865c72]1053        int size;
1054
1055        /*
[d97778c]1056         * If we've got 0 bytes waiting in the txqueue, assume that we
1057         * haven't requested any space yet, and request some, storing
1058         * the pointer at FORMAT_DATA_OUT->txbuffer.
[5865c72]1059         *
1060         * The amount to request is slightly magical at the moment - it's
1061         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
1062         * the buffer and handle overruns.
1063         */
1064        if (FORMAT_DATA_OUT->waiting == 0) {
[d97778c]1065                FORMAT_DATA_OUT->txbuffer =
1066                        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
1067                                                FORMAT_DATA_OUT->dagstream,
1068                                                16908288);
[5865c72]1069        }
1070
1071        /*
[43c00e5]1072         * Copy the header separately to the body, as we can't guarantee they
1073         * are in contiguous memory
[5865c72]1074         */
[d97778c]1075        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, erfptr,
1076               (dag_record_size + pad));
[5865c72]1077        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
1078
1079        /*
[43c00e5]1080         * Copy our incoming packet into the outgoing buffer, and increment
1081         * our waiting count
[5865c72]1082         */
1083        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
[d97778c]1084        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, buffer,
1085               size);
[5865c72]1086        FORMAT_DATA_OUT->waiting += size;
1087
1088        /*
[43c00e5]1089         * If our output buffer has more than 16 Mebibytes in it, commit those
1090         * bytes and reset the waiting count to 0.
1091         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in
1092         * case there is still data in the buffer at program exit.
[5865c72]1093         */
1094        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
[d97778c]1095                FORMAT_DATA_OUT->txbuffer =
1096                        dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
1097                                                   FORMAT_DATA_OUT->dagstream,
1098                                                   FORMAT_DATA_OUT->waiting);
[5865c72]1099                FORMAT_DATA_OUT->waiting = 0;
1100        }
1101
1102        return size + pad + dag_record_size;
1103}
1104
[43c00e5]1105/* Attempts to determine a suitable ERF type for a given packet. Returns true
1106 * if one is found, false otherwise */
[5865c72]1107static bool find_compatible_linktype(libtrace_out_t *libtrace,
[d97778c]1108                                     libtrace_packet_t *packet, char *type)
[5865c72]1109{
[d97778c]1110        /* Keep trying to simplify the packet until we can find
1111         * something we can do with it */
[5865c72]1112
1113        do {
[d97778c]1114                *type = libtrace_to_erf_type(trace_get_link_type(packet));
[5865c72]1115
[d97778c]1116                /* Success */
[43c00e5]1117                if (*type != (char)-1)
[5865c72]1118                        return true;
1119
1120                if (!demote_packet(packet)) {
1121                        trace_set_err_out(libtrace,
[d97778c]1122                                          TRACE_ERR_NO_CONVERSION,
1123                                          "No erf type for packet (%i)",
1124                                          trace_get_link_type(packet));
[5865c72]1125                        return false;
1126                }
1127
1128        } while(1);
1129
1130        return true;
1131}
[5e85c23]1132
[43c00e5]1133/* Writes a packet to the provided DAG output trace */
[d97778c]1134static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet)
1135{
1136        /* This is heavily borrowed from erf_write_packet(). Yes, CnP
1137         * coding sucks, sorry about that.
[5865c72]1138         */
1139        unsigned int pad = 0;
1140        int numbytes;
1141        void *payload = packet->payload;
1142        dag_record_t *header = (dag_record_t *)packet->header;
[43c00e5]1143        char erf_type = 0;
[5865c72]1144
1145        if(!packet->header) {
[d97778c]1146                /* No header, probably an RT packet. Lifted from
[43c00e5]1147                 * erf_write_packet(). */
[5865c72]1148                return -1;
1149        }
[f0b87a7]1150
[5f329ab]1151        if (trace_get_link_type(packet) == TRACE_TYPE_NONDATA)
1152                return 0;
1153
[5865c72]1154        pad = dag_get_padding(packet);
[f0b87a7]1155
[5865c72]1156        /*
[43c00e5]1157         * If the payload is null, adjust the rlen. Discussion of this is
[5865c72]1158         * attached to erf_write_packet()
1159         */
1160        if (payload == NULL) {
1161                header->rlen = htons(dag_record_size + pad);
1162        }
1163
1164        if (packet->type == TRACE_RT_DATA_ERF) {
[d97778c]1165                numbytes = dag_dump_packet(libtrace, header, pad, payload);
[5865c72]1166        } else {
1167                /* Build up a new packet header from the existing header */
1168
[d97778c]1169                /* Simplify the packet first - if we can't do this, break
[43c00e5]1170                 * early */
1171                if (!find_compatible_linktype(libtrace,packet,&erf_type))
[5865c72]1172                        return -1;
1173
1174                dag_record_t erfhdr;
1175
1176                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
1177                payload=packet->payload;
1178                pad = dag_get_padding(packet);
1179
1180                /* Flags. Can't do this */
1181                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
[51d1f64]1182                if (trace_get_direction(packet)!=(int)~0U)
[5865c72]1183                        erfhdr.flags.iface = trace_get_direction(packet);
[35c5a72]1184
[43c00e5]1185                erfhdr.type = erf_type;
[35c5a72]1186
[5865c72]1187                /* Packet length (rlen includes format overhead) */
[d97778c]1188                assert(trace_get_capture_length(packet) > 0
1189                       && trace_get_capture_length(packet) <= 65536);
1190                assert(erf_get_framing_length(packet) > 0
1191                       && trace_get_framing_length(packet) <= 65536);
1192                assert(trace_get_capture_length(packet) +
1193                       erf_get_framing_length(packet) > 0
1194                       && trace_get_capture_length(packet) +
1195                       erf_get_framing_length(packet) <= 65536);
[35c5a72]1196
[5865c72]1197                erfhdr.rlen = htons(trace_get_capture_length(packet)
[d97778c]1198                                    + erf_get_framing_length(packet));
[35c5a72]1199
1200
[43c00e5]1201                /* Loss counter. Can't do this */
[5865c72]1202                erfhdr.lctr = 0;
1203                /* Wire length, does not include padding! */
1204                erfhdr.wlen = htons(trace_get_wire_length(packet));
1205
1206                /* Write it out */
[d97778c]1207                numbytes = dag_dump_packet(libtrace, &erfhdr, pad, payload);
[35c5a72]1208        }
[f0b87a7]1209
[5865c72]1210        return numbytes;
[f0b87a7]1211}
1212
[43c00e5]1213/* Reads the next available packet from a DAG card, in a BLOCKING fashion
1214 *
1215 * If DUCK reporting is enabled, the packet returned may be a DUCK update
1216 */
[5df059b]1217static int dag_read_packet_stream(libtrace_t *libtrace,
[cb39d35]1218                                struct dag_per_stream_t *stream_data,
1219                                libtrace_thread_t *t, /* Optional */
1220                                libtrace_packet_t *packet)
[d97778c]1221{
[de74f88]1222        int size = 0;
[d97778c]1223        dag_record_t *erfptr = NULL;
[52656ed]1224        int numbytes = 0;
[f0fb38f]1225        uint32_t flags = 0;
[cb39d35]1226        struct timeval maxwait, pollwait;
[51d1f64]1227
1228        pollwait.tv_sec = 0;
1229        pollwait.tv_usec = 10000;
1230        maxwait.tv_sec = 0;
1231        maxwait.tv_usec = 250000;
[f0b87a7]1232
[de74f88]1233        /* Check if we're due for a DUCK report */
[c5ac872]1234        size = dag_get_duckinfo(libtrace, packet);
1235
1236        if (size != 0)
1237                return size;
[5e85c23]1238
[43c00e5]1239        /* Don't let anyone try to free our DAG memory hole! */
[c0dba7a]1240        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
[f0b87a7]1241
[43c00e5]1242        /* If the packet buffer is currently owned by libtrace, free it so
1243         * that we can set the packet to point into the DAG memory hole */
[d97778c]1244        if (packet->buf_control == TRACE_CTRL_PACKET) {
1245                free(packet->buffer);
1246                packet->buffer = 0;
1247        }
1248
[92020b8]1249        if (dag_set_stream_poll(FORMAT_DATA->device->fd, stream_data->dagstream,
[d97778c]1250                                sizeof(dag_record_t), &maxwait,
1251                                &pollwait) == -1) {
[51d1f64]1252                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1253                return -1;
1254        }
1255
[43c00e5]1256        /* Grab a full ERF record */
[5e85c23]1257        do {
[cb39d35]1258                numbytes = dag_available(libtrace, stream_data);
[52656ed]1259                if (numbytes < 0)
1260                        return numbytes;
[51d1f64]1261                if (numbytes < dag_record_size) {
[cb39d35]1262                        /* Check the message queue if we have one to check */
1263                        if (t != NULL &&
1264                            libtrace_message_queue_count(&t->messages) > 0)
1265                                return -2;
1266
[51d1f64]1267                        if (libtrace_halt)
1268                                return 0;
[52656ed]1269                        /* Block until we see a packet */
1270                        continue;
[51d1f64]1271                }
[cb39d35]1272                erfptr = dag_get_record(stream_data);
[5e85c23]1273        } while (erfptr == NULL);
1274
[92020b8]1275        packet->trace = libtrace;
[43c00e5]1276        /* Prepare the libtrace packet */
[cb39d35]1277        if (dag_prepare_packet_real(libtrace, stream_data, packet, erfptr,
1278                                    TRACE_RT_DATA_ERF, flags))
[f0fb38f]1279                return -1;
[43c00e5]1280
[f0b87a7]1281        return packet->payload ? htons(erfptr->rlen) :
[d97778c]1282                erf_get_framing_length(packet);
[5e85c23]1283}
1284
[cb39d35]1285static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
1286{
[5df059b]1287        return dag_read_packet_stream(libtrace, FORMAT_DATA_FIRST, NULL, packet);
[cb39d35]1288}
1289
1290static int dag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1291                             libtrace_packet_t **packets, size_t nb_packets)
1292{
1293        int ret;
1294        size_t read_packets = 0;
1295        int numbytes = 0;
1296
1297        struct dag_per_stream_t *stream_data =
1298                (struct dag_per_stream_t *)t->format_data;
1299
1300        /* Read as many packets as we can, but read atleast one packet */
1301        do {
[5df059b]1302                ret = dag_read_packet_stream(libtrace, stream_data, t,
[cb39d35]1303                                           packets[read_packets]);
1304                if (ret < 0)
1305                        return ret;
1306
1307                read_packets++;
1308
1309                /* Make sure we don't read too many packets..! */
1310                if (read_packets >= nb_packets)
1311                        break;
1312
1313                numbytes = dag_available(libtrace, stream_data);
1314        } while (numbytes >= dag_record_size);
1315
1316        return read_packets;
1317}
1318
[43c00e5]1319/* Attempts to read a packet from a DAG card in a NON-BLOCKING fashion. If a
1320 * packet is available, we will return a packet event. Otherwise we will
1321 * return a SLEEP event (as we cannot select on the DAG file descriptor).
1322 */
[51d1f64]1323static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
[d97778c]1324                                           libtrace_packet_t *packet)
1325{
1326        libtrace_eventobj_t event = {0,0,0.0,0};
[5e85c23]1327        dag_record_t *erfptr = NULL;
[6033b99]1328        int numbytes;
[f0fb38f]1329        uint32_t flags = 0;
[c5ac872]1330        struct timeval minwait, tv;
[51d1f64]1331       
1332        minwait.tv_sec = 0;
1333        minwait.tv_usec = 10000;
[d97778c]1334
[c5ac872]1335        /* Check if we're meant to provide a DUCK update */
1336        numbytes = dag_get_duckinfo(libtrace, packet);
1337        if (numbytes < 0) {
1338                event.type = TRACE_EVENT_TERMINATE;
1339                return event;
1340        } else if (numbytes > 0) {
1341                event.type = TRACE_EVENT_PACKET;
1342                return event;
1343        }
[51d1f64]1344       
[92020b8]1345        if (dag_set_stream_poll(FORMAT_DATA->device->fd,
[cb39d35]1346                                FORMAT_DATA_FIRST->dagstream, 0, &minwait,
[d97778c]1347                                &minwait) == -1) {
[51d1f64]1348                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1349                event.type = TRACE_EVENT_TERMINATE;
1350                return event;
1351        }
[f0b87a7]1352
[5778738]1353        do {
1354                erfptr = NULL;
1355                numbytes = 0;
[d97778c]1356
[5778738]1357                /* Need to call dag_available so that the top pointer will get
1358                 * updated, otherwise we'll never see any data! */
[cb39d35]1359                numbytes = dag_available(libtrace, FORMAT_DATA_FIRST);
[5778738]1360
[d97778c]1361                /* May as well not bother calling dag_get_record if
[5778738]1362                 * dag_available suggests that there's no data */
1363                if (numbytes != 0)
[cb39d35]1364                        erfptr = dag_get_record(FORMAT_DATA_FIRST);
[5778738]1365                if (erfptr == NULL) {
[43c00e5]1366                        /* No packet available - sleep for a very short time */
[51d1f64]1367                        if (libtrace_halt) {
1368                                event.type = TRACE_EVENT_TERMINATE;
[d97778c]1369                        } else {
[51d1f64]1370                                event.type = TRACE_EVENT_SLEEP;
1371                                event.seconds = 0.0001;
1372                        }
[5778738]1373                        break;
1374                }
[cb39d35]1375                if (dag_prepare_packet_real(libtrace, FORMAT_DATA_FIRST, packet,
1376                                            erfptr, TRACE_RT_DATA_ERF, flags)) {
[5778738]1377                        event.type = TRACE_EVENT_TERMINATE;
1378                        break;
1379                }
[f0b87a7]1380
1381
[d97778c]1382                event.size = trace_get_capture_length(packet) +
1383                        trace_get_framing_length(packet);
1384
[43c00e5]1385                /* XXX trace_read_packet() normally applies the following
1386                 * config options for us, but this function is called via
1387                 * trace_event() so we have to do it ourselves */
1388
[51d1f64]1389                if (libtrace->filter) {
[d97778c]1390                        int filtret = trace_apply_filter(libtrace->filter,
1391                                                         packet);
[51d1f64]1392                        if (filtret == -1) {
1393                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
[d97778c]1394                                              "Bad BPF Filter");
[51d1f64]1395                                event.type = TRACE_EVENT_TERMINATE;
1396                                break;
1397                        }
1398
1399                        if (filtret == 0) {
[5778738]1400                                /* This packet isn't useful so we want to
1401                                 * immediately see if there is another suitable
1402                                 * one - we definitely DO NOT want to return
1403                                 * a sleep event in this case, like we used to
1404                                 * do! */
[d97778c]1405                                libtrace->filtered_packets ++;
[d8e4595]1406                                trace_clear_cache(packet);
[5778738]1407                                continue;
1408                        }
[d97778c]1409
[51d1f64]1410                        event.type = TRACE_EVENT_PACKET;
[5e85c23]1411                } else {
[5778738]1412                        event.type = TRACE_EVENT_PACKET;
[5e85c23]1413                }
1414
[c5ac872]1415                /* Update the DUCK timer */
1416                tv = trace_get_timeval(packet);
1417                DUCK.last_pkt = tv.tv_sec;
1418               
[51d1f64]1419                if (libtrace->snaplen > 0) {
1420                        trace_set_capture_length(packet, libtrace->snaplen);
[5778738]1421                }
[d97778c]1422                libtrace->accepted_packets ++;
[5778738]1423                break;
[d97778c]1424        } while(1);
[5e85c23]1425
1426        return event;
1427}
1428
[5df059b]1429static void dag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat)
[d97778c]1430{
[5df059b]1431        libtrace_list_node_t *tmp;
1432        assert(stat && libtrace);
1433        tmp = FORMAT_DATA_HEAD;
1434
1435        /* We don't filter packets on the card itself */
1436        stat->filtered_valid = 1;
1437        stat->filtered = 0;
[c66a465]1438
[5df059b]1439        /* Dropped, filtered the  */
1440        stat->dropped_valid = 1;
1441        stat->dropped = 0;
[cb39d35]1442        while (tmp != NULL) {
[5df059b]1443                stat->dropped += STREAM_DATA(tmp)->drops;
[cb39d35]1444                tmp = tmp->next;
[c66a465]1445        }
1446
[5df059b]1447}
1448
1449static void dag_get_thread_statisitics(libtrace_t *libtrace, libtrace_thread_t *t,
1450                                       libtrace_stat_t *stat) {
1451        struct dag_per_stream_t *stream_data = t->format_data;
1452        assert(stat && libtrace);
1453
1454        stat->dropped_valid = 1;
1455        stat->dropped = stream_data->drops;
1456
1457        stat->filtered_valid = 1;
1458        stat->filtered = 0;
[2b7750a]1459}
[5e85c23]1460
/* Prints a short usage summary for the DAG format module to stdout: the
 * module revision, the supported input URI syntax with an example, and the
 * supported output URIs. */
static void dag_help(void)
{
        /* Adjacent string literals are joined by the compiler, so this one
         * call emits the whole help text */
        printf("dag format module: $Revision: 1755 $\n"
               "Supported input URIs:\n"
               "\tdag:/dev/dagn\n"
               "\n"
               "\te.g.: dag:/dev/dag0\n"
               "\n"
               "Supported output URIs:\n"
               "\tnone\n"
               "\n");
}
1473
[cb39d35]1474static int dag_pconfig_input(UNUSED libtrace_t *libtrace,
1475                             trace_parallel_option_t option, UNUSED void *value)
[d97778c]1476{
[9149564]1477        /* We don't support any of these! Normally you configure the DAG card
1478         * externally. */
1479        switch(option) {
1480        case TRACE_OPTION_SET_HASHER:
1481        case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
1482        case TRACE_OPTION_TRACETIME:
1483        case TRACE_OPTION_TICK_INTERVAL:
1484        case TRACE_OPTION_GET_CONFIG:
1485        case TRACE_OPTION_SET_CONFIG:
1486                return -1;
1487        }
1488        /* We don't provide a default option to ensure that future options will
1489         * generate a compiler warning. */
[fe11d12]1490
1491        return -1;
1492}
1493
1494static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
[d97778c]1495                                bool reader)
1496{
[cb39d35]1497        struct dag_per_stream_t *stream_data;
[5df059b]1498        libtrace_list_node_t *node;
[9149564]1499
[fe11d12]1500        if (reader) {
1501                if (t->type == THREAD_PERPKT) {
[5df059b]1502
1503                        node = libtrace_list_get_index(FORMAT_DATA->per_stream,
1504                                                        t->perpkt_num);
1505                        if (node == NULL) {
1506                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
1507                                              "Too few streams supplied for the"
1508                                              " number of threads lanuched");
1509                                return -1;
1510                        }
1511                        stream_data = node->data;
[cb39d35]1512
[fe11d12]1513                        /* Pass the per thread data to the thread */
[cb39d35]1514                        t->format_data = stream_data;
[fe11d12]1515
1516                        /* Attach and start the DAG stream */
[d97778c]1517                        printf("t%u: starting and attaching stream #%u\n",
[cb39d35]1518                               t->perpkt_num, stream_data->dagstream);
[92020b8]1519                        if (dag_start_input_stream(libtrace, stream_data) < 0)
[9149564]1520                                return -1;
[fe11d12]1521                } else {
[cb39d35]1522                        /* TODO: Figure out why t->type != THREAD_PERPKT in
1523                         * order to figure out what this line does */
1524                        t->format_data = FORMAT_DATA_FIRST;
[fe11d12]1525                }
1526        }
1527
[9149564]1528        fprintf(stderr, "t%u: registered thread\n", t->perpkt_num);
[fe11d12]1529
1530        return 0;
1531}
1532
[5e85c23]1533static struct libtrace_format_t dag = {
[d97778c]1534        "dag",
1535        "$Id$",
1536        TRACE_FORMAT_ERF,
[91b72d3]1537        dag_probe_filename,             /* probe filename */
1538        NULL,                           /* probe magic */
[d97778c]1539        dag_init_input,                 /* init_input */
1540        dag_config_input,               /* config_input */
1541        dag_start_input,                /* start_input */
1542        dag_pause_input,                /* pause_input */
[35c5a72]1543        dag_init_output,                /* init_output */
[d97778c]1544        NULL,                           /* config_output */
[35c5a72]1545        dag_start_output,               /* start_output */
[d97778c]1546        dag_fin_input,                  /* fin_input */
[35c5a72]1547        dag_fin_output,                 /* fin_output */
[d97778c]1548        dag_read_packet,                /* read_packet */
1549        dag_prepare_packet,             /* prepare_packet */
[f0fb38f]1550        NULL,                           /* fin_packet */
[35c5a72]1551        dag_write_packet,               /* write_packet */
[d97778c]1552        erf_get_link_type,              /* get_link_type */
1553        erf_get_direction,              /* get_direction */
1554        erf_set_direction,              /* set_direction */
1555        erf_get_erf_timestamp,          /* get_erf_timestamp */
1556        NULL,                           /* get_timeval */
1557        NULL,                           /* get_seconds */
[1aa4bf7]1558        NULL,                           /* get_timespec */
[d97778c]1559        NULL,                           /* seek_erf */
1560        NULL,                           /* seek_timeval */
1561        NULL,                           /* seek_seconds */
1562        erf_get_capture_length,         /* get_capture_length */
1563        erf_get_wire_length,            /* get_wire_length */
1564        erf_get_framing_length,         /* get_framing_length */
1565        erf_set_capture_length,         /* set_capture_length */
[f2fae49]1566        NULL,                           /* get_received_packets */
1567        NULL,                           /* get_filtered_packets */
[5df059b]1568        NULL,                           /* get_dropped_packets */
1569        dag_get_statistics,             /* get_statistics */
[d97778c]1570        NULL,                           /* get_fd */
1571        trace_event_dag,                /* trace_event */
1572        dag_help,                       /* help */
1573        NULL,                            /* next pointer */
[cb39d35]1574        {true, 0}, /* live packet capture, thread limit TBD */
1575        dag_pstart_input,
1576        dag_pread_packets,
1577        dag_pause_input,
1578        NULL,
1579        dag_pconfig_input,
1580        dag_pregister_thread,
[5ab626a]1581        NULL,
[5df059b]1582        dag_get_thread_statisitics      /* get thread stats */
[5e85c23]1583};
1584
[d97778c]1585void dag_constructor(void)
1586{
[5e85c23]1587        register_format(&dag);
1588}
Note: See TracBrowser for help on using the repository browser.