source: lib/format_dag25.c @ 18bf317

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 18bf317 was 18bf317, checked in by Dan Collins <dan@…>, 6 years ago

Merge branch 'develop' of github.com:wanduow/libtrace into develop

  • Property mode set to 100644
File size: 46.4 KB
RevLine 
[5e85c23]1/*
2 * This file is part of libtrace
3 *
[43c00e5]4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
[5e85c23]6 *
[43c00e5]7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
[5e85c23]11 * All rights reserved.
12 *
[43c00e5]13 * This code has been developed by the University of Waikato WAND
[5e85c23]14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
[293999b]30 * $Id$
[5e85c23]31 *
32 */
[43c00e5]33
[5e85c23]34#define _GNU_SOURCE
35
36#include "config.h"
37#include "common.h"
38#include "libtrace.h"
39#include "libtrace_int.h"
40#include "format_helper.h"
41#include "format_erf.h"
42
43#include <assert.h>
44#include <errno.h>
45#include <fcntl.h>
46#include <stdio.h>
47#include <string.h>
48#include <stdlib.h>
[44028d4]49#include <sys/stat.h>
[5e85c23]50
51#include <sys/mman.h>
[2faa57e]52/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
53 * Windows anyway so we'll worry about this more later :] */
54#include <pthread.h>
[5e85c23]55
[121b7e2]56
[5e85c23]57#ifdef WIN32
58#  include <io.h>
59#  include <share.h>
60#  define PATH_MAX _MAX_PATH
61#  define snprintf sprintf_s
62#else
63#  include <netdb.h>
64#  ifndef PATH_MAX
65#       define PATH_MAX 4096
66#  endif
67#  include <sys/ioctl.h>
68#endif
69
[43c00e5]70/* This format deals with DAG cards that are using drivers from the 2.5 version
71 * onwards, including 3.X.
72 *
73 * DAG is a LIVE capture format.
74 *
75 * This format does support writing, provided the DAG card that you are using
76 * has transmit (Tx) support. Additionally, packets read using this format
77 * are in the ERF format, so can easily be written as ERF traces without
78 * losing any data.
79 */
80
[121b7e2]81
[5e85c23]82#define DATA(x) ((struct dag_format_data_t *)x->format_data)
[f0b87a7]83#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
[fe11d12]84#define PERPKT_DATA(x) ((struct dag_per_thread_t *)(x->format_data))
[35c5a72]85
86#define FORMAT_DATA DATA(libtrace)
[f0b87a7]87#define FORMAT_DATA_OUT DATA_OUT(libtrace)
88
[5e85c23]89#define DUCK FORMAT_DATA->duck
90static struct libtrace_format_t dag;
91
[43c00e5]92/* A DAG device - a DAG device can support multiple streams (and therefore
93 * multiple input traces) so each trace needs to refer to a device */
[121b7e2]94struct dag_dev_t {
[43c00e5]95        char * dev_name;                /* Device name */
96        int fd;                         /* File descriptor */
97        uint16_t ref_count;             /* Number of input / output traces
98                                           that are using this device */
99        struct dag_dev_t *prev;         /* Pointer to the previous device in
100                                           the device list */
101        struct dag_dev_t *next;         /* Pointer to the next device in the
102                                           device list */
[f0b87a7]103};
104
[43c00e5]105/* "Global" data that is stored for each DAG output trace */
[f0b87a7]106struct dag_format_data_out_t {
[43c00e5]107        /* The DAG device being used for writing */
[f0b87a7]108        struct dag_dev_t *device;
[43c00e5]109        /* The DAG stream that is being written on */
[f0b87a7]110        unsigned int dagstream;
[43c00e5]111        /* Boolean flag indicating whether the stream is currently attached */
[f0b87a7]112        int stream_attached;
[43c00e5]113        /* The amount of data waiting to be transmitted, in bytes */
[5865c72]114        uint64_t waiting;
[43c00e5]115        /* A buffer to hold the data to be transmittted */
[5865c72]116        uint8_t *txbuffer;
[f0b87a7]117};
[121b7e2]118
[fe11d12]119/* Data that is stored for each libtrace_thread_t */
120struct dag_per_thread_t {
[d97778c]121        /* DAG device */
122        struct dag_dev_t *device;
123        /* Stream number */
124        uint16_t stream;
125        /* Pointer to the last unread byte in the DAG memory */
126        uint8_t *top;
127        /* Pointer to the first unread byte in the DAG memory */
128        uint8_t *bottom;
129        /* Amount of data processed from the bottom pointer */
130        uint32_t processed;
131        /* Number of packets seen by the thread */
132        uint64_t pkt_count;
133        /* Drop count for this particular thread */
[c66a465]134        uint64_t drops;
[fe11d12]135};
136
[43c00e5]137/* "Global" data that is stored for each DAG input trace */
[5e85c23]138struct dag_format_data_t {
[43c00e5]139        /* Data required for regular DUCK reporting */
[5e85c23]140        struct {
[43c00e5]141                /* Timestamp of the last DUCK report */
[5e85c23]142                uint32_t last_duck;
[43c00e5]143                /* The number of seconds between each DUCK report */
[d97778c]144                uint32_t duck_freq;
[43c00e5]145                /* Timestamp of the last packet read from the DAG card */
[d97778c]146                uint32_t last_pkt;
[43c00e5]147                /* Dummy trace to ensure DUCK packets are dealt with using the
148                 * DUCK format functions */
[d97778c]149                libtrace_t *dummy_duck;
150        } duck;
[5e85c23]151
[43c00e5]152        /* The DAG device that we are reading from */
[121b7e2]153        struct dag_dev_t *device;
[43c00e5]154        /* The DAG stream that we are reading from */
[5e85c23]155        unsigned int dagstream;
[43c00e5]156        /* Boolean flag indicating whether the stream is currently attached */
[52656ed]157        int stream_attached;
[43c00e5]158        /* Pointer to the first unread byte in the DAG memory hole */
[52656ed]159        uint8_t *bottom;
[43c00e5]160        /* Pointer to the last unread byte in the DAG memory hole */
[52656ed]161        uint8_t *top;
[43c00e5]162        /* The amount of data processed thus far from the bottom pointer */
[5798dc6]163        uint32_t processed;
[43c00e5]164        /* The number of packets that have been dropped */
[50bbce8]165        uint64_t drops;
[d97778c]166        /* When running in parallel mode this is malloc'd with an
167         * array of thread structures. Most of the stuff above doesn't
168         * get used in parallel mode. */
[fe11d12]169        struct dag_per_thread_t *per_thread;
[70bf39a]170
171        uint8_t seeninterface[4];
[5e85c23]172};
173
[43c00e5]174/* To be thread-safe, we're going to need a mutex for operating on the list
175 * of DAG devices */
[121b7e2]176pthread_mutex_t open_dag_mutex;
[43c00e5]177
178/* The list of DAG devices that have been opened by libtrace.
179 *
180 * We can only open each DAG device once, but we might want to read from
181 * multiple streams. Therefore, we need to maintain a list of devices that we
182 * have opened (with ref counts!) so that we don't try to open a device too
183 * many times or close a device that we're still using */
[121b7e2]184struct dag_dev_t *open_dags = NULL;
[5e85c23]185
[43c00e5]186/* Returns the amount of padding between the ERF header and the start of the
187 * captured packet data */
[5865c72]188static int dag_get_padding(const libtrace_packet_t *packet)
189{
[43c00e5]190        /* ERF Ethernet records have a 2 byte padding before the packet itself
191         * so that the IP header is aligned on a 32 bit boundary.
192         */
[5865c72]193        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
194                dag_record_t *erfptr = (dag_record_t *)packet->header;
195                switch(erfptr->type) {
196                        case TYPE_ETH:
197                        case TYPE_DSM_COLOR_ETH:
198                                return 2;
199                        default:                return 0;
200                }
201        }
202        else {
203                switch(trace_get_link_type(packet)) {
204                        case TRACE_TYPE_ETH:    return 2;
205                        default:                return 0;
206                }
207        }
208}
209
[43c00e5]210/* Attempts to determine if the given filename refers to a DAG device */
[35c5a72]211static int dag_probe_filename(const char *filename)
[91b72d3]212{
213        struct stat statbuf;
214        /* Can we stat the file? */
215        if (stat(filename, &statbuf) != 0) {
216                return 0;
217        }
218        /* Is it a character device? */
219        if (!S_ISCHR(statbuf.st_mode)) {
220                return 0;
221        }
222        /* Yeah, it's probably us. */
223        return 1;
224}
225
[43c00e5]226/* Initialises the DAG output data structure */
[d97778c]227static void dag_init_format_out_data(libtrace_out_t *libtrace)
228{
229        libtrace->format_data = (struct dag_format_data_out_t *)
230                malloc(sizeof(struct dag_format_data_out_t));
[f0b87a7]231        // no DUCK on output
232        FORMAT_DATA_OUT->stream_attached = 0;
233        FORMAT_DATA_OUT->device = NULL;
234        FORMAT_DATA_OUT->dagstream = 0;
[5865c72]235        FORMAT_DATA_OUT->waiting = 0;
[f0b87a7]236
237}
238
[43c00e5]239/* Initialises the DAG input data structure */
[d97778c]240static void dag_init_format_data(libtrace_t *libtrace)
241{
[f0fb38f]242        libtrace->format_data = (struct dag_format_data_t *)
[f0b87a7]243                malloc(sizeof(struct dag_format_data_t));
[f0fb38f]244        DUCK.last_duck = 0;
[d97778c]245        DUCK.duck_freq = 0;
246        DUCK.last_pkt = 0;
247        DUCK.dummy_duck = NULL;
[f0fb38f]248        FORMAT_DATA->stream_attached = 0;
249        FORMAT_DATA->drops = 0;
250        FORMAT_DATA->device = NULL;
251        FORMAT_DATA->dagstream = 0;
252        FORMAT_DATA->processed = 0;
253        FORMAT_DATA->bottom = NULL;
254        FORMAT_DATA->top = NULL;
[d97778c]255        memset(FORMAT_DATA->seeninterface, 0,
256               sizeof(FORMAT_DATA->seeninterface));
[f0fb38f]257}
258
[43c00e5]259/* Determines if there is already an entry for the given DAG device in the
260 * device list and increments the reference count for that device, if found.
261 *
262 * NOTE: This function assumes the open_dag_mutex is held by the caller */
[d97778c]263static struct dag_dev_t *dag_find_open_device(char *dev_name)
264{
[121b7e2]265        struct dag_dev_t *dag_dev;
[f0b87a7]266
[121b7e2]267        dag_dev = open_dags;
268
269        /* XXX: Not exactly zippy, but how often are we going to be dealing
270         * with multiple dag cards? */
271        while (dag_dev != NULL) {
272                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
[a9a91d1]273                        dag_dev->ref_count ++;
[121b7e2]274                        return dag_dev;
275                }
276                dag_dev = dag_dev->next;
277        }
278        return NULL;
279}
[5e85c23]280
[43c00e5]281/* Closes a DAG device and removes it from the device list.
282 *
283 * Attempting to close a DAG device that has a non-zero reference count will
284 * cause an assertion failure!
285 *
286 * NOTE: This function assumes the open_dag_mutex is held by the caller */
[d97778c]287static void dag_close_device(struct dag_dev_t *dev)
288{
[121b7e2]289        /* Need to remove from the device list */
[a9a91d1]290        assert(dev->ref_count == 0);
[f0b87a7]291
[a9a91d1]292        if (dev->prev == NULL) {
293                open_dags = dev->next;
294                if (dev->next)
295                        dev->next->prev = NULL;
296        } else {
297                dev->prev->next = dev->next;
298                if (dev->next)
299                        dev->next->prev = dev->prev;
[121b7e2]300        }
301
[a9a91d1]302        dag_close(dev->fd);
[f0b87a7]303        if (dev->dev_name)
[70bf39a]304                free(dev->dev_name);
[a9a91d1]305        free(dev);
[f0b87a7]306}
307
[43c00e5]308
309/* Opens a new DAG device for writing and adds it to the DAG device list
310 *
311 * NOTE: this function should only be called when opening a DAG device for
[d97778c]312 * writing - there is little practical difference between this and the
[43c00e5]313 * function below that covers the reading case, but we need the output trace
[d97778c]314 * object to report errors properly so the two functions take slightly
[43c00e5]315 * different arguments. This is really lame and there should be a much better
316 * way of doing this.
317 *
[d97778c]318 * NOTE: This function assumes the open_dag_mutex is held by the caller
[43c00e5]319 */
[d97778c]320static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace,
321                                                char *dev_name)
322{
[f0b87a7]323        struct stat buf;
324        int fd;
325        struct dag_dev_t *new_dev;
326
[43c00e5]327        /* Make sure the device exists */
[f0b87a7]328        if (stat(dev_name, &buf) == -1) {
329                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
330                return NULL;
[d97778c]331        }
[f0b87a7]332
[43c00e5]333        /* Make sure it is the appropriate type of device */
[f0b87a7]334        if (S_ISCHR(buf.st_mode)) {
[43c00e5]335                /* Try opening the DAG device */
[f0b87a7]336                if((fd = dag_open(dev_name)) < 0) {
337                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
338                                        dev_name);
339                        return NULL;
340                }
341        } else {
342                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
343                                dev_name);
344                return NULL;
345        }
346
[43c00e5]347        /* Add the device to our device list - it is just a doubly linked
348         * list with no inherent ordering; just tack the new one on the front
349         */
[f0b87a7]350        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
351        new_dev->fd = fd;
352        new_dev->dev_name = dev_name;
353        new_dev->ref_count = 1;
354
355        new_dev->prev = NULL;
356        new_dev->next = open_dags;
357        if (open_dags)
358                open_dags->prev = new_dev;
359
360        open_dags = new_dev;
361
362        return new_dev;
[121b7e2]363}
364
[43c00e5]365/* Opens a new DAG device for reading and adds it to the DAG device list
366 *
367 * NOTE: this function should only be called when opening a DAG device for
[d97778c]368 * reading - there is little practical difference between this and the
[43c00e5]369 * function above that covers the writing case, but we need the input trace
[d97778c]370 * object to report errors properly so the two functions take slightly
[43c00e5]371 * different arguments. This is really lame and there should be a much better
372 * way of doing this.
373 *
374 * NOTE: This function assumes the open_dag_mutex is held by the caller */
[121b7e2]375static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
376        struct stat buf;
377        int fd;
378        struct dag_dev_t *new_dev;
[f0b87a7]379
[43c00e5]380        /* Make sure the device exists */
[d97778c]381        if (stat(dev_name, &buf) == -1) {
382                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
383                return NULL;
384        }
[f0b87a7]385
[43c00e5]386        /* Make sure it is the appropriate type of device */
[5e85c23]387        if (S_ISCHR(buf.st_mode)) {
[43c00e5]388                /* Try opening the DAG device */
[121b7e2]389                if((fd = dag_open(dev_name)) < 0) {
[d97778c]390                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
391                                      dev_name);
392                        return NULL;
393                }
[5e85c23]394        } else {
395                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
[d97778c]396                              dev_name);
397                return NULL;
398        }
[f0b87a7]399
[43c00e5]400        /* Add the device to our device list - it is just a doubly linked
401         * list with no inherent ordering; just tack the new one on the front
402         */
[121b7e2]403        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
404        new_dev->fd = fd;
405        new_dev->dev_name = dev_name;
[a9a91d1]406        new_dev->ref_count = 1;
[f0b87a7]407
[a9a91d1]408        new_dev->prev = NULL;
[121b7e2]409        new_dev->next = open_dags;
[a9a91d1]410        if (open_dags)
411                open_dags->prev = new_dev;
[f0b87a7]412
[121b7e2]413        open_dags = new_dev;
[f0b87a7]414
[121b7e2]415        return new_dev;
416}
[f0b87a7]417
[43c00e5]418/* Creates and initialises a DAG output trace */
[d97778c]419static int dag_init_output(libtrace_out_t *libtrace)
420{
[f0b87a7]421        char *dag_dev_name = NULL;
422        char *scan = NULL;
423        struct dag_dev_t *dag_device = NULL;
424        int stream = 1;
[d97778c]425
[cce868c]426        /* XXX I don't know if this is important or not, but this function
427         * isn't present in all of the driver releases that this code is
428         * supposed to support! */
429        /*
[5865c72]430        unsigned long wake_time;
431        dagutil_sleep_get_wake_time(&wake_time,0);
[cce868c]432        */
[f0b87a7]433
434        dag_init_format_out_data(libtrace);
[d97778c]435        /* Grab the mutex while we're likely to be messing with the device
[43c00e5]436         * list */
[f0b87a7]437        pthread_mutex_lock(&open_dag_mutex);
[d97778c]438
[43c00e5]439        /* Specific streams are signified using a comma in the libtrace URI,
440         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
441         *
442         * If no stream is specified, we will write using stream 1 */
[f0b87a7]443        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
444                dag_dev_name = strdup(libtrace->uridata);
445        } else {
446                dag_dev_name = (char *)strndup(libtrace->uridata,
447                                (size_t)(scan - libtrace->uridata));
448                stream = atoi(++scan);
449        }
[35c5a72]450        FORMAT_DATA_OUT->dagstream = stream;
[f0b87a7]451
[43c00e5]452        /* See if our DAG device is already open */
[f0b87a7]453        dag_device = dag_find_open_device(dag_dev_name);
454
455        if (dag_device == NULL) {
456                /* Device not yet opened - open it ourselves */
457                dag_device = dag_open_output_device(libtrace, dag_dev_name);
458        } else {
[43c00e5]459                /* Otherwise, just use the existing one */
[f0b87a7]460                free(dag_dev_name);
461                dag_dev_name = NULL;
462        }
463
[43c00e5]464        /* Make sure we have successfully opened a DAG device */
[f0b87a7]465        if (dag_device == NULL) {
[35c5a72]466                if (dag_dev_name) {
[f0b87a7]467                        free(dag_dev_name);
[35c5a72]468                }
[43c00e5]469                pthread_mutex_unlock(&open_dag_mutex);
[f0b87a7]470                return -1;
471        }
472
[35c5a72]473        FORMAT_DATA_OUT->device = dag_device;
[f0b87a7]474        pthread_mutex_unlock(&open_dag_mutex);
475        return 0;
476}
[121b7e2]477
[43c00e5]478/* Creates and initialises a DAG input trace */
[121b7e2]479static int dag_init_input(libtrace_t *libtrace) {
[d97778c]480        char *dag_dev_name = NULL;
[121b7e2]481        char *scan = NULL;
[fe11d12]482        int stream = 0, thread_count = 1;
[121b7e2]483        struct dag_dev_t *dag_device = NULL;
[f0b87a7]484
[f0fb38f]485        dag_init_format_data(libtrace);
[d97778c]486        /* Grab the mutex while we're likely to be messing with the device
[43c00e5]487         * list */
[a9a91d1]488        pthread_mutex_lock(&open_dag_mutex);
[fe11d12]489
490
491        /* DAG cards support multiple streams. In a single threaded capture,
492         * these are specified using a comma in the libtrace URI,
[43c00e5]493         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
494         *
[d97778c]495         * If no stream is specified, we will read from stream 0 with
496         * one thread
[fe11d12]497         */
498        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
[121b7e2]499                dag_dev_name = strdup(libtrace->uridata);
500        } else {
501                dag_dev_name = (char *)strndup(libtrace->uridata,
[d97778c]502                                               (size_t)(scan -
503                                                        libtrace->uridata));
[121b7e2]504                stream = atoi(++scan);
505        }
506
507        FORMAT_DATA->dagstream = stream;
508
[43c00e5]509        /* See if our DAG device is already open */
[121b7e2]510        dag_device = dag_find_open_device(dag_dev_name);
511
512        if (dag_device == NULL) {
513                /* Device not yet opened - open it ourselves */
514                dag_device = dag_open_device(libtrace, dag_dev_name);
515        } else {
[43c00e5]516                /* Otherwise, just use the existing one */
[121b7e2]517                free(dag_dev_name);
518                dag_dev_name = NULL;
519        }
520
[43c00e5]521        /* Make sure we have successfully opened a DAG device */
[121b7e2]522        if (dag_device == NULL) {
523                if (dag_dev_name)
524                        free(dag_dev_name);
[f0b87a7]525                dag_dev_name = NULL;
[43c00e5]526                pthread_mutex_unlock(&open_dag_mutex);
[121b7e2]527                return -1;
528        }
529
530        FORMAT_DATA->device = dag_device;
[f0b87a7]531
[d97778c]532        /* See Config_Status_API_Programming_Guide.pdf from the Endace
533           Dag Documentation */
534        /* Check kBooleanAttributeActive is true -- no point capturing
535         * on an interface that's disabled
536         *
537         * The symptom of the port being disabled is that libtrace
538         * will appear to hang. */
[81c2da8]539        /* Check kBooleanAttributeFault is false */
540        /* Check kBooleanAttributeLocalFault is false */
541        /* Check kBooleanAttributeLock is true ? */
542        /* Check kBooleanAttributePeerLink ? */
543
[d97778c]544        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based
545           on libtrace promisc attribute?*/
[81c2da8]546        /* Set kUint32AttributeSnapLength to the snaplength */
547
[a9a91d1]548        pthread_mutex_unlock(&open_dag_mutex);
[d97778c]549        return 0;
[5e85c23]550}
[f0b87a7]551
[43c00e5]552/* Configures a DAG input trace */
[5e85c23]553static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
[d97778c]554                            void *data)
555{
556        char conf_str[4096];
[52656ed]557        switch(option) {
[d97778c]558        case TRACE_OPTION_META_FREQ:
559                /* This option is used to specify the frequency of DUCK
560                 * updates */
561                DUCK.duck_freq = *(int *)data;
562                return 0;
563        case TRACE_OPTION_SNAPLEN:
564                /* Tell the card our new snap length */
565                snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
566                if (dag_configure(FORMAT_DATA->device->fd,
567                                  conf_str) != 0) {
568                        trace_set_err(libtrace, errno, "Failed to configure "
569                                      "snaplen on DAG card: %s",
570                                      libtrace->uridata);
[708f9ae]571                        return -1;
[d97778c]572                }
573                return 0;
574        case TRACE_OPTION_PROMISC:
575                /* DAG already operates in a promisc fashion */
576                return -1;
577        case TRACE_OPTION_FILTER:
578                /* We don't yet support pushing filters into DAG
579                 * cards */
580                return -1;
581        case TRACE_OPTION_EVENT_REALTIME:
582                /* Live capture is always going to be realtime */
583                return -1;
584        }
[708f9ae]585        return -1;
[5e85c23]586}
[43c00e5]587
588/* Starts a DAG output trace */
[d97778c]589static int dag_start_output(libtrace_out_t *libtrace)
590{
[f0b87a7]591        struct timeval zero, nopoll;
592
593        zero.tv_sec = 0;
594        zero.tv_usec = 0;
595        nopoll = zero;
596
[43c00e5]597        /* Attach and start the DAG stream */
[35c5a72]598        if (dag_attach_stream(FORMAT_DATA_OUT->device->fd,
[70bf39a]599                        FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) {
[f0b87a7]600                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
601                return -1;
602        }
603
[35c5a72]604        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
605                        FORMAT_DATA_OUT->dagstream) < 0) {
[f0b87a7]606                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
607                return -1;
608        }
[35c5a72]609        FORMAT_DATA_OUT->stream_attached = 1;
[f0b87a7]610
611        /* We don't want the dag card to do any sleeping */
[5865c72]612        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
613                        FORMAT_DATA_OUT->dagstream, 0, &zero,
[f0b87a7]614                        &nopoll);
[5865c72]615
[f0b87a7]616        return 0;
617}
[5e85c23]618
[43c00e5]619/* Starts a DAG input trace */
[d97778c]620static int dag_start_input(libtrace_t *libtrace)
621{
622        struct timeval zero, nopoll;
623        uint8_t *top, *bottom, *starttop;
[70bf39a]624        uint64_t diff = 0;
[5e85c23]625        top = bottom = NULL;
626
627        zero.tv_sec = 0;
[d97778c]628        zero.tv_usec = 10000;
629        nopoll = zero;
[5e85c23]630
[43c00e5]631        /* Attach and start the DAG stream */
[f0b87a7]632        if (dag_attach_stream(FORMAT_DATA->device->fd,
[d97778c]633                              FORMAT_DATA->dagstream, 0, 0) < 0) {
634                trace_set_err(libtrace, errno, "Cannot attach DAG stream");
635                return -1;
636        }
[5e85c23]637
[f0b87a7]638        if (dag_start_stream(FORMAT_DATA->device->fd,
[d97778c]639                             FORMAT_DATA->dagstream) < 0) {
640                trace_set_err(libtrace, errno, "Cannot start DAG stream");
641                return -1;
642        }
[52656ed]643        FORMAT_DATA->stream_attached = 1;
[d97778c]644
[5e85c23]645        /* We don't want the dag card to do any sleeping */
[d97778c]646        dag_set_stream_poll(FORMAT_DATA->device->fd,
647                            FORMAT_DATA->dagstream, 0, &zero,
648                            &nopoll);
[f0b87a7]649
[70bf39a]650        starttop = dag_advance_stream(FORMAT_DATA->device->fd,
[d97778c]651                                      FORMAT_DATA->dagstream,
652                                      &bottom);
[70bf39a]653
[5e85c23]654        /* Should probably flush the memory hole now */
[68d3308]655        top = starttop;
656        while (starttop - bottom > 0) {
[70bf39a]657                bottom += (starttop - bottom);
[121b7e2]658                top = dag_advance_stream(FORMAT_DATA->device->fd,
[d97778c]659                                         FORMAT_DATA->dagstream,
660                                         &bottom);
[70bf39a]661        }
662        FORMAT_DATA->top = top;
663        FORMAT_DATA->bottom = bottom;
[5798dc6]664        FORMAT_DATA->processed = 0;
[50bbce8]665        FORMAT_DATA->drops = 0;
[f0b87a7]666
667        return 0;
668}
669
[43c00e5]670/* Pauses a DAG output trace */
[d97778c]671static int dag_pause_output(libtrace_out_t *libtrace)
672{
[43c00e5]673        /* Stop and detach the stream */
[35c5a72]674        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
[d97778c]675                            FORMAT_DATA_OUT->dagstream) < 0) {
[f0b87a7]676                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
677                return -1;
678        }
[35c5a72]679        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
[d97778c]680                              FORMAT_DATA_OUT->dagstream) < 0) {
681                trace_set_err_out(libtrace, errno,
682                                  "Could not detach DAG stream");
[f0b87a7]683                return -1;
684        }
[35c5a72]685        FORMAT_DATA_OUT->stream_attached = 0;
[5e85c23]686        return 0;
687}
688
[43c00e5]689/* Pauses a DAG input trace */
[d97778c]690static int dag_pause_input(libtrace_t *libtrace)
691{
[43c00e5]692        /* Stop and detach the stream */
[f0b87a7]693        if (dag_stop_stream(FORMAT_DATA->device->fd,
[d97778c]694                            FORMAT_DATA->dagstream) < 0) {
695                trace_set_err(libtrace, errno, "Could not stop DAG stream");
696                return -1;
697        }
698        if (dag_detach_stream(FORMAT_DATA->device->fd,
699                              FORMAT_DATA->dagstream) < 0) {
700                trace_set_err(libtrace, errno, "Could not detach DAG stream");
701                return -1;
[5e85c23]702        }
[52656ed]703        FORMAT_DATA->stream_attached = 0;
[5e85c23]704        return 0;
705}
706
[43c00e5]707/* Closes a DAG input trace */
[d97778c]708static int dag_fin_input(libtrace_t *libtrace)
709{
[43c00e5]710        /* Need the lock, since we're going to be handling the device list */
[a9a91d1]711        pthread_mutex_lock(&open_dag_mutex);
[d97778c]712
[43c00e5]713        /* Detach the stream if we are not paused */
[52656ed]714        if (FORMAT_DATA->stream_attached)
715                dag_pause_input(libtrace);
[121b7e2]716        FORMAT_DATA->device->ref_count --;
[f0b87a7]717
[43c00e5]718        /* Close the DAG device if there are no more references to it */
[121b7e2]719        if (FORMAT_DATA->device->ref_count == 0)
720                dag_close_device(FORMAT_DATA->device);
[5e85c23]721        if (DUCK.dummy_duck)
722                trace_destroy_dead(DUCK.dummy_duck);
723        free(libtrace->format_data);
[a9a91d1]724        pthread_mutex_unlock(&open_dag_mutex);
[d97778c]725        return 0; /* success */
[5e85c23]726}
727
[43c00e5]728/* Closes a DAG output trace */
[d97778c]729static int dag_fin_output(libtrace_out_t *libtrace)
730{
731
[43c00e5]732        /* Commit any outstanding traffic in the txbuffer */
[5865c72]733        if (FORMAT_DATA_OUT->waiting) {
[d97778c]734                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
735                                           FORMAT_DATA_OUT->dagstream,
736                                           FORMAT_DATA_OUT->waiting );
[5865c72]737        }
738
[d97778c]739        /* Wait until the buffer is nearly clear before exiting the program,
[43c00e5]740         * as we will lose packets otherwise */
[d97778c]741        dag_tx_get_stream_space
742                (FORMAT_DATA_OUT->device->fd,
743                 FORMAT_DATA_OUT->dagstream,
744                 dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
745                                            FORMAT_DATA_OUT->dagstream) - 8);
[5865c72]746
[43c00e5]747        /* Need the lock, since we're going to be handling the device list */
[f0b87a7]748        pthread_mutex_lock(&open_dag_mutex);
[43c00e5]749
750        /* Detach the stream if we are not paused */
[35c5a72]751        if (FORMAT_DATA_OUT->stream_attached)
[f0b87a7]752                dag_pause_output(libtrace);
[35c5a72]753        FORMAT_DATA_OUT->device->ref_count --;
[f0b87a7]754
[43c00e5]755        /* Close the DAG device if there are no more references to it */
[35c5a72]756        if (FORMAT_DATA_OUT->device->ref_count == 0)
757                dag_close_device(FORMAT_DATA_OUT->device);
[f0b87a7]758        free(libtrace->format_data);
759        pthread_mutex_unlock(&open_dag_mutex);
760        return 0; /* success */
761}
762
[43c00e5]763/* Extracts DUCK information from the DAG card and produces a DUCK packet */
[d97778c]764static int dag_get_duckinfo(libtrace_t *libtrace, libtrace_packet_t *packet)
765{
[43c00e5]766        /* Allocate memory for the DUCK data */
[d97778c]767        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
768            !packet->buffer) {
769                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
770                packet->buf_control = TRACE_CTRL_PACKET;
771                if (!packet->buffer) {
772                        trace_set_err(libtrace, errno,
773                                      "Cannot allocate packet buffer");
774                        return -1;
775                }
776        }
[5e85c23]777
[43c00e5]778        /* DUCK doesn't have a format header */
[d97778c]779        packet->header = 0;
780        packet->payload = packet->buffer;
781
782        /* No need to check if we can get DUCK or not - we're modern
783         * enough so just grab the DUCK info */
784        if ((ioctl(FORMAT_DATA->device->fd, DAGIOCDUCK,
785                   (duckinf_t *)packet->payload) < 0)) {
786                trace_set_err(libtrace, errno, "Error using DAGIOCDUCK");
787                return -1;
788        }
[5e85c23]789
[d97778c]790        packet->type = TRACE_RT_DUCK_2_5;
[43c00e5]791
792        /* Set the packet's tracce to point at a DUCK trace, so that the
793         * DUCK format functions will be called on the packet rather than the
794         * DAG ones */
[d97778c]795        if (!DUCK.dummy_duck)
796                DUCK.dummy_duck = trace_create_dead("rt:localhost:3434");
797        packet->trace = DUCK.dummy_duck;
798        return sizeof(duckinf_t);
[5e85c23]799}
800
[43c00e5]801/* Determines the amount of data available to read from the DAG card */
[d97778c]802static int dag_available(libtrace_t *libtrace)
803{
[52656ed]804        uint32_t diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
[43c00e5]805
[5798dc6]806        /* If we've processed more than 4MB of data since we last called
[f0b87a7]807         * dag_advance_stream, then we should call it again to allow the
[5798dc6]808         * space occupied by that 4MB to be released */
809        if (diff >= dag_record_size && FORMAT_DATA->processed < 4 * 1024 * 1024)
[52656ed]810                return diff;
[d97778c]811
[43c00e5]812        /* Update the top and bottom pointers */
[f0b87a7]813        FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd,
[d97778c]814                                              FORMAT_DATA->dagstream,
815                                              &(FORMAT_DATA->bottom));
816
[52656ed]817        if (FORMAT_DATA->top == NULL) {
818                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
819                return -1;
820        }
[5798dc6]821        FORMAT_DATA->processed = 0;
[52656ed]822        diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
823        return diff;
824}
825
[43c00e5]826/* Returns a pointer to the start of the next complete ERF record */
[d97778c]827static dag_record_t *dag_get_record(libtrace_t *libtrace)
828{
829        dag_record_t *erfptr = NULL;
830        uint16_t size;
831
[52656ed]832        erfptr = (dag_record_t *)FORMAT_DATA->bottom;
[5e85c23]833        if (!erfptr)
[d97778c]834                return NULL;
835
836        size = ntohs(erfptr->rlen);
837        assert( size >= dag_record_size );
838
[52656ed]839        /* Make certain we have the full packet available */
840        if (size > (FORMAT_DATA->top - FORMAT_DATA->bottom))
841                return NULL;
[d97778c]842
[52656ed]843        FORMAT_DATA->bottom += size;
[5798dc6]844        FORMAT_DATA->processed += size;
[5e85c23]845        return erfptr;
846}
847
[43c00e5]848/* Converts a buffer containing a recently read DAG packet record into a
849 * libtrace packet */
[f0fb38f]850static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
[d97778c]851                              void *buffer, libtrace_rt_types_t rt_type,
852                              uint32_t flags)
853{
[f0fb38f]854        dag_record_t *erfptr;
[c66a465]855        libtrace_thread_t *t;
[d97778c]856
[43c00e5]857        /* If the packet previously owned a buffer that is not the buffer
[d97778c]858         * that contains the new packet data, we're going to need to free the
859         * old one to avoid memory leaks */
[f0b87a7]860        if (packet->buffer != buffer &&
[f0fb38f]861                        packet->buf_control == TRACE_CTRL_PACKET) {
862                free(packet->buffer);
863        }
[f0b87a7]864
[43c00e5]865        /* Set the buffer owner appropriately */
[f0fb38f]866        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
867                packet->buf_control = TRACE_CTRL_PACKET;
868        } else
869                packet->buf_control = TRACE_CTRL_EXTERNAL;
[f0b87a7]870
[43c00e5]871        /* Update the packet pointers and type appropriately */
[f52bcdd]872        erfptr = (dag_record_t *)buffer;
[f0fb38f]873        packet->buffer = erfptr;
[d97778c]874        packet->header = erfptr;
875        packet->type = rt_type;
[f0b87a7]876
[f0fb38f]877        if (erfptr->flags.rxerror == 1) {
[d97778c]878                /* rxerror means the payload is corrupt - drop the payload
879                 * by tweaking rlen */
880                packet->payload = NULL;
881                erfptr->rlen = htons(erf_get_framing_length(packet));
882        } else {
883                packet->payload = (char*)packet->buffer
884                        + erf_get_framing_length(packet);
885        }
[f0b87a7]886
[f0fb38f]887        if (libtrace->format_data == NULL) {
[f0b87a7]888                dag_init_format_data(libtrace);
[f0fb38f]889        }
[f0b87a7]890
[43c00e5]891        /* Update the dropped packets counter */
[d97778c]892        /* No loss counter for DSM coloured records - have to use some
893         * other API */
[f0fb38f]894        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
[43c00e5]895                /* TODO */
[f0fb38f]896        } else {
[43c00e5]897                /* Use the ERF loss counter */
[c66a465]898                if (DATA(libtrace)->per_thread) {
899                        t = get_thread_table(libtrace);
900                        PERPKT_DATA(t)->drops += ntohs(erfptr->lctr);
901                } else {
[d97778c]902                        if (FORMAT_DATA->seeninterface[erfptr->flags.iface]
903                            == 0) {
904                                FORMAT_DATA->seeninterface[erfptr->flags.iface]
905                                        = 1;
[97d170d]906                        } else {
907                                FORMAT_DATA->drops += ntohs(erfptr->lctr);
908                        }
[c66a465]909                }
[f0fb38f]910        }
[c66a465]911
[f0fb38f]912        return 0;
[5e85c23]913}
914
[5865c72]915/*
916 * dag_write_packet() at this stage attempts to improve tx performance
917 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
918 * has been met. I observed approximately 270% performance increase
919 * through this relatively naive tweak. No optimisation of buffer sizes
920 * was attempted.
921 */
922
[43c00e5]923/* Pushes an ERF record onto the transmit stream */
[5865c72]924static int dag_dump_packet(libtrace_out_t *libtrace,
[d97778c]925                           dag_record_t *erfptr, unsigned int pad,
926                           void *buffer)
927{
[5865c72]928        int size;
929
930        /*
[d97778c]931         * If we've got 0 bytes waiting in the txqueue, assume that we
932         * haven't requested any space yet, and request some, storing
933         * the pointer at FORMAT_DATA_OUT->txbuffer.
[5865c72]934         *
935         * The amount to request is slightly magical at the moment - it's
936         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
937         * the buffer and handle overruns.
938         */
939        if (FORMAT_DATA_OUT->waiting == 0) {
[d97778c]940                FORMAT_DATA_OUT->txbuffer =
941                        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
942                                                FORMAT_DATA_OUT->dagstream,
943                                                16908288);
[5865c72]944        }
945
946        /*
[43c00e5]947         * Copy the header separately to the body, as we can't guarantee they
948         * are in contiguous memory
[5865c72]949         */
[d97778c]950        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, erfptr,
951               (dag_record_size + pad));
[5865c72]952        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
953
954        /*
[43c00e5]955         * Copy our incoming packet into the outgoing buffer, and increment
956         * our waiting count
[5865c72]957         */
958        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
[d97778c]959        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, buffer,
960               size);
[5865c72]961        FORMAT_DATA_OUT->waiting += size;
962
963        /*
[43c00e5]964         * If our output buffer has more than 16 Mebibytes in it, commit those
965         * bytes and reset the waiting count to 0.
966         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in
967         * case there is still data in the buffer at program exit.
[5865c72]968         */
969        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
[d97778c]970                FORMAT_DATA_OUT->txbuffer =
971                        dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
972                                                   FORMAT_DATA_OUT->dagstream,
973                                                   FORMAT_DATA_OUT->waiting);
[5865c72]974                FORMAT_DATA_OUT->waiting = 0;
975        }
976
977        return size + pad + dag_record_size;
978}
979
[43c00e5]980/* Attempts to determine a suitable ERF type for a given packet. Returns true
981 * if one is found, false otherwise */
[5865c72]982static bool find_compatible_linktype(libtrace_out_t *libtrace,
[d97778c]983                                     libtrace_packet_t *packet, char *type)
[5865c72]984{
[d97778c]985        /* Keep trying to simplify the packet until we can find
986         * something we can do with it */
[5865c72]987
988        do {
[d97778c]989                *type = libtrace_to_erf_type(trace_get_link_type(packet));
[5865c72]990
[d97778c]991                /* Success */
[43c00e5]992                if (*type != (char)-1)
[5865c72]993                        return true;
994
995                if (!demote_packet(packet)) {
996                        trace_set_err_out(libtrace,
[d97778c]997                                          TRACE_ERR_NO_CONVERSION,
998                                          "No erf type for packet (%i)",
999                                          trace_get_link_type(packet));
[5865c72]1000                        return false;
1001                }
1002
1003        } while(1);
1004
1005        return true;
1006}
[5e85c23]1007
[43c00e5]1008/* Writes a packet to the provided DAG output trace */
[d97778c]1009static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet)
1010{
1011        /* This is heavily borrowed from erf_write_packet(). Yes, CnP
1012         * coding sucks, sorry about that.
[5865c72]1013         */
1014        unsigned int pad = 0;
1015        int numbytes;
1016        void *payload = packet->payload;
1017        dag_record_t *header = (dag_record_t *)packet->header;
[43c00e5]1018        char erf_type = 0;
[5865c72]1019
1020        if(!packet->header) {
[d97778c]1021                /* No header, probably an RT packet. Lifted from
[43c00e5]1022                 * erf_write_packet(). */
[5865c72]1023                return -1;
1024        }
[f0b87a7]1025
[5f329ab]1026        if (trace_get_link_type(packet) == TRACE_TYPE_NONDATA)
1027                return 0;
1028
[5865c72]1029        pad = dag_get_padding(packet);
[f0b87a7]1030
[5865c72]1031        /*
[43c00e5]1032         * If the payload is null, adjust the rlen. Discussion of this is
[5865c72]1033         * attached to erf_write_packet()
1034         */
1035        if (payload == NULL) {
1036                header->rlen = htons(dag_record_size + pad);
1037        }
1038
1039        if (packet->type == TRACE_RT_DATA_ERF) {
[d97778c]1040                numbytes = dag_dump_packet(libtrace, header, pad, payload);
[5865c72]1041        } else {
1042                /* Build up a new packet header from the existing header */
1043
[d97778c]1044                /* Simplify the packet first - if we can't do this, break
[43c00e5]1045                 * early */
1046                if (!find_compatible_linktype(libtrace,packet,&erf_type))
[5865c72]1047                        return -1;
1048
1049                dag_record_t erfhdr;
1050
1051                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
1052                payload=packet->payload;
1053                pad = dag_get_padding(packet);
1054
1055                /* Flags. Can't do this */
1056                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
[51d1f64]1057                if (trace_get_direction(packet)!=(int)~0U)
[5865c72]1058                        erfhdr.flags.iface = trace_get_direction(packet);
[35c5a72]1059
[43c00e5]1060                erfhdr.type = erf_type;
[35c5a72]1061
[5865c72]1062                /* Packet length (rlen includes format overhead) */
[d97778c]1063                assert(trace_get_capture_length(packet) > 0
1064                       && trace_get_capture_length(packet) <= 65536);
1065                assert(erf_get_framing_length(packet) > 0
1066                       && trace_get_framing_length(packet) <= 65536);
1067                assert(trace_get_capture_length(packet) +
1068                       erf_get_framing_length(packet) > 0
1069                       && trace_get_capture_length(packet) +
1070                       erf_get_framing_length(packet) <= 65536);
[35c5a72]1071
[5865c72]1072                erfhdr.rlen = htons(trace_get_capture_length(packet)
[d97778c]1073                                    + erf_get_framing_length(packet));
[35c5a72]1074
1075
[43c00e5]1076                /* Loss counter. Can't do this */
[5865c72]1077                erfhdr.lctr = 0;
1078                /* Wire length, does not include padding! */
1079                erfhdr.wlen = htons(trace_get_wire_length(packet));
1080
1081                /* Write it out */
[d97778c]1082                numbytes = dag_dump_packet(libtrace, &erfhdr, pad, payload);
[35c5a72]1083        }
[f0b87a7]1084
[5865c72]1085        return numbytes;
[f0b87a7]1086}
1087
[43c00e5]1088/* Reads the next available packet from a DAG card, in a BLOCKING fashion
1089 *
1090 * If DUCK reporting is enabled, the packet returned may be a DUCK update
1091 */
[d97778c]1092static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
1093{
1094        int size = 0;
1095        struct timeval tv;
1096        dag_record_t *erfptr = NULL;
[52656ed]1097        int numbytes = 0;
[f0fb38f]1098        uint32_t flags = 0;
[51d1f64]1099        struct timeval maxwait;
1100        struct timeval pollwait;
1101
1102        pollwait.tv_sec = 0;
1103        pollwait.tv_usec = 10000;
1104        maxwait.tv_sec = 0;
1105        maxwait.tv_usec = 250000;
[f0b87a7]1106
[d97778c]1107        /* Check if we're due for a DUCK report */
[43c00e5]1108        if (DUCK.last_pkt - DUCK.last_duck > DUCK.duck_freq &&
[d97778c]1109            DUCK.duck_freq != 0) {
1110                size = dag_get_duckinfo(libtrace, packet);
1111                DUCK.last_duck = DUCK.last_pkt;
1112                if (size != 0) {
1113                        return size;
1114                }
1115                /* No DUCK support, so don't waste our time anymore */
1116                DUCK.duck_freq = 0;
1117        }
[5e85c23]1118
[43c00e5]1119        /* Don't let anyone try to free our DAG memory hole! */
[c0dba7a]1120        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
[f0b87a7]1121
[43c00e5]1122        /* If the packet buffer is currently owned by libtrace, free it so
1123         * that we can set the packet to point into the DAG memory hole */
[d97778c]1124        if (packet->buf_control == TRACE_CTRL_PACKET) {
1125                free(packet->buffer);
1126                packet->buffer = 0;
1127        }
1128
1129        if (dag_set_stream_poll(FORMAT_DATA->device->fd, FORMAT_DATA->dagstream,
1130                                sizeof(dag_record_t), &maxwait,
1131                                &pollwait) == -1) {
[51d1f64]1132                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1133                return -1;
1134        }
1135
[5e85c23]1136
[43c00e5]1137        /* Grab a full ERF record */
[5e85c23]1138        do {
[52656ed]1139                numbytes = dag_available(libtrace);
1140                if (numbytes < 0)
1141                        return numbytes;
[51d1f64]1142                if (numbytes < dag_record_size) {
1143                        if (libtrace_halt)
1144                                return 0;
[52656ed]1145                        /* Block until we see a packet */
1146                        continue;
[51d1f64]1147                }
[5e85c23]1148                erfptr = dag_get_record(libtrace);
1149        } while (erfptr == NULL);
1150
[43c00e5]1151        /* Prepare the libtrace packet */
[f0fb38f]1152        if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
[d97778c]1153                               flags))
[f0fb38f]1154                return -1;
[43c00e5]1155
1156        /* Update the DUCK timer */
[5e85c23]1157        tv = trace_get_timeval(packet);
[d97778c]1158        DUCK.last_pkt = tv.tv_sec;
[f0b87a7]1159
1160        return packet->payload ? htons(erfptr->rlen) :
[d97778c]1161                erf_get_framing_length(packet);
[5e85c23]1162}
1163
[43c00e5]1164/* Attempts to read a packet from a DAG card in a NON-BLOCKING fashion. If a
1165 * packet is available, we will return a packet event. Otherwise we will
1166 * return a SLEEP event (as we cannot select on the DAG file descriptor).
1167 */
[51d1f64]1168static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
[d97778c]1169                                           libtrace_packet_t *packet)
1170{
1171        libtrace_eventobj_t event = {0,0,0.0,0};
[5e85c23]1172        dag_record_t *erfptr = NULL;
[6033b99]1173        int numbytes;
[f0fb38f]1174        uint32_t flags = 0;
[51d1f64]1175        struct timeval minwait;
[d97778c]1176
[51d1f64]1177        minwait.tv_sec = 0;
1178        minwait.tv_usec = 10000;
[d97778c]1179
1180        if (dag_set_stream_poll(FORMAT_DATA->device->fd,
1181                                FORMAT_DATA->dagstream, 0, &minwait,
1182                                &minwait) == -1) {
[51d1f64]1183                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1184                event.type = TRACE_EVENT_TERMINATE;
1185                return event;
1186        }
[f0b87a7]1187
[5778738]1188        do {
1189                erfptr = NULL;
1190                numbytes = 0;
[d97778c]1191
[5778738]1192                /* Need to call dag_available so that the top pointer will get
1193                 * updated, otherwise we'll never see any data! */
[51d1f64]1194                numbytes = dag_available(libtrace);
[5778738]1195
[d97778c]1196                /* May as well not bother calling dag_get_record if
[5778738]1197                 * dag_available suggests that there's no data */
1198                if (numbytes != 0)
[51d1f64]1199                        erfptr = dag_get_record(libtrace);
[5778738]1200                if (erfptr == NULL) {
[43c00e5]1201                        /* No packet available - sleep for a very short time */
[51d1f64]1202                        if (libtrace_halt) {
1203                                event.type = TRACE_EVENT_TERMINATE;
[d97778c]1204                        } else {
[51d1f64]1205                                event.type = TRACE_EVENT_SLEEP;
1206                                event.seconds = 0.0001;
1207                        }
[5778738]1208                        break;
1209                }
[d97778c]1210                if (dag_prepare_packet(libtrace, packet, erfptr,
1211                                       TRACE_RT_DATA_ERF, flags)) {
[5778738]1212                        event.type = TRACE_EVENT_TERMINATE;
1213                        break;
1214                }
[f0b87a7]1215
1216
[d97778c]1217                event.size = trace_get_capture_length(packet) +
1218                        trace_get_framing_length(packet);
1219
[43c00e5]1220                /* XXX trace_read_packet() normally applies the following
1221                 * config options for us, but this function is called via
1222                 * trace_event() so we have to do it ourselves */
1223
[51d1f64]1224                if (libtrace->filter) {
[d97778c]1225                        int filtret = trace_apply_filter(libtrace->filter,
1226                                                         packet);
[51d1f64]1227                        if (filtret == -1) {
1228                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
[d97778c]1229                                              "Bad BPF Filter");
[51d1f64]1230                                event.type = TRACE_EVENT_TERMINATE;
1231                                break;
1232                        }
1233
1234                        if (filtret == 0) {
[5778738]1235                                /* This packet isn't useful so we want to
1236                                 * immediately see if there is another suitable
1237                                 * one - we definitely DO NOT want to return
1238                                 * a sleep event in this case, like we used to
1239                                 * do! */
[d97778c]1240                                libtrace->filtered_packets ++;
[d8e4595]1241                                trace_clear_cache(packet);
[5778738]1242                                continue;
1243                        }
[d97778c]1244
[51d1f64]1245                        event.type = TRACE_EVENT_PACKET;
[5e85c23]1246                } else {
[5778738]1247                        event.type = TRACE_EVENT_PACKET;
[5e85c23]1248                }
1249
[51d1f64]1250                if (libtrace->snaplen > 0) {
1251                        trace_set_capture_length(packet, libtrace->snaplen);
[5778738]1252                }
[d97778c]1253                libtrace->accepted_packets ++;
[5778738]1254                break;
[d97778c]1255        } while(1);
[5e85c23]1256
1257        return event;
1258}
1259
[43c00e5]1260/* Gets the number of dropped packets */
[d97778c]1261static uint64_t dag_get_dropped_packets(libtrace_t *trace)
1262{
[c66a465]1263        uint64_t sum = 0;
1264        int i, tot;
1265
[f0fb38f]1266        if (trace->format_data == NULL)
[d97778c]1267                return (uint64_t) - 1;
[c66a465]1268
1269        if (DATA(trace)->per_thread) {
1270                tot = trace->perpkt_thread_count;
1271
1272                for (i = 0; i < tot; i++) {
1273                        printf("t%d: drops %" PRIu64 "\n",
[d97778c]1274                               DATA(trace)->per_thread[i].drops);
[c66a465]1275                        sum += DATA(trace)->per_thread[i].drops;
1276                }
1277        }
1278
1279        sum += DATA(trace)->drops;
1280
1281        return sum;
[2b7750a]1282}
[5e85c23]1283
[43c00e5]1284/* Prints some semi-useful help text about the DAG format module */
[5e85c23]1285static void dag_help(void) {
[d97778c]1286        printf("dag format module: $Revision: 1755 $\n");
1287        printf("Supported input URIs:\n");
1288        printf("\tdag:/dev/dagn\n");
1289        printf("\n");
1290        printf("\te.g.: dag:/dev/dag0\n");
1291        printf("\n");
1292        printf("Supported output URIs:\n");
1293        printf("\tnone\n");
1294        printf("\n");
[5e85c23]1295}
1296
[d97778c]1297static int dag_pstart_input(libtrace_t *libtrace)
1298{
[fe11d12]1299        char *scan, *tok;
1300        uint16_t stream_count = 0, max_streams;
1301        /* We keep our own pointer to per_thread as the system will free
1302         * up FORMAT_DATA without freeing this if something goes wrong */
1303        struct dag_per_thread_t *per_thread = NULL;
1304        int iserror = 0;
1305
[d97778c]1306        /* Check we aren't trying to create more threads than the DAG card can
[fe11d12]1307         * handle */
1308        max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd);
1309        if (libtrace->perpkt_thread_count > max_streams) {
[d97778c]1310                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
1311                              "trying to create too many threads (max is %u)",
1312                              max_streams);
[fe11d12]1313                iserror = 1;
1314                goto cleanup;
1315        }
1316
1317        /* Create the thread structures */
1318        per_thread = calloc(libtrace->perpkt_thread_count,
[d97778c]1319                            sizeof(struct dag_per_thread_t));
[fe11d12]1320        FORMAT_DATA->per_thread = per_thread;
1321
1322        /* Get the stream names from the uri */
1323        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
[d97778c]1324                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
1325                              "format uri doesn't specify the DAG streams");
[fe11d12]1326                iserror = 1;
1327                goto cleanup;
1328        }
1329
1330        scan++;
[d97778c]1331
[fe11d12]1332        tok = strtok(scan, ",");
1333        while (tok != NULL) {
1334                /* Ensure we haven't specified too many streams */
1335                if (stream_count >= libtrace->perpkt_thread_count) {
[d97778c]1336                        trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
1337                                      "format uri specifies too many streams. "
1338                                      "Max is %u", max_streams);
[fe11d12]1339                        iserror = 1;
1340                        goto cleanup;
1341                }
1342
1343                /* Save the stream details */
1344                per_thread[stream_count].device = FORMAT_DATA->device;
1345                per_thread[stream_count++].stream = (uint16_t)atoi(tok);
1346
1347                tok = strtok(NULL, ",");
1348        }
1349
1350 cleanup:
1351        if (iserror) {
1352                /* Free the per_thread memory */
1353                free(per_thread);
1354                return -1;
1355        } else {
1356                return 0;
1357        }
1358}
1359
1360
1361
1362/* TODO: Fold this into dag_available */
[d97778c]1363static int dag_pavailable(libtrace_t *libtrace, libtrace_thread_t *t)
1364{
[fe11d12]1365        uint32_t diff = PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom;
1366
1367        /* If we've processed more than 4MB of data since we last called
1368         * dag_advance_stream, then we should call it again to allow the
1369         * space occupied by that 4MB to be released */
[d97778c]1370        if (diff >= dag_record_size && PERPKT_DATA(t)->processed <
1371            4*1024*1024)
[fe11d12]1372                return diff;
1373
1374        /* Update top and bottom pointers */
1375        PERPKT_DATA(t)->top = dag_advance_stream(PERPKT_DATA(t)->device->fd,
[d97778c]1376                                                 PERPKT_DATA(t)->stream,
1377                                                 &(PERPKT_DATA(t)->bottom));
[fe11d12]1378
1379        if (PERPKT_DATA(t)->top == NULL) {
1380                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
1381                return -1;
1382        }
1383
1384        PERPKT_DATA(t)->processed = 0;
1385        diff = PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom;
1386        return diff;
1387}
1388
1389/* TODO: Fold this into dag_get_record */
1390static dag_record_t *dag_pget_record(libtrace_t *libtrace,
[d97778c]1391                                     libtrace_thread_t *t)
1392{
[fe11d12]1393        dag_record_t *erfptr = NULL;
1394        uint16_t size;
1395
1396        erfptr = (dag_record_t *)PERPKT_DATA(t)->bottom;
1397        if (!erfptr)
1398                return NULL;
1399
1400        /* Ensure we have a whole record */
1401        size = ntohs(erfptr->rlen);
1402        assert(size >= dag_record_size);
1403        if (size > (PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom))
1404                return NULL;
1405
1406        /* Advance the buffer pointers */
1407        PERPKT_DATA(t)->bottom += size;
1408        PERPKT_DATA(t)->processed += size;
[d97778c]1409
[fe11d12]1410        return erfptr;
1411}
1412
1413static int dag_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t,
[d97778c]1414                            libtrace_packet_t *packet)
1415{
[fe11d12]1416        dag_record_t *erfptr = NULL;
1417        int numbytes = 0;
1418        uint32_t flags = 0;
1419        struct timeval maxwait, pollwait;
1420
1421        pollwait.tv_sec = 0;
1422        pollwait.tv_usec = 10000;
1423        maxwait.tv_sec = 0;
1424        maxwait.tv_usec = 250000;
1425
1426        /* TODO: Support DUCK reporting */
1427
1428        /* Don't let anyone try to free our DAG memory hole! */
1429        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1430
1431        /* If the packet buffer is currently owned by libtrace, free it so
1432         * that we can set the packet to point into the DAG memory hole */
1433        if (packet->buf_control == TRACE_CTRL_PACKET) {
1434                free(packet->buffer);
1435                packet->buffer = 0;
1436        }
1437
1438        /* Configure DAG card stream polling */
[d97778c]1439        if (dag_set_stream_poll(PERPKT_DATA(t)->device->fd,
1440                                PERPKT_DATA(t)->stream, sizeof(dag_record_t),
1441                                &maxwait, &pollwait) < 0) {
[fe11d12]1442                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1443                return -1;
1444        }
1445
1446        /* Grab an ERF record */
1447        do {
1448                numbytes = dag_pavailable(libtrace, t);
1449                if (numbytes < 0)
1450                        return numbytes;
1451                if (numbytes < dag_record_size) {
1452                        if (libtrace_halt)
1453                                return 0;
[9149564]1454
[d97778c]1455                        /* Check message queue to see if we should
1456                         * abort early */
[9149564]1457                        if (libtrace_message_queue_count(&t->messages) > 0)
1458                                return -2;
1459
[fe11d12]1460                        /* Keep trying until we see a packet */
1461                        continue;
1462                }
1463
1464                erfptr = dag_pget_record(libtrace, t);
1465        } while (erfptr == NULL);
1466
1467        /* Prepare the libtrace packet */
1468        if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
[d97778c]1469                               flags))
[fe11d12]1470                return -1;
1471
[9149564]1472        PERPKT_DATA(t)->pkt_count++;
1473
[fe11d12]1474        return packet->payload ? htons(erfptr->rlen) :
1475                erf_get_framing_length(packet);
1476}
1477
[d97778c]1478static int dag_ppause_input(libtrace_t *libtrace)
1479{
[9149564]1480        int i, tot = libtrace->perpkt_thread_count;
1481        struct dag_per_thread_t *t_data;
[fe11d12]1482
[9149564]1483        /* Stop and detach all the streams */
1484        printf("Stopping and detaching all streams\n");
1485        for (i = 0; i < tot; i++) {
1486                t_data = &FORMAT_DATA->per_thread[i];
[fe11d12]1487
[9149564]1488                if (dag_stop_stream(t_data->device->fd,
[d97778c]1489                                    t_data->stream) < 0) {
1490                        trace_set_err(libtrace, errno,
1491                                      "can't stop DAG stream #%u",
1492                                      t_data->stream);
[9149564]1493                        return -1;
1494                }
1495
1496                if (dag_detach_stream(t_data->device->fd,
[d97778c]1497                                      t_data->stream) < 0) {
1498                        trace_set_err(libtrace, errno,
1499                                      "can't detach DAG stream #%u",
1500                                      t_data->stream);
[9149564]1501                        return -1;
1502                }
1503        }
1504
1505        /* Free up the per_thread array */
1506        free(FORMAT_DATA->per_thread);
1507        FORMAT_DATA->per_thread = NULL;
1508
1509        return 0;
[fe11d12]1510}
1511
1512static int dag_pconfig_input(libtrace_t *libtrace,
[d97778c]1513                             trace_parallel_option_t option, void *value)
1514{
[9149564]1515        /* We don't support any of these! Normally you configure the DAG card
1516         * externally. */
1517        switch(option) {
1518        case TRACE_OPTION_SET_HASHER:
1519        case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
1520        case TRACE_OPTION_TRACETIME:
1521        case TRACE_OPTION_TICK_INTERVAL:
1522        case TRACE_OPTION_GET_CONFIG:
1523        case TRACE_OPTION_SET_CONFIG:
1524                return -1;
1525        }
1526        /* We don't provide a default option to ensure that future options will
1527         * generate a compiler warning. */
[fe11d12]1528
1529        return -1;
1530}
1531
1532static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
[d97778c]1533                                bool reader)
1534{
[fe11d12]1535        /* XXX: This function gets run sequentially for all
1536         * threads. Should investigate making it parallel as draining the
[9149564]1537         * memory could be needlessly time consuming.
[fe11d12]1538         */
1539        uint8_t *top, *bottom;
[d97778c]1540        /* XXX: Investigate this type, as I would assume the value
1541         * could be larger than 255 */
1542        uint8_t diff = 0;
[9149564]1543        struct timeval zero, nopoll;
[fe11d12]1544
1545        top = bottom = NULL;
1546
[9149564]1547        /* Minimum delay is 10mS */
1548        zero.tv_sec = 0;
1549        zero.tv_usec = 10000;
1550        nopoll = zero;
1551
[fe11d12]1552        if (reader) {
1553                if (t->type == THREAD_PERPKT) {
1554                        /* Pass the per thread data to the thread */
[d97778c]1555                        t->format_data =
1556                                &FORMAT_DATA->per_thread[t->perpkt_num];
[fe11d12]1557
1558                        /* Attach and start the DAG stream */
[d97778c]1559                        printf("t%u: starting and attaching stream #%u\n",
1560                               t->perpkt_num, PERPKT_DATA(t)->stream);
[fe11d12]1561                        if (dag_attach_stream(PERPKT_DATA(t)->device->fd,
[d97778c]1562                                              PERPKT_DATA(t)->stream, 0,
1563                                              0) < 0) {
1564                                trace_set_err(libtrace, errno,
1565                                              "can't attach DAG stream #%u",
1566                                              PERPKT_DATA(t)->stream);
[fe11d12]1567                                return -1;
1568                        }
1569                        if (dag_start_stream(PERPKT_DATA(t)->device->fd,
[d97778c]1570                                             PERPKT_DATA(t)->stream) < 0) {
1571                                trace_set_err(libtrace, errno,
1572                                              "can't start DAG stream #%u",
1573                                              PERPKT_DATA(t)->stream);
[fe11d12]1574                                return -1;
1575                        }
1576
[9149564]1577                        /* Ensure that dag_advance_stream will return without blocking */
1578                        if(dag_set_stream_poll(PERPKT_DATA(t)->device->fd,
[d97778c]1579                                               PERPKT_DATA(t)->stream, 0, &zero,
1580                                               &nopoll) < 0) {
1581                                trace_set_err(libtrace, errno,
1582                                              "dag_set_stream_poll failed!");
[9149564]1583                                return -1;
1584                        }
1585
[fe11d12]1586                        /* Clear all the data from the memory hole */
1587                        do {
[d97778c]1588                                top = dag_advance_stream(PERPKT_DATA(t)->
1589                                                         device->fd,
1590                                                         PERPKT_DATA(t)->stream,
1591                                                         &bottom);
[9149564]1592
[fe11d12]1593                                assert(top && bottom);
1594                                diff = top - bottom;
1595                                bottom -= diff;
1596                        } while (diff != 0);
1597
1598                        PERPKT_DATA(t)->top = NULL;
1599                        PERPKT_DATA(t)->bottom = NULL;
[9149564]1600                        PERPKT_DATA(t)->pkt_count = 0;
[c66a465]1601                        PERPKT_DATA(t)->drops = 0;
[fe11d12]1602                } else {
1603                        /* TODO: Figure out why we need this */
1604                        t->format_data = &FORMAT_DATA->per_thread[0];
1605                }
1606        }
1607
[9149564]1608        fprintf(stderr, "t%u: registered thread\n", t->perpkt_num);
[fe11d12]1609
1610        return 0;
1611}
1612
[5e85c23]1613static struct libtrace_format_t dag = {
[d97778c]1614        "dag",
1615        "$Id$",
1616        TRACE_FORMAT_ERF,
[91b72d3]1617        dag_probe_filename,             /* probe filename */
1618        NULL,                           /* probe magic */
[d97778c]1619        dag_init_input,                 /* init_input */
1620        dag_config_input,               /* config_input */
1621        dag_start_input,                /* start_input */
1622        dag_pause_input,                /* pause_input */
[35c5a72]1623        dag_init_output,                /* init_output */
[d97778c]1624        NULL,                           /* config_output */
[35c5a72]1625        dag_start_output,               /* start_output */
[d97778c]1626        dag_fin_input,                  /* fin_input */
[35c5a72]1627        dag_fin_output,                 /* fin_output */
[d97778c]1628        dag_read_packet,                /* read_packet */
1629        dag_prepare_packet,             /* prepare_packet */
[f0fb38f]1630        NULL,                           /* fin_packet */
[35c5a72]1631        dag_write_packet,               /* write_packet */
[d97778c]1632        erf_get_link_type,              /* get_link_type */
1633        erf_get_direction,              /* get_direction */
1634        erf_set_direction,              /* set_direction */
1635        erf_get_erf_timestamp,          /* get_erf_timestamp */
1636        NULL,                           /* get_timeval */
1637        NULL,                           /* get_seconds */
[1aa4bf7]1638        NULL,                           /* get_timespec */
[d97778c]1639        NULL,                           /* seek_erf */
1640        NULL,                           /* seek_timeval */
1641        NULL,                           /* seek_seconds */
1642        erf_get_capture_length,         /* get_capture_length */
1643        erf_get_wire_length,            /* get_wire_length */
1644        erf_get_framing_length,         /* get_framing_length */
1645        erf_set_capture_length,         /* set_capture_length */
[f2fae49]1646        NULL,                           /* get_received_packets */
1647        NULL,                           /* get_filtered_packets */
[2b7750a]1648        dag_get_dropped_packets,        /* get_dropped_packets */
[f2fae49]1649        NULL,                           /* get_captured_packets */
[d97778c]1650        NULL,                           /* get_fd */
1651        trace_event_dag,                /* trace_event */
1652        dag_help,                       /* help */
1653        NULL,                            /* next pointer */
[fe11d12]1654                {true, 0}, /* live packet capture, thread limit TBD */
1655                dag_pstart_input,
1656                dag_pread_packet,
1657                dag_ppause_input,
[9149564]1658                NULL,
[fe11d12]1659                dag_pconfig_input,
1660                dag_pregister_thread,
[9149564]1661                NULL
[5e85c23]1662};
1663
[d97778c]1664void dag_constructor(void)
1665{
[5e85c23]1666        register_format(&dag);
1667}
Note: See TracBrowser for help on using the repository browser.