source: lib/format_dag25.c @ cb39d35

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since cb39d35 was cb39d35, checked in by Dan Collins <dan@…>, 6 years ago

Moved DAG format to better support parallel and non-parallel API

Format data per stream is now stored in a linked list. This allows us to
add multiple per stream blocks for each additional stream, while still supporting
the original API. This greatly reduces code duplication and, to a minor extent,
RAM usage.

  • Property mode set to 100644
File size: 44.2 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id$
31 *
32 */
33
34#define _GNU_SOURCE
35
36#include "config.h"
37#include "common.h"
38#include "libtrace.h"
39#include "libtrace_int.h"
40#include "format_helper.h"
41#include "format_erf.h"
42
43#include <assert.h>
44#include <errno.h>
45#include <fcntl.h>
46#include <stdio.h>
47#include <string.h>
48#include <stdlib.h>
49#include <sys/stat.h>
50
51#include <sys/mman.h>
52/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
53 * Windows anyway so we'll worry about this more later :] */
54#include <pthread.h>
55
56
57#ifdef WIN32
58#  include <io.h>
59#  include <share.h>
60#  define PATH_MAX _MAX_PATH
61#  define snprintf sprintf_s
62#else
63#  include <netdb.h>
64#  ifndef PATH_MAX
65#       define PATH_MAX 4096
66#  endif
67#  include <sys/ioctl.h>
68#endif
69
70/* This format deals with DAG cards that are using drivers from the 2.5 version
71 * onwards, including 3.X.
72 *
73 * DAG is a LIVE capture format.
74 *
75 * This format does support writing, provided the DAG card that you are using
76 * has transmit (Tx) support. Additionally, packets read using this format
77 * are in the ERF format, so can easily be written as ERF traces without
78 * losing any data.
79 */
80
81#define DATA(x) ((struct dag_format_data_t *)x->format_data)
82#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
83#define STREAM_DATA(x) ((struct dag_per_stream_t *)x->data)
84
85#define FORMAT_DATA DATA(libtrace)
86#define FORMAT_DATA_OUT DATA_OUT(libtrace)
87
88#define DUCK FORMAT_DATA->duck
89
90#define FORMAT_DATA_HEAD FORMAT_DATA->per_stream->head
91#define FORMAT_DATA_FIRST ((struct dag_per_stream_t *)FORMAT_DATA_HEAD->data)
92
93static struct libtrace_format_t dag;
94
95/* A DAG device - a DAG device can support multiple streams (and therefore
96 * multiple input traces) so each trace needs to refer to a device */
97struct dag_dev_t {
98        char * dev_name;                /* Device name */
99        int fd;                         /* File descriptor */
100        uint16_t ref_count;             /* Number of input / output traces
101                                           that are using this device */
102        struct dag_dev_t *prev;         /* Pointer to the previous device in
103                                           the device list */
104        struct dag_dev_t *next;         /* Pointer to the next device in the
105                                           device list */
106};
107
108/* "Global" data that is stored for each DAG output trace */
109struct dag_format_data_out_t {
110        /* The DAG device being used for writing */
111        struct dag_dev_t *device;
112        /* The DAG stream that is being written on */
113        unsigned int dagstream;
114        /* Boolean flag indicating whether the stream is currently attached */
115        int stream_attached;
116        /* The amount of data waiting to be transmitted, in bytes */
117        uint64_t waiting;
118        /* A buffer to hold the data to be transmittted */
119        uint8_t *txbuffer;
120};
121
122/* Data that is stored against each input stream */
123struct dag_per_stream_t {
124        /* DAG device */
125        struct dag_dev_t *device;
126        /* DAG stream number */
127        uint16_t dagstream;
128        /* Pointer to the last unread byte in the DAG memory */
129        uint8_t *top;
130        /* Pointer to the first unread byte in the DAG memory */
131        uint8_t *bottom;
132        /* Amount of data processed from the bottom pointer */
133        uint32_t processed;
134        /* Number of packets seen by the stream */
135        uint64_t pkt_count;
136        /* Drop count for this particular stream */
137        uint64_t drops;
138        /* Boolean values to indicate if a particular interface has been seen
139         * or not. This is limited to four interfaces, which is enough to
140         * support all current DAG cards */
141        uint8_t seeninterface[4];
142};
143
144/* "Global" data that is stored for each DAG input trace */
145struct dag_format_data_t {
146        /* Data required for regular DUCK reporting */
147        /* TODO: This doesn't work with the 10X2S card! I don't know how
148         * DUCK stuff works and don't know how to fix it */
149        struct {
150                /* Timestamp of the last DUCK report */
151                uint32_t last_duck;
152                /* The number of seconds between each DUCK report */
153                uint32_t duck_freq;
154                /* Timestamp of the last packet read from the DAG card */
155                uint32_t last_pkt;
156                /* Dummy trace to ensure DUCK packets are dealt with using the
157                 * DUCK format functions */
158                libtrace_t *dummy_duck;
159        } duck;
160
161        /* Boolean flag indicating whether the trace is currently attached */
162        int stream_attached;
163
164        /* Data stored against each DAG input stream */
165        libtrace_list_t *per_stream;
166};
167
168/* To be thread-safe, we're going to need a mutex for operating on the list
169 * of DAG devices */
170pthread_mutex_t open_dag_mutex;
171
172/* The list of DAG devices that have been opened by libtrace.
173 *
174 * We can only open each DAG device once, but we might want to read from
175 * multiple streams. Therefore, we need to maintain a list of devices that we
176 * have opened (with ref counts!) so that we don't try to open a device too
177 * many times or close a device that we're still using */
178struct dag_dev_t *open_dags = NULL;
179
180/* Returns the amount of padding between the ERF header and the start of the
181 * captured packet data */
182static int dag_get_padding(const libtrace_packet_t *packet)
183{
184        /* ERF Ethernet records have a 2 byte padding before the packet itself
185         * so that the IP header is aligned on a 32 bit boundary.
186         */
187        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
188                dag_record_t *erfptr = (dag_record_t *)packet->header;
189                switch(erfptr->type) {
190                        case TYPE_ETH:
191                        case TYPE_DSM_COLOR_ETH:
192                                return 2;
193                        default:                return 0;
194                }
195        }
196        else {
197                switch(trace_get_link_type(packet)) {
198                        case TRACE_TYPE_ETH:    return 2;
199                        default:                return 0;
200                }
201        }
202}
203
204/* Attempts to determine if the given filename refers to a DAG device */
205static int dag_probe_filename(const char *filename)
206{
207        struct stat statbuf;
208        /* Can we stat the file? */
209        if (stat(filename, &statbuf) != 0) {
210                return 0;
211        }
212        /* Is it a character device? */
213        if (!S_ISCHR(statbuf.st_mode)) {
214                return 0;
215        }
216        /* Yeah, it's probably us. */
217        return 1;
218}
219
220/* Initialises the DAG output data structure */
221static void dag_init_format_out_data(libtrace_out_t *libtrace)
222{
223        libtrace->format_data = (struct dag_format_data_out_t *)
224                malloc(sizeof(struct dag_format_data_out_t));
225        // no DUCK on output
226        FORMAT_DATA_OUT->stream_attached = 0;
227        FORMAT_DATA_OUT->device = NULL;
228        FORMAT_DATA_OUT->dagstream = 0;
229        FORMAT_DATA_OUT->waiting = 0;
230
231}
232
233/* Initialises the DAG input data structure */
234static void dag_init_format_data(libtrace_t *libtrace)
235{
236        struct dag_per_stream_t stream_data;
237
238        libtrace->format_data = (struct dag_format_data_t *)
239                malloc(sizeof(struct dag_format_data_t));
240        DUCK.last_duck = 0;
241        DUCK.duck_freq = 0;
242        DUCK.last_pkt = 0;
243        DUCK.dummy_duck = NULL;
244
245        FORMAT_DATA->per_stream =
246                libtrace_list_init(sizeof(stream_data));
247        assert(FORMAT_DATA->per_stream != NULL);
248
249        /* We'll start with just one instance of stream_data, and we'll
250         * add more later if we need them */
251        memset(&stream_data, 0, sizeof(stream_data));
252        libtrace_list_push_back(FORMAT_DATA->per_stream, &stream_data);
253}
254
255/* Determines if there is already an entry for the given DAG device in the
256 * device list and increments the reference count for that device, if found.
257 *
258 * NOTE: This function assumes the open_dag_mutex is held by the caller */
259static struct dag_dev_t *dag_find_open_device(char *dev_name)
260{
261        struct dag_dev_t *dag_dev;
262
263        dag_dev = open_dags;
264
265        /* XXX: Not exactly zippy, but how often are we going to be dealing
266         * with multiple dag cards? */
267        while (dag_dev != NULL) {
268                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
269                        dag_dev->ref_count ++;
270                        return dag_dev;
271                }
272                dag_dev = dag_dev->next;
273        }
274        return NULL;
275}
276
277/* Closes a DAG device and removes it from the device list.
278 *
279 * Attempting to close a DAG device that has a non-zero reference count will
280 * cause an assertion failure!
281 *
282 * NOTE: This function assumes the open_dag_mutex is held by the caller */
283static void dag_close_device(struct dag_dev_t *dev)
284{
285        /* Need to remove from the device list */
286        assert(dev->ref_count == 0);
287
288        if (dev->prev == NULL) {
289                open_dags = dev->next;
290                if (dev->next)
291                        dev->next->prev = NULL;
292        } else {
293                dev->prev->next = dev->next;
294                if (dev->next)
295                        dev->next->prev = dev->prev;
296        }
297
298        dag_close(dev->fd);
299        if (dev->dev_name)
300                free(dev->dev_name);
301        free(dev);
302}
303
304
305/* Opens a new DAG device for writing and adds it to the DAG device list
306 *
307 * NOTE: this function should only be called when opening a DAG device for
308 * writing - there is little practical difference between this and the
309 * function below that covers the reading case, but we need the output trace
310 * object to report errors properly so the two functions take slightly
311 * different arguments. This is really lame and there should be a much better
312 * way of doing this.
313 *
314 * NOTE: This function assumes the open_dag_mutex is held by the caller
315 */
316static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace,
317                                                char *dev_name)
318{
319        struct stat buf;
320        int fd;
321        struct dag_dev_t *new_dev;
322
323        /* Make sure the device exists */
324        if (stat(dev_name, &buf) == -1) {
325                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
326                return NULL;
327        }
328
329        /* Make sure it is the appropriate type of device */
330        if (S_ISCHR(buf.st_mode)) {
331                /* Try opening the DAG device */
332                if((fd = dag_open(dev_name)) < 0) {
333                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
334                                        dev_name);
335                        return NULL;
336                }
337        } else {
338                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
339                                dev_name);
340                return NULL;
341        }
342
343        /* Add the device to our device list - it is just a doubly linked
344         * list with no inherent ordering; just tack the new one on the front
345         */
346        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
347        new_dev->fd = fd;
348        new_dev->dev_name = dev_name;
349        new_dev->ref_count = 1;
350
351        new_dev->prev = NULL;
352        new_dev->next = open_dags;
353        if (open_dags)
354                open_dags->prev = new_dev;
355
356        open_dags = new_dev;
357
358        return new_dev;
359}
360
361/* Opens a new DAG device for reading and adds it to the DAG device list
362 *
363 * NOTE: this function should only be called when opening a DAG device for
364 * reading - there is little practical difference between this and the
365 * function above that covers the writing case, but we need the input trace
366 * object to report errors properly so the two functions take slightly
367 * different arguments. This is really lame and there should be a much better
368 * way of doing this.
369 *
370 * NOTE: This function assumes the open_dag_mutex is held by the caller */
371static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
372        struct stat buf;
373        int fd;
374        struct dag_dev_t *new_dev;
375
376        /* Make sure the device exists */
377        if (stat(dev_name, &buf) == -1) {
378                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
379                return NULL;
380        }
381
382        /* Make sure it is the appropriate type of device */
383        if (S_ISCHR(buf.st_mode)) {
384                /* Try opening the DAG device */
385                if((fd = dag_open(dev_name)) < 0) {
386                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
387                                      dev_name);
388                        return NULL;
389                }
390        } else {
391                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
392                              dev_name);
393                return NULL;
394        }
395
396        /* Add the device to our device list - it is just a doubly linked
397         * list with no inherent ordering; just tack the new one on the front
398         */
399        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
400        new_dev->fd = fd;
401        new_dev->dev_name = dev_name;
402        new_dev->ref_count = 1;
403
404        new_dev->prev = NULL;
405        new_dev->next = open_dags;
406        if (open_dags)
407                open_dags->prev = new_dev;
408
409        open_dags = new_dev;
410
411        return new_dev;
412}
413
414/* Creates and initialises a DAG output trace */
415static int dag_init_output(libtrace_out_t *libtrace)
416{
417        char *dag_dev_name = NULL;
418        char *scan = NULL;
419        struct dag_dev_t *dag_device = NULL;
420        int stream = 1;
421
422        /* XXX I don't know if this is important or not, but this function
423         * isn't present in all of the driver releases that this code is
424         * supposed to support! */
425        /*
426        unsigned long wake_time;
427        dagutil_sleep_get_wake_time(&wake_time,0);
428        */
429
430        dag_init_format_out_data(libtrace);
431        /* Grab the mutex while we're likely to be messing with the device
432         * list */
433        pthread_mutex_lock(&open_dag_mutex);
434
435        /* Specific streams are signified using a comma in the libtrace URI,
436         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
437         *
438         * If no stream is specified, we will write using stream 1 */
439        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
440                dag_dev_name = strdup(libtrace->uridata);
441        } else {
442                dag_dev_name = (char *)strndup(libtrace->uridata,
443                                (size_t)(scan - libtrace->uridata));
444                stream = atoi(++scan);
445        }
446        FORMAT_DATA_OUT->dagstream = stream;
447
448        /* See if our DAG device is already open */
449        dag_device = dag_find_open_device(dag_dev_name);
450
451        if (dag_device == NULL) {
452                /* Device not yet opened - open it ourselves */
453                dag_device = dag_open_output_device(libtrace, dag_dev_name);
454        } else {
455                /* Otherwise, just use the existing one */
456                free(dag_dev_name);
457                dag_dev_name = NULL;
458        }
459
460        /* Make sure we have successfully opened a DAG device */
461        if (dag_device == NULL) {
462                if (dag_dev_name) {
463                        free(dag_dev_name);
464                }
465                pthread_mutex_unlock(&open_dag_mutex);
466                return -1;
467        }
468
469        FORMAT_DATA_OUT->device = dag_device;
470        pthread_mutex_unlock(&open_dag_mutex);
471        return 0;
472}
473
474/* Creates and initialises a DAG input trace */
475static int dag_init_input(libtrace_t *libtrace) {
476        char *dag_dev_name = NULL;
477        char *scan = NULL;
478        int stream = 0;
479        struct dag_dev_t *dag_device = NULL;
480
481        dag_init_format_data(libtrace);
482        /* Grab the mutex while we're likely to be messing with the device
483         * list */
484        pthread_mutex_lock(&open_dag_mutex);
485
486
487        /* DAG cards support multiple streams. In a single threaded capture,
488         * these are specified using a comma in the libtrace URI,
489         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
490         *
491         * If no stream is specified, we will read from stream 0 with
492         * one thread
493         */
494        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
495                dag_dev_name = strdup(libtrace->uridata);
496        } else {
497                dag_dev_name = (char *)strndup(libtrace->uridata,
498                                               (size_t)(scan -
499                                                        libtrace->uridata));
500                stream = atoi(++scan);
501        }
502
503        FORMAT_DATA_FIRST->dagstream = stream;
504
505        /* See if our DAG device is already open */
506        dag_device = dag_find_open_device(dag_dev_name);
507
508        if (dag_device == NULL) {
509                /* Device not yet opened - open it ourselves */
510                dag_device = dag_open_device(libtrace, dag_dev_name);
511        } else {
512                /* Otherwise, just use the existing one */
513                free(dag_dev_name);
514                dag_dev_name = NULL;
515        }
516
517        /* Make sure we have successfully opened a DAG device */
518        if (dag_device == NULL) {
519                if (dag_dev_name)
520                        free(dag_dev_name);
521                dag_dev_name = NULL;
522                pthread_mutex_unlock(&open_dag_mutex);
523                return -1;
524        }
525
526        FORMAT_DATA_FIRST->device = dag_device;
527
528        /* See Config_Status_API_Programming_Guide.pdf from the Endace
529           Dag Documentation */
530        /* Check kBooleanAttributeActive is true -- no point capturing
531         * on an interface that's disabled
532         *
533         * The symptom of the port being disabled is that libtrace
534         * will appear to hang. */
535        /* Check kBooleanAttributeFault is false */
536        /* Check kBooleanAttributeLocalFault is false */
537        /* Check kBooleanAttributeLock is true ? */
538        /* Check kBooleanAttributePeerLink ? */
539
540        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based
541           on libtrace promisc attribute?*/
542        /* Set kUint32AttributeSnapLength to the snaplength */
543
544        pthread_mutex_unlock(&open_dag_mutex);
545        return 0;
546}
547
548/* Configures a DAG input trace */
549static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
550                            void *data)
551{
552        char conf_str[4096];
553        switch(option) {
554        case TRACE_OPTION_META_FREQ:
555                /* This option is used to specify the frequency of DUCK
556                 * updates */
557                DUCK.duck_freq = *(int *)data;
558                return 0;
559        case TRACE_OPTION_SNAPLEN:
560                /* Tell the card our new snap length */
561                snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
562                if (dag_configure(FORMAT_DATA_FIRST->device->fd,
563                                  conf_str) != 0) {
564                        trace_set_err(libtrace, errno, "Failed to configure "
565                                      "snaplen on DAG card: %s",
566                                      libtrace->uridata);
567                        return -1;
568                }
569                return 0;
570        case TRACE_OPTION_PROMISC:
571                /* DAG already operates in a promisc fashion */
572                return -1;
573        case TRACE_OPTION_FILTER:
574                /* We don't yet support pushing filters into DAG
575                 * cards */
576                return -1;
577        case TRACE_OPTION_EVENT_REALTIME:
578                /* Live capture is always going to be realtime */
579                return -1;
580        }
581        return -1;
582}
583
584/* Starts a DAG output trace */
585static int dag_start_output(libtrace_out_t *libtrace)
586{
587        struct timeval zero, nopoll;
588
589        zero.tv_sec = 0;
590        zero.tv_usec = 0;
591        nopoll = zero;
592
593        /* Attach and start the DAG stream */
594        if (dag_attach_stream(FORMAT_DATA_OUT->device->fd,
595                        FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) {
596                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
597                return -1;
598        }
599
600        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
601                        FORMAT_DATA_OUT->dagstream) < 0) {
602                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
603                return -1;
604        }
605        FORMAT_DATA_OUT->stream_attached = 1;
606
607        /* We don't want the dag card to do any sleeping */
608        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
609                        FORMAT_DATA_OUT->dagstream, 0, &zero,
610                        &nopoll);
611
612        return 0;
613}
614
615/* Starts a DAG input trace */
616static int dag_start_input(libtrace_t *libtrace)
617{
618        struct timeval zero, nopoll;
619        uint8_t *top, *bottom, *starttop;
620        top = bottom = NULL;
621
622        zero.tv_sec = 0;
623        zero.tv_usec = 10000;
624        nopoll = zero;
625
626        /* Attach and start the DAG stream */
627        if (dag_attach_stream(FORMAT_DATA_FIRST->device->fd,
628                              FORMAT_DATA_FIRST->dagstream, 0, 0) < 0) {
629                trace_set_err(libtrace, errno, "Cannot attach DAG stream");
630                return -1;
631        }
632
633        if (dag_start_stream(FORMAT_DATA_FIRST->device->fd,
634                             FORMAT_DATA_FIRST->dagstream) < 0) {
635                trace_set_err(libtrace, errno, "Cannot start DAG stream");
636                return -1;
637        }
638        FORMAT_DATA->stream_attached = 1;
639
640        /* We don't want the dag card to do any sleeping */
641        dag_set_stream_poll(FORMAT_DATA_FIRST->device->fd,
642                            FORMAT_DATA_FIRST->dagstream, 0, &zero,
643                            &nopoll);
644
645        starttop = dag_advance_stream(FORMAT_DATA_FIRST->device->fd,
646                                      FORMAT_DATA_FIRST->dagstream,
647                                      &bottom);
648
649        /* Should probably flush the memory hole now */
650        top = starttop;
651        while (starttop - bottom > 0) {
652                bottom += (starttop - bottom);
653                top = dag_advance_stream(FORMAT_DATA_FIRST->device->fd,
654                                         FORMAT_DATA_FIRST->dagstream,
655                                         &bottom);
656        }
657        FORMAT_DATA_FIRST->top = top;
658        FORMAT_DATA_FIRST->bottom = bottom;
659        FORMAT_DATA_FIRST->processed = 0;
660        FORMAT_DATA_FIRST->drops = 0;
661
662        return 0;
663}
664
665static int dag_pstart_input(libtrace_t *libtrace)
666{
667        char *scan, *tok;
668        uint16_t stream_count = 0, max_streams;
669        int iserror = 0;
670        struct dag_per_stream_t stream_data;
671
672        /* Check we aren't trying to create more threads than the DAG card can
673         * handle */
674        max_streams = dag_rx_get_stream_count(FORMAT_DATA_FIRST->device->fd);
675        if (libtrace->perpkt_thread_count > max_streams) {
676                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
677                              "trying to create too many threads (max is %u)",
678                              max_streams);
679                iserror = 1;
680                goto cleanup;
681        }
682
683        /* Get the stream names from the uri */
684        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
685                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
686                              "format uri doesn't specify the DAG streams");
687                iserror = 1;
688                goto cleanup;
689        }
690
691        scan++;
692
693        tok = strtok(scan, ",");
694        while (tok != NULL) {
695                /* Ensure we haven't specified too many streams */
696                if (stream_count >= libtrace->perpkt_thread_count) {
697                        trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
698                                      "format uri specifies too many streams. "
699                                      "Max is %u", max_streams);
700                        iserror = 1;
701                        goto cleanup;
702                }
703
704                /* Save the stream details */
705                if (stream_count == 0) {
706                        /* Special case where we update the existing stream
707                         * data structure */
708                        FORMAT_DATA_FIRST->dagstream = (uint16_t)atoi(tok);
709                } else {
710                        memset(&stream_data, 0, sizeof(stream_data));
711                        stream_data.device = FORMAT_DATA_FIRST->device;
712                        stream_data.dagstream = (uint16_t)atoi(tok);
713                        libtrace_list_push_back(FORMAT_DATA->per_stream,
714                                                &stream_data);
715                }
716
717                stream_count++;
718                tok = strtok(NULL, ",");
719        }
720
721        FORMAT_DATA->stream_attached = 1;
722
723 cleanup:
724        if (iserror) {
725                return -1;
726        } else {
727                return 0;
728        }
729}
730
731/* Pauses a DAG output trace */
732static int dag_pause_output(libtrace_out_t *libtrace)
733{
734        /* Stop and detach the stream */
735        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
736                            FORMAT_DATA_OUT->dagstream) < 0) {
737                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
738                return -1;
739        }
740        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
741                              FORMAT_DATA_OUT->dagstream) < 0) {
742                trace_set_err_out(libtrace, errno,
743                                  "Could not detach DAG stream");
744                return -1;
745        }
746        FORMAT_DATA_OUT->stream_attached = 0;
747        return 0;
748}
749
750/* Pauses a DAG input trace */
751static int dag_pause_input(libtrace_t *libtrace)
752{
753        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
754
755        /* Stop and detach each stream */
756        while (tmp != NULL) {
757                if (dag_stop_stream(STREAM_DATA(tmp)->device->fd,
758                                    STREAM_DATA(tmp)->dagstream) < 0) {
759                        trace_set_err(libtrace, errno,
760                                      "Could not stop DAG stream");
761                        printf("Count not stop DAG stream\n");
762                        return -1;
763                }
764                if (dag_detach_stream(STREAM_DATA(tmp)->device->fd,
765                                      STREAM_DATA(tmp)->dagstream) < 0) {
766                        trace_set_err(libtrace, errno,
767                                      "Could not detach DAG stream");
768                        printf("Count not detach DAG stream\n");
769                        return -1;
770                }
771
772                tmp = tmp->next;
773        }
774
775        FORMAT_DATA->stream_attached = 0;
776        return 0;
777}
778
779
780
781/* Closes a DAG input trace */
782static int dag_fin_input(libtrace_t *libtrace)
783{
784        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
785
786        /* Need the lock, since we're going to be handling the device list */
787        pthread_mutex_lock(&open_dag_mutex);
788
789        /* Detach the stream if we are not paused */
790        if (FORMAT_DATA->stream_attached)
791                dag_pause_input(libtrace);
792
793        /* Close any dag devices that have no more references */
794        while (tmp != NULL) {
795                STREAM_DATA(tmp)->device->ref_count--;
796                if (STREAM_DATA(tmp)->device->ref_count == 0)
797                        dag_close_device(STREAM_DATA(tmp)->device);
798
799                tmp = tmp->next;
800        }
801
802        if (DUCK.dummy_duck)
803                trace_destroy_dead(DUCK.dummy_duck);
804
805        /* Clear the list */
806        libtrace_list_deinit(FORMAT_DATA->per_stream);
807
808        free(libtrace->format_data);
809        pthread_mutex_unlock(&open_dag_mutex);
810        return 0; /* success */
811}
812
813/* Closes a DAG output trace */
814static int dag_fin_output(libtrace_out_t *libtrace)
815{
816
817        /* Commit any outstanding traffic in the txbuffer */
818        if (FORMAT_DATA_OUT->waiting) {
819                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
820                                           FORMAT_DATA_OUT->dagstream,
821                                           FORMAT_DATA_OUT->waiting );
822        }
823
824        /* Wait until the buffer is nearly clear before exiting the program,
825         * as we will lose packets otherwise */
826        dag_tx_get_stream_space
827                (FORMAT_DATA_OUT->device->fd,
828                 FORMAT_DATA_OUT->dagstream,
829                 dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
830                                            FORMAT_DATA_OUT->dagstream) - 8);
831
832        /* Need the lock, since we're going to be handling the device list */
833        pthread_mutex_lock(&open_dag_mutex);
834
835        /* Detach the stream if we are not paused */
836        if (FORMAT_DATA_OUT->stream_attached)
837                dag_pause_output(libtrace);
838        FORMAT_DATA_OUT->device->ref_count --;
839
840        /* Close the DAG device if there are no more references to it */
841        if (FORMAT_DATA_OUT->device->ref_count == 0)
842                dag_close_device(FORMAT_DATA_OUT->device);
843        free(libtrace->format_data);
844        pthread_mutex_unlock(&open_dag_mutex);
845        return 0; /* success */
846}
847
848/* DUCK reporting is broken at the moment! */
849#if 0
850/* Extracts DUCK information from the DAG card and produces a DUCK packet */
851static int dag_get_duckinfo(libtrace_t *libtrace, libtrace_packet_t *packet)
852{
853        /* Allocate memory for the DUCK data */
854        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
855            !packet->buffer) {
856                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
857                packet->buf_control = TRACE_CTRL_PACKET;
858                if (!packet->buffer) {
859                        trace_set_err(libtrace, errno,
860                                      "Cannot allocate packet buffer");
861                        return -1;
862                }
863        }
864
865        /* DUCK doesn't have a format header */
866        packet->header = 0;
867        packet->payload = packet->buffer;
868
869        /* No need to check if we can get DUCK or not - we're modern
870         * enough so just grab the DUCK info */
871        if ((ioctl(FORMAT_DATA->device->fd, DAGIOCDUCK,
872                   (duckinf_t *)packet->payload) < 0)) {
873                trace_set_err(libtrace, errno, "Error using DAGIOCDUCK");
874                return -1;
875        }
876
877        packet->type = TRACE_RT_DUCK_2_5;
878
879        /* Set the packet's tracce to point at a DUCK trace, so that the
880         * DUCK format functions will be called on the packet rather than the
881         * DAG ones */
882        if (!DUCK.dummy_duck)
883                DUCK.dummy_duck = trace_create_dead("rt:localhost:3434");
884        packet->trace = DUCK.dummy_duck;
885        return sizeof(duckinf_t);
886
887        return 0;
888}
889#endif
890
891/* Determines the amount of data available to read from the DAG card */
892static int dag_available(libtrace_t *libtrace,
893                         struct dag_per_stream_t *stream_data)
894{
895        uint32_t diff = stream_data->top - stream_data->bottom;
896
897        /* If we've processed more than 4MB of data since we last called
898         * dag_advance_stream, then we should call it again to allow the
899         * space occupied by that 4MB to be released */
900        if (diff >= dag_record_size && stream_data->processed < 4 * 1024 * 1024)
901                return diff;
902
903        /* Update the top and bottom pointers */
904        stream_data->top = dag_advance_stream(stream_data->device->fd,
905                                              stream_data->dagstream,
906                                              &(stream_data->bottom));
907
908        if (stream_data->top == NULL) {
909                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
910                return -1;
911        }
912        stream_data->processed = 0;
913        diff = stream_data->top - stream_data->bottom;
914        return diff;
915}
916
917/* Returns a pointer to the start of the next complete ERF record */
918static dag_record_t *dag_get_record(struct dag_per_stream_t *stream_data)
919{
920        dag_record_t *erfptr = NULL;
921        uint16_t size;
922
923        erfptr = (dag_record_t *)stream_data->bottom;
924        if (!erfptr)
925                return NULL;
926
927        size = ntohs(erfptr->rlen);
928        assert( size >= dag_record_size );
929
930        /* Make certain we have the full packet available */
931        if (size > (stream_data->top - stream_data->bottom))
932                return NULL;
933
934        stream_data->bottom += size;
935        stream_data->processed += size;
936        return erfptr;
937}
938
939/* Converts a buffer containing a recently read DAG packet record into a
940 * libtrace packet */
941static int dag_prepare_packet_real(libtrace_t *libtrace,
942                                   struct dag_per_stream_t *stream_data,
943                                   libtrace_packet_t *packet,
944                                   void *buffer, libtrace_rt_types_t rt_type,
945                                   uint32_t flags)
946{
947        dag_record_t *erfptr;
948
949        /* If the packet previously owned a buffer that is not the buffer
950         * that contains the new packet data, we're going to need to free the
951         * old one to avoid memory leaks */
952        if (packet->buffer != buffer &&
953            packet->buf_control == TRACE_CTRL_PACKET) {
954                free(packet->buffer);
955        }
956
957        /* Set the buffer owner appropriately */
958        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
959                packet->buf_control = TRACE_CTRL_PACKET;
960        } else
961                packet->buf_control = TRACE_CTRL_EXTERNAL;
962
963        /* Update the packet pointers and type appropriately */
964        erfptr = (dag_record_t *)buffer;
965        packet->buffer = erfptr;
966        packet->header = erfptr;
967        packet->type = rt_type;
968
969        if (erfptr->flags.rxerror == 1) {
970                /* rxerror means the payload is corrupt - drop the payload
971                 * by tweaking rlen */
972                packet->payload = NULL;
973                erfptr->rlen = htons(erf_get_framing_length(packet));
974        } else {
975                packet->payload = (char*)packet->buffer
976                        + erf_get_framing_length(packet);
977        }
978
979        if (libtrace->format_data == NULL) {
980                dag_init_format_data(libtrace);
981        }
982
983        /* Update the dropped packets counter */
984        /* No loss counter for DSM coloured records - have to use some
985         * other API */
986        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
987                /* TODO */
988        } else {
989                /* Use the ERF loss counter */
990                if (stream_data->seeninterface[erfptr->flags.iface]
991                    == 0) {
992                        stream_data->seeninterface[erfptr->flags.iface]
993                                = 1;
994                } else {
995                        stream_data->drops += ntohs(erfptr->lctr);
996                }
997        }
998
999        packet->error = 1;
1000
1001        return 0;
1002}
1003
1004static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
1005                              void *buffer, libtrace_rt_types_t rt_type,
1006                              uint32_t flags)
1007{
1008        return dag_prepare_packet_real(libtrace, FORMAT_DATA_FIRST, packet,
1009                                       buffer, rt_type, flags);
1010}
1011
1012/*
1013 * dag_write_packet() at this stage attempts to improve tx performance
1014 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
1015 * has been met. I observed approximately 270% performance increase
1016 * through this relatively naive tweak. No optimisation of buffer sizes
1017 * was attempted.
1018 */
1019
1020/* Pushes an ERF record onto the transmit stream */
1021static int dag_dump_packet(libtrace_out_t *libtrace,
1022                           dag_record_t *erfptr, unsigned int pad,
1023                           void *buffer)
1024{
1025        int size;
1026
1027        /*
1028         * If we've got 0 bytes waiting in the txqueue, assume that we
1029         * haven't requested any space yet, and request some, storing
1030         * the pointer at FORMAT_DATA_OUT->txbuffer.
1031         *
1032         * The amount to request is slightly magical at the moment - it's
1033         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
1034         * the buffer and handle overruns.
1035         */
1036        if (FORMAT_DATA_OUT->waiting == 0) {
1037                FORMAT_DATA_OUT->txbuffer =
1038                        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
1039                                                FORMAT_DATA_OUT->dagstream,
1040                                                16908288);
1041        }
1042
1043        /*
1044         * Copy the header separately to the body, as we can't guarantee they
1045         * are in contiguous memory
1046         */
1047        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, erfptr,
1048               (dag_record_size + pad));
1049        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
1050
1051        /*
1052         * Copy our incoming packet into the outgoing buffer, and increment
1053         * our waiting count
1054         */
1055        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
1056        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, buffer,
1057               size);
1058        FORMAT_DATA_OUT->waiting += size;
1059
1060        /*
1061         * If our output buffer has more than 16 Mebibytes in it, commit those
1062         * bytes and reset the waiting count to 0.
1063         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in
1064         * case there is still data in the buffer at program exit.
1065         */
1066        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
1067                FORMAT_DATA_OUT->txbuffer =
1068                        dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
1069                                                   FORMAT_DATA_OUT->dagstream,
1070                                                   FORMAT_DATA_OUT->waiting);
1071                FORMAT_DATA_OUT->waiting = 0;
1072        }
1073
1074        return size + pad + dag_record_size;
1075}
1076
1077/* Attempts to determine a suitable ERF type for a given packet. Returns true
1078 * if one is found, false otherwise */
1079static bool find_compatible_linktype(libtrace_out_t *libtrace,
1080                                     libtrace_packet_t *packet, char *type)
1081{
1082        /* Keep trying to simplify the packet until we can find
1083         * something we can do with it */
1084
1085        do {
1086                *type = libtrace_to_erf_type(trace_get_link_type(packet));
1087
1088                /* Success */
1089                if (*type != (char)-1)
1090                        return true;
1091
1092                if (!demote_packet(packet)) {
1093                        trace_set_err_out(libtrace,
1094                                          TRACE_ERR_NO_CONVERSION,
1095                                          "No erf type for packet (%i)",
1096                                          trace_get_link_type(packet));
1097                        return false;
1098                }
1099
1100        } while(1);
1101
1102        return true;
1103}
1104
1105/* Writes a packet to the provided DAG output trace */
1106static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet)
1107{
1108        /* This is heavily borrowed from erf_write_packet(). Yes, CnP
1109         * coding sucks, sorry about that.
1110         */
1111        unsigned int pad = 0;
1112        int numbytes;
1113        void *payload = packet->payload;
1114        dag_record_t *header = (dag_record_t *)packet->header;
1115        char erf_type = 0;
1116
1117        if(!packet->header) {
1118                /* No header, probably an RT packet. Lifted from
1119                 * erf_write_packet(). */
1120                return -1;
1121        }
1122
1123        if (trace_get_link_type(packet) == TRACE_TYPE_NONDATA)
1124                return 0;
1125
1126        pad = dag_get_padding(packet);
1127
1128        /*
1129         * If the payload is null, adjust the rlen. Discussion of this is
1130         * attached to erf_write_packet()
1131         */
1132        if (payload == NULL) {
1133                header->rlen = htons(dag_record_size + pad);
1134        }
1135
1136        if (packet->type == TRACE_RT_DATA_ERF) {
1137                numbytes = dag_dump_packet(libtrace, header, pad, payload);
1138        } else {
1139                /* Build up a new packet header from the existing header */
1140
1141                /* Simplify the packet first - if we can't do this, break
1142                 * early */
1143                if (!find_compatible_linktype(libtrace,packet,&erf_type))
1144                        return -1;
1145
1146                dag_record_t erfhdr;
1147
1148                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
1149                payload=packet->payload;
1150                pad = dag_get_padding(packet);
1151
1152                /* Flags. Can't do this */
1153                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
1154                if (trace_get_direction(packet)!=(int)~0U)
1155                        erfhdr.flags.iface = trace_get_direction(packet);
1156
1157                erfhdr.type = erf_type;
1158
1159                /* Packet length (rlen includes format overhead) */
1160                assert(trace_get_capture_length(packet) > 0
1161                       && trace_get_capture_length(packet) <= 65536);
1162                assert(erf_get_framing_length(packet) > 0
1163                       && trace_get_framing_length(packet) <= 65536);
1164                assert(trace_get_capture_length(packet) +
1165                       erf_get_framing_length(packet) > 0
1166                       && trace_get_capture_length(packet) +
1167                       erf_get_framing_length(packet) <= 65536);
1168
1169                erfhdr.rlen = htons(trace_get_capture_length(packet)
1170                                    + erf_get_framing_length(packet));
1171
1172
1173                /* Loss counter. Can't do this */
1174                erfhdr.lctr = 0;
1175                /* Wire length, does not include padding! */
1176                erfhdr.wlen = htons(trace_get_wire_length(packet));
1177
1178                /* Write it out */
1179                numbytes = dag_dump_packet(libtrace, &erfhdr, pad, payload);
1180        }
1181
1182        return numbytes;
1183}
1184
1185/* Reads the next available packet from a DAG card, in a BLOCKING fashion
1186 *
1187 * If DUCK reporting is enabled, the packet returned may be a DUCK update
1188 */
1189static int dag_read_packet_real(libtrace_t *libtrace,
1190                                struct dag_per_stream_t *stream_data,
1191                                libtrace_thread_t *t, /* Optional */
1192                                libtrace_packet_t *packet)
1193{
1194        dag_record_t *erfptr = NULL;
1195        int numbytes = 0;
1196        uint32_t flags = 0;
1197        struct timeval maxwait, pollwait;
1198
1199        pollwait.tv_sec = 0;
1200        pollwait.tv_usec = 10000;
1201        maxwait.tv_sec = 0;
1202        maxwait.tv_usec = 250000;
1203
1204        /* TODO: Support DUCK reporting */
1205
1206        /* Don't let anyone try to free our DAG memory hole! */
1207        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1208
1209        /* If the packet buffer is currently owned by libtrace, free it so
1210         * that we can set the packet to point into the DAG memory hole */
1211        if (packet->buf_control == TRACE_CTRL_PACKET) {
1212                free(packet->buffer);
1213                packet->buffer = 0;
1214        }
1215
1216        if (dag_set_stream_poll(stream_data->device->fd, stream_data->dagstream,
1217                                sizeof(dag_record_t), &maxwait,
1218                                &pollwait) == -1) {
1219                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1220                return -1;
1221        }
1222
1223        /* Grab a full ERF record */
1224        do {
1225                numbytes = dag_available(libtrace, stream_data);
1226                if (numbytes < 0)
1227                        return numbytes;
1228                if (numbytes < dag_record_size) {
1229                        /* Check the message queue if we have one to check */
1230                        if (t != NULL &&
1231                            libtrace_message_queue_count(&t->messages) > 0)
1232                                return -2;
1233
1234                        if (libtrace_halt)
1235                                return 0;
1236                        /* Block until we see a packet */
1237                        continue;
1238                }
1239                erfptr = dag_get_record(stream_data);
1240        } while (erfptr == NULL);
1241
1242        /* Prepare the libtrace packet */
1243        if (dag_prepare_packet_real(libtrace, stream_data, packet, erfptr,
1244                                    TRACE_RT_DATA_ERF, flags))
1245                return -1;
1246
1247        return packet->payload ? htons(erfptr->rlen) :
1248                erf_get_framing_length(packet);
1249}
1250
1251static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
1252{
1253        return dag_read_packet_real(libtrace, FORMAT_DATA_FIRST, NULL, packet);
1254}
1255
1256static int dag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1257                             libtrace_packet_t **packets, size_t nb_packets)
1258{
1259        int ret;
1260        size_t read_packets = 0;
1261        int numbytes = 0;
1262
1263        struct dag_per_stream_t *stream_data =
1264                (struct dag_per_stream_t *)t->format_data;
1265
1266        /* Read as many packets as we can, but read atleast one packet */
1267        do {
1268                ret = dag_read_packet_real(libtrace, stream_data, t,
1269                                           packets[read_packets]);
1270                if (ret < 0)
1271                        return ret;
1272
1273                read_packets++;
1274
1275                /* Make sure we don't read too many packets..! */
1276                if (read_packets >= nb_packets)
1277                        break;
1278
1279                numbytes = dag_available(libtrace, stream_data);
1280        } while (numbytes >= dag_record_size);
1281
1282        return read_packets;
1283}
1284
1285/* Attempts to read a packet from a DAG card in a NON-BLOCKING fashion. If a
1286 * packet is available, we will return a packet event. Otherwise we will
1287 * return a SLEEP event (as we cannot select on the DAG file descriptor).
1288 */
1289static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
1290                                           libtrace_packet_t *packet)
1291{
1292        libtrace_eventobj_t event = {0,0,0.0,0};
1293        dag_record_t *erfptr = NULL;
1294        int numbytes;
1295        uint32_t flags = 0;
1296        struct timeval minwait;
1297
1298        minwait.tv_sec = 0;
1299        minwait.tv_usec = 10000;
1300
1301        if (dag_set_stream_poll(FORMAT_DATA_FIRST->device->fd,
1302                                FORMAT_DATA_FIRST->dagstream, 0, &minwait,
1303                                &minwait) == -1) {
1304                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1305                event.type = TRACE_EVENT_TERMINATE;
1306                return event;
1307        }
1308
1309        do {
1310                erfptr = NULL;
1311                numbytes = 0;
1312
1313                /* Need to call dag_available so that the top pointer will get
1314                 * updated, otherwise we'll never see any data! */
1315                numbytes = dag_available(libtrace, FORMAT_DATA_FIRST);
1316
1317                /* May as well not bother calling dag_get_record if
1318                 * dag_available suggests that there's no data */
1319                if (numbytes != 0)
1320                        erfptr = dag_get_record(FORMAT_DATA_FIRST);
1321                if (erfptr == NULL) {
1322                        /* No packet available - sleep for a very short time */
1323                        if (libtrace_halt) {
1324                                event.type = TRACE_EVENT_TERMINATE;
1325                        } else {
1326                                event.type = TRACE_EVENT_SLEEP;
1327                                event.seconds = 0.0001;
1328                        }
1329                        break;
1330                }
1331                if (dag_prepare_packet_real(libtrace, FORMAT_DATA_FIRST, packet,
1332                                            erfptr, TRACE_RT_DATA_ERF, flags)) {
1333                        event.type = TRACE_EVENT_TERMINATE;
1334                        break;
1335                }
1336
1337
1338                event.size = trace_get_capture_length(packet) +
1339                        trace_get_framing_length(packet);
1340
1341                /* XXX trace_read_packet() normally applies the following
1342                 * config options for us, but this function is called via
1343                 * trace_event() so we have to do it ourselves */
1344
1345                if (libtrace->filter) {
1346                        int filtret = trace_apply_filter(libtrace->filter,
1347                                                         packet);
1348                        if (filtret == -1) {
1349                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
1350                                              "Bad BPF Filter");
1351                                event.type = TRACE_EVENT_TERMINATE;
1352                                break;
1353                        }
1354
1355                        if (filtret == 0) {
1356                                /* This packet isn't useful so we want to
1357                                 * immediately see if there is another suitable
1358                                 * one - we definitely DO NOT want to return
1359                                 * a sleep event in this case, like we used to
1360                                 * do! */
1361                                libtrace->filtered_packets ++;
1362                                trace_clear_cache(packet);
1363                                continue;
1364                        }
1365
1366                        event.type = TRACE_EVENT_PACKET;
1367                } else {
1368                        event.type = TRACE_EVENT_PACKET;
1369                }
1370
1371                if (libtrace->snaplen > 0) {
1372                        trace_set_capture_length(packet, libtrace->snaplen);
1373                }
1374                libtrace->accepted_packets ++;
1375                break;
1376        } while(1);
1377
1378        return event;
1379}
1380
1381/* Gets the number of dropped packets */
1382static uint64_t dag_get_dropped_packets(libtrace_t *libtrace)
1383{
1384        uint64_t sum = 0;
1385        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
1386
1387        /* Sum the drop counter for all the packets */
1388        while (tmp != NULL) {
1389                sum += STREAM_DATA(tmp)->drops;
1390                tmp = tmp->next;
1391        }
1392
1393        return sum;
1394}
1395
1396/* Prints some semi-useful help text about the DAG format module */
1397static void dag_help(void) {
1398        printf("dag format module: $Revision: 1755 $\n");
1399        printf("Supported input URIs:\n");
1400        printf("\tdag:/dev/dagn\n");
1401        printf("\n");
1402        printf("\te.g.: dag:/dev/dag0\n");
1403        printf("\n");
1404        printf("Supported output URIs:\n");
1405        printf("\tnone\n");
1406        printf("\n");
1407}
1408
1409static int dag_pconfig_input(UNUSED libtrace_t *libtrace,
1410                             trace_parallel_option_t option, UNUSED void *value)
1411{
1412        /* We don't support any of these! Normally you configure the DAG card
1413         * externally. */
1414        switch(option) {
1415        case TRACE_OPTION_SET_HASHER:
1416        case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
1417        case TRACE_OPTION_TRACETIME:
1418        case TRACE_OPTION_TICK_INTERVAL:
1419        case TRACE_OPTION_GET_CONFIG:
1420        case TRACE_OPTION_SET_CONFIG:
1421                return -1;
1422        }
1423        /* We don't provide a default option to ensure that future options will
1424         * generate a compiler warning. */
1425
1426        return -1;
1427}
1428
1429/* TODO: Should possibly make a more generic dag_start_input, as there's a
1430 * fair bit of code duplication between that and this */
1431static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1432                                bool reader)
1433{
1434        struct timeval zero, nopoll;
1435        uint8_t *top, *bottom, *starttop;
1436        struct dag_per_stream_t *stream_data;
1437        top = bottom = NULL;
1438
1439        /* Minimum delay is 10mS */
1440        zero.tv_sec = 0;
1441        zero.tv_usec = 10000;
1442        nopoll = zero;
1443
1444        if (reader) {
1445                if (t->type == THREAD_PERPKT) {
1446                        stream_data =
1447                                (struct dag_per_stream_t *)
1448                                libtrace_list_get_index(FORMAT_DATA->per_stream,
1449                                                        t->perpkt_num)->data;
1450
1451                        /* Pass the per thread data to the thread */
1452                        t->format_data = stream_data;
1453
1454                        /* Attach and start the DAG stream */
1455                        printf("t%u: starting and attaching stream #%u\n",
1456                               t->perpkt_num, stream_data->dagstream);
1457                        if (dag_attach_stream(stream_data->device->fd,
1458                                              stream_data->dagstream, 0,
1459                                              0) < 0) {
1460                                printf("can't attach DAG stream #%u\n",
1461                                       stream_data->dagstream);
1462                                trace_set_err(libtrace, errno,
1463                                              "can't attach DAG stream #%u",
1464                                              stream_data->dagstream);
1465                                return -1;
1466                        }
1467                        if (dag_start_stream(stream_data->device->fd,
1468                                             stream_data->dagstream) < 0) {
1469                                trace_set_err(libtrace, errno,
1470                                              "can't start DAG stream #%u",
1471                                              stream_data->dagstream);
1472                                printf("can't start DAG stream #%u\n",
1473                                       stream_data->dagstream);
1474                                return -1;
1475                        }
1476
1477                        /* Ensure that dag_advance_stream will return without
1478                         * blocking */
1479                        if(dag_set_stream_poll(stream_data->device->fd,
1480                                               stream_data->dagstream, 0, &zero,
1481                                               &nopoll) < 0) {
1482                                trace_set_err(libtrace, errno,
1483                                              "dag_set_stream_poll failed!");
1484                                return -1;
1485                        }
1486
1487                        /* Clear all the data from the memory hole */
1488                        starttop = dag_advance_stream(stream_data->
1489                                                      device->fd,
1490                                                      stream_data->dagstream,
1491                                                      &bottom);
1492
1493                        top = starttop;
1494                        while (starttop - bottom > 0) {
1495                                bottom += (starttop - bottom);
1496                                top = dag_advance_stream(stream_data->
1497                                                         device->fd,
1498                                                         stream_data->dagstream,
1499                                                         &bottom);
1500                        }
1501                        stream_data->top = top;
1502                        stream_data->bottom = bottom;
1503                        stream_data->pkt_count = 0;
1504                        stream_data->drops = 0;
1505                } else {
1506                        /* TODO: Figure out why t->type != THREAD_PERPKT in
1507                         * order to figure out what this line does */
1508                        t->format_data = FORMAT_DATA_FIRST;
1509                }
1510        }
1511
1512        fprintf(stderr, "t%u: registered thread\n", t->perpkt_num);
1513
1514        return 0;
1515}
1516
1517static struct libtrace_format_t dag = {
1518        "dag",
1519        "$Id$",
1520        TRACE_FORMAT_ERF,
1521        dag_probe_filename,             /* probe filename */
1522        NULL,                           /* probe magic */
1523        dag_init_input,                 /* init_input */
1524        dag_config_input,               /* config_input */
1525        dag_start_input,                /* start_input */
1526        dag_pause_input,                /* pause_input */
1527        dag_init_output,                /* init_output */
1528        NULL,                           /* config_output */
1529        dag_start_output,               /* start_output */
1530        dag_fin_input,                  /* fin_input */
1531        dag_fin_output,                 /* fin_output */
1532        dag_read_packet,                /* read_packet */
1533        dag_prepare_packet,             /* prepare_packet */
1534        NULL,                           /* fin_packet */
1535        dag_write_packet,               /* write_packet */
1536        erf_get_link_type,              /* get_link_type */
1537        erf_get_direction,              /* get_direction */
1538        erf_set_direction,              /* set_direction */
1539        erf_get_erf_timestamp,          /* get_erf_timestamp */
1540        NULL,                           /* get_timeval */
1541        NULL,                           /* get_seconds */
1542        NULL,                           /* get_timespec */
1543        NULL,                           /* seek_erf */
1544        NULL,                           /* seek_timeval */
1545        NULL,                           /* seek_seconds */
1546        erf_get_capture_length,         /* get_capture_length */
1547        erf_get_wire_length,            /* get_wire_length */
1548        erf_get_framing_length,         /* get_framing_length */
1549        erf_set_capture_length,         /* set_capture_length */
1550        NULL,                           /* get_received_packets */
1551        NULL,                           /* get_filtered_packets */
1552        dag_get_dropped_packets,        /* get_dropped_packets */
1553        NULL,                           /* get_captured_packets */
1554        NULL,                           /* get_fd */
1555        trace_event_dag,                /* trace_event */
1556        dag_help,                       /* help */
1557        NULL,                            /* next pointer */
1558        {true, 0}, /* live packet capture, thread limit TBD */
1559        dag_pstart_input,
1560        dag_pread_packets,
1561        dag_pause_input,
1562        NULL,
1563        dag_pconfig_input,
1564        dag_pregister_thread,
1565        NULL
1566};
1567
1568void dag_constructor(void)
1569{
1570        register_format(&dag);
1571}
Note: See TracBrowser for help on using the repository browser.