source: lib/format_dag25.c @ fce4572

develop
Last change on this file since fce4572 was fce4572, checked in by Shane Alcock <salcock@…>, 22 months ago

Add more failure cases to "can_write" functions for some formats.

Specifically:

  • Avoid writing metadata packets to DPDK, ring and int
  • Make sure all formats avoid writing "content invalid" packets.
  • Add comments to remind us that erf meta <-> pcapng meta conversion might be worth adding at some point.
  • Add comment to remind us that erf meta should be writable via a DAG card (I think).
  • Property mode set to 100644
File size: 47.4 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26#define _GNU_SOURCE
27
28#include "config.h"
29#include "common.h"
30#include "libtrace.h"
31#include "libtrace_int.h"
32#include "format_helper.h"
33#include "format_erf.h"
34
35#include <errno.h>
36#include <fcntl.h>
37#include <stdio.h>
38#include <string.h>
39#include <stdlib.h>
40#include <sys/stat.h>
41
42#include <sys/mman.h>
43/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
44 * Windows anyway so we'll worry about this more later :] */
45#include <pthread.h>
46
47
48#ifdef HAVE_DAG_CONFIG_API_H
49#include <dag_config_api.h>
50#endif
51
52#ifdef WIN32
53#  include <io.h>
54#  include <share.h>
55#  define PATH_MAX _MAX_PATH
56#  define snprintf sprintf_s
57#else
58#  include <netdb.h>
59#  ifndef PATH_MAX
60#       define PATH_MAX 4096
61#  endif
62#  include <sys/ioctl.h>
63#endif
64
65/* This format deals with DAG cards that are using drivers from the 2.5 version
66 * onwards, including 3.X.
67 *
68 * DAG is a LIVE capture format.
69 *
70 * This format does support writing, provided the DAG card that you are using
71 * has transmit (Tx) support. Additionally, packets read using this format
72 * are in the ERF format, so can easily be written as ERF traces without
73 * losing any data.
74 */
75
76#define DATA(x) ((struct dag_format_data_t *)x->format_data)
77#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
78#define STREAM_DATA(x) ((struct dag_per_stream_t *)x->data)
79
80#define FORMAT_DATA DATA(libtrace)
81#define FORMAT_DATA_OUT DATA_OUT(libtrace)
82
83#define DUCK FORMAT_DATA->duck
84
85#define FORMAT_DATA_HEAD FORMAT_DATA->per_stream->head
86#define FORMAT_DATA_FIRST ((struct dag_per_stream_t *)FORMAT_DATA_HEAD->data)
87
88static struct libtrace_format_t dag;
89
90/* A DAG device - a DAG device can support multiple streams (and therefore
91 * multiple input traces) so each trace needs to refer to a device */
92struct dag_dev_t {
93        char * dev_name;                /* Device name */
94        int fd;                         /* File descriptor */
95        uint16_t ref_count;             /* Number of input / output traces
96                                           that are using this device */
97        struct dag_dev_t *prev;         /* Pointer to the previous device in
98                                           the device list */
99        struct dag_dev_t *next;         /* Pointer to the next device in the
100                                           device list */
101};
102
103/* "Global" data that is stored for each DAG output trace */
104struct dag_format_data_out_t {
105        /* The DAG device being used for writing */
106        struct dag_dev_t *device;
107        /* The DAG stream that is being written on */
108        unsigned int dagstream;
109        /* Boolean flag indicating whether the stream is currently attached */
110        int stream_attached;
111        /* The amount of data waiting to be transmitted, in bytes */
112        uint64_t waiting;
113        /* A buffer to hold the data to be transmittted */
114        uint8_t *txbuffer;
115};
116
117/* Data that is stored against each input stream */
118struct dag_per_stream_t {
119        /* DAG stream number */
120        uint16_t dagstream;
121        /* Pointer to the last unread byte in the DAG memory */
122        uint8_t *top;
123        /* Pointer to the first unread byte in the DAG memory */
124        uint8_t *bottom;
125        /* Amount of data processed from the bottom pointer */
126        uint32_t processed;
127        /* Number of packets seen by the stream */
128        uint64_t pkt_count;
129        /* Drop count for this particular stream */
130        uint64_t drops;
131        /* Boolean values to indicate if a particular interface has been seen
132         * or not. This is limited to four interfaces, which is enough to
133         * support all current DAG cards */
134        uint8_t seeninterface[4];
135};
136
137/* "Global" data that is stored for each DAG input trace */
138struct dag_format_data_t {
139        /* DAG device */
140        struct dag_dev_t *device;
141        /* Boolean flag indicating whether the trace is currently attached */
142        int stream_attached;
143        /* Data stored against each DAG input stream */
144        libtrace_list_t *per_stream;
145
146        /* Data required for regular DUCK reporting.
147         * We put this on a new cache line otherwise we have a lot of false
148         * sharing caused by updating the last_pkt.
149         * This should only ever be accessed by the first thread stream,
150         * that includes both read and write operations.
151         */
152        struct {
153                /* Timestamp of the last DUCK report */
154                uint32_t last_duck;
155                /* The number of seconds between each DUCK report */
156                uint32_t duck_freq;
157                /* Timestamp of the last packet read from the DAG card */
158                uint32_t last_pkt;
159                /* Dummy trace to ensure DUCK packets are dealt with using the
160                 * DUCK format functions */
161                libtrace_t *dummy_duck;
162        } duck ALIGN_STRUCT(CACHE_LINE_SIZE);
163};
164
165/* To be thread-safe, we're going to need a mutex for operating on the list
166 * of DAG devices */
167pthread_mutex_t open_dag_mutex;
168
169/* The list of DAG devices that have been opened by libtrace.
170 *
171 * We can only open each DAG device once, but we might want to read from
172 * multiple streams. Therefore, we need to maintain a list of devices that we
173 * have opened (with ref counts!) so that we don't try to open a device too
174 * many times or close a device that we're still using */
175struct dag_dev_t *open_dags = NULL;
176
177static bool dag_can_write(libtrace_packet_t *packet) {
178        /* Get the linktype */
179        libtrace_linktype_t ltype = trace_get_link_type(packet);
180
181        if (ltype == TRACE_TYPE_CONTENT_INVALID) {
182                return false;
183        }
184
185        /* TODO erf meta should definitely be writable, pcapng meta
186         * could probably be converted into erf meta */
187        if (ltype == TRACE_TYPE_ERF_META
188                        || ltype == TRACE_TYPE_NONDATA
189                        || ltype == TRACE_TYPE_PCAPNG_META) {
190                return false;
191        }
192
193        return true;
194}
195
196/* Returns the amount of padding between the ERF header and the start of the
197 * captured packet data */
198static int dag_get_padding(const libtrace_packet_t *packet)
199{
200        /* ERF Ethernet records have a 2 byte padding before the packet itself
201         * so that the IP header is aligned on a 32 bit boundary.
202         */
203        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
204                dag_record_t *erfptr = (dag_record_t *)packet->header;
205                switch(erfptr->type) {
206                        case TYPE_ETH:
207                        case TYPE_COLOR_ETH:
208                        case TYPE_DSM_COLOR_ETH:
209                        case TYPE_COLOR_HASH_ETH:
210                                return 2;
211                        default:                return 0;
212                }
213        }
214        else {
215                switch(trace_get_link_type(packet)) {
216                        case TRACE_TYPE_ETH:    return 2;
217                        default:                return 0;
218                }
219        }
220}
221
222/* Attempts to determine if the given filename refers to a DAG device */
223static int dag_probe_filename(const char *filename)
224{
225        struct stat statbuf;
226        /* Can we stat the file? */
227        if (stat(filename, &statbuf) != 0) {
228                return 0;
229        }
230        /* Is it a character device? */
231        if (!S_ISCHR(statbuf.st_mode)) {
232                return 0;
233        }
234        /* Yeah, it's probably us. */
235        return 1;
236}
237
238/* Initialises the DAG output data structure */
239static void dag_init_format_out_data(libtrace_out_t *libtrace)
240{
241        libtrace->format_data = (struct dag_format_data_out_t *)
242                malloc(sizeof(struct dag_format_data_out_t));
243        // no DUCK on output
244        FORMAT_DATA_OUT->stream_attached = 0;
245        FORMAT_DATA_OUT->device = NULL;
246        FORMAT_DATA_OUT->dagstream = 0;
247        FORMAT_DATA_OUT->waiting = 0;
248
249}
250
251/* Initialises the DAG input data structure */
252static void dag_init_format_data(libtrace_t *libtrace)
253{
254        struct dag_per_stream_t stream_data;
255
256        libtrace->format_data = (struct dag_format_data_t *)
257                malloc(sizeof(struct dag_format_data_t));
258        DUCK.last_duck = 0;
259        DUCK.duck_freq = 0;
260        DUCK.last_pkt = 0;
261        DUCK.dummy_duck = NULL;
262
263        FORMAT_DATA->per_stream =
264                libtrace_list_init(sizeof(stream_data));
265        if (FORMAT_DATA->per_stream == NULL) {
266                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
267                        "Unable to create list for stream in dag_init_format_data()");
268                return;
269        }
270
271        /* We'll start with just one instance of stream_data, and we'll
272         * add more later if we need them */
273        memset(&stream_data, 0, sizeof(stream_data));
274        libtrace_list_push_back(FORMAT_DATA->per_stream, &stream_data);
275}
276
277/* Determines if there is already an entry for the given DAG device in the
278 * device list and increments the reference count for that device, if found.
279 *
280 * NOTE: This function assumes the open_dag_mutex is held by the caller */
281static struct dag_dev_t *dag_find_open_device(char *dev_name)
282{
283        struct dag_dev_t *dag_dev;
284
285        dag_dev = open_dags;
286
287        /* XXX: Not exactly zippy, but how often are we going to be dealing
288         * with multiple dag cards? */
289        while (dag_dev != NULL) {
290                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
291                        dag_dev->ref_count ++;
292                        return dag_dev;
293                }
294                dag_dev = dag_dev->next;
295        }
296        return NULL;
297}
298
299/* Closes a DAG device and removes it from the device list.
300 *
301 * Attempting to close a DAG device that has a non-zero reference count will
302 * cause an assertion failure!
303 *
304 * NOTE: This function assumes the open_dag_mutex is held by the caller */
305static void dag_close_device(struct dag_dev_t *dev)
306{
307        /* Need to remove from the device list */
308        if (dev->ref_count != 0) {
309                fprintf(stderr, "Cannot close DAG device with non zero reference in dag_close_device()\n");
310                return;
311        }
312
313        if (dev->prev == NULL) {
314                open_dags = dev->next;
315                if (dev->next)
316                        dev->next->prev = NULL;
317        } else {
318                dev->prev->next = dev->next;
319                if (dev->next)
320                        dev->next->prev = dev->prev;
321        }
322
323        dag_close(dev->fd);
324        free(dev);
325}
326
327
328/* Opens a new DAG device for writing and adds it to the DAG device list
329 *
330 * NOTE: this function should only be called when opening a DAG device for
331 * writing - there is little practical difference between this and the
332 * function below that covers the reading case, but we need the output trace
333 * object to report errors properly so the two functions take slightly
334 * different arguments. This is really lame and there should be a much better
335 * way of doing this.
336 *
337 * NOTE: This function assumes the open_dag_mutex is held by the caller
338 */
339static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace,
340                                                char *dev_name)
341{
342        struct stat buf;
343        int fd;
344        struct dag_dev_t *new_dev;
345
346        /* Make sure the device exists */
347        if (stat(dev_name, &buf) == -1) {
348                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
349                return NULL;
350        }
351
352        /* Make sure it is the appropriate type of device */
353        if (S_ISCHR(buf.st_mode)) {
354                /* Try opening the DAG device */
355                if((fd = dag_open(dev_name)) < 0) {
356                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
357                                        dev_name);
358                        return NULL;
359                }
360        } else {
361                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
362                                dev_name);
363                return NULL;
364        }
365
366        /* Add the device to our device list - it is just a doubly linked
367         * list with no inherent ordering; just tack the new one on the front
368         */
369        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
370        new_dev->fd = fd;
371        new_dev->dev_name = dev_name;
372        new_dev->ref_count = 1;
373
374        new_dev->prev = NULL;
375        new_dev->next = open_dags;
376        if (open_dags)
377                open_dags->prev = new_dev;
378
379        open_dags = new_dev;
380
381        return new_dev;
382}
383
384/* Opens a new DAG device for reading and adds it to the DAG device list
385 *
386 * NOTE: this function should only be called when opening a DAG device for
387 * reading - there is little practical difference between this and the
388 * function above that covers the writing case, but we need the input trace
389 * object to report errors properly so the two functions take slightly
390 * different arguments. This is really lame and there should be a much better
391 * way of doing this.
392 *
393 * NOTE: This function assumes the open_dag_mutex is held by the caller */
394static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
395        struct stat buf;
396        int fd;
397        struct dag_dev_t *new_dev;
398
399        /* Make sure the device exists */
400        if (stat(dev_name, &buf) == -1) {
401                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
402                return NULL;
403        }
404
405        /* Make sure it is the appropriate type of device */
406        if (S_ISCHR(buf.st_mode)) {
407                /* Try opening the DAG device */
408                if((fd = dag_open(dev_name)) < 0) {
409                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
410                                      dev_name);
411                        return NULL;
412                }
413        } else {
414                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
415                              dev_name);
416                return NULL;
417        }
418
419        /* Add the device to our device list - it is just a doubly linked
420         * list with no inherent ordering; just tack the new one on the front
421         */
422        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
423        new_dev->fd = fd;
424        new_dev->dev_name = dev_name;
425        new_dev->ref_count = 1;
426
427        new_dev->prev = NULL;
428        new_dev->next = open_dags;
429        if (open_dags)
430                open_dags->prev = new_dev;
431
432        open_dags = new_dev;
433
434        return new_dev;
435}
436
437/* Creates and initialises a DAG output trace */
438static int dag_init_output(libtrace_out_t *libtrace)
439{
440        /* Upon successful creation, the device name is stored against the
441         * device and free when it is free()d */
442        char *dag_dev_name = NULL;
443        char *scan = NULL;
444        struct dag_dev_t *dag_device = NULL;
445        int stream = 1;
446
447        /* XXX I don't know if this is important or not, but this function
448         * isn't present in all of the driver releases that this code is
449         * supposed to support! */
450        /*
451        unsigned long wake_time;
452        dagutil_sleep_get_wake_time(&wake_time,0);
453        */
454
455        dag_init_format_out_data(libtrace);
456        /* Grab the mutex while we're likely to be messing with the device
457         * list */
458        pthread_mutex_lock(&open_dag_mutex);
459
460        /* Specific streams are signified using a comma in the libtrace URI,
461         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
462         *
463         * If no stream is specified, we will write using stream 1 */
464        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
465                dag_dev_name = strdup(libtrace->uridata);
466        } else {
467                dag_dev_name = (char *)strndup(libtrace->uridata,
468                                (size_t)(scan - libtrace->uridata));
469                stream = atoi(++scan);
470        }
471        FORMAT_DATA_OUT->dagstream = stream;
472
473        /* See if our DAG device is already open */
474        dag_device = dag_find_open_device(dag_dev_name);
475
476        if (dag_device == NULL) {
477                /* Device not yet opened - open it ourselves */
478                dag_device = dag_open_output_device(libtrace, dag_dev_name);
479        } else {
480                /* Otherwise, just use the existing one */
481                free(dag_dev_name);
482                dag_dev_name = NULL;
483        }
484
485        /* Make sure we have successfully opened a DAG device */
486        if (dag_device == NULL) {
487                if (dag_dev_name) {
488                        free(dag_dev_name);
489                }
490                pthread_mutex_unlock(&open_dag_mutex);
491                return -1;
492        }
493
494        FORMAT_DATA_OUT->device = dag_device;
495        pthread_mutex_unlock(&open_dag_mutex);
496        return 0;
497}
498
499/* Creates and initialises a DAG input trace */
500static int dag_init_input(libtrace_t *libtrace) {
501        /* Upon successful creation, the device name is stored against the
502         * device and free when it is free()d */
503        char *dag_dev_name = NULL;
504        char *scan = NULL;
505        int stream = 0;
506        struct dag_dev_t *dag_device = NULL;
507
508        dag_init_format_data(libtrace);
509        /* Grab the mutex while we're likely to be messing with the device
510         * list */
511        pthread_mutex_lock(&open_dag_mutex);
512
513
514        /* DAG cards support multiple streams. In a single threaded capture,
515         * these are specified using a comma in the libtrace URI,
516         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
517         *
518         * If no stream is specified, we will read from stream 0 with
519         * one thread
520         */
521        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
522                dag_dev_name = strdup(libtrace->uridata);
523        } else {
524                dag_dev_name = (char *)strndup(libtrace->uridata,
525                                (size_t)(scan - libtrace->uridata));
526                stream = atoi(++scan);
527        }
528
529        FORMAT_DATA_FIRST->dagstream = stream;
530
531        /* See if our DAG device is already open */
532        dag_device = dag_find_open_device(dag_dev_name);
533
534        if (dag_device == NULL) {
535                /* Device not yet opened - open it ourselves */
536                dag_device = dag_open_device(libtrace, dag_dev_name);
537        } else {
538                /* Otherwise, just use the existing one */
539                free(dag_dev_name);
540                dag_dev_name = NULL;
541        }
542
543        /* Make sure we have successfully opened a DAG device */
544        if (dag_device == NULL) {
545                if (dag_dev_name)
546                        free(dag_dev_name);
547                dag_dev_name = NULL;
548                pthread_mutex_unlock(&open_dag_mutex);
549                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Unable to open DAG device %s", dag_dev_name);
550                return -1;
551        }
552
553        FORMAT_DATA->device = dag_device;
554
555        /* See Config_Status_API_Programming_Guide.pdf from the Endace
556           Dag Documentation */
557        /* Check kBooleanAttributeActive is true -- no point capturing
558         * on an interface that's disabled
559         *
560         * The symptom of the port being disabled is that libtrace
561         * will appear to hang. */
562        /* Check kBooleanAttributeFault is false */
563        /* Check kBooleanAttributeLocalFault is false */
564        /* Check kBooleanAttributeLock is true ? */
565        /* Check kBooleanAttributePeerLink ? */
566
567        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based
568           on libtrace promisc attribute?*/
569        /* Set kUint32AttributeSnapLength to the snaplength */
570
571        pthread_mutex_unlock(&open_dag_mutex);
572        return 0;
573}
574
575#ifdef HAVE_DAG_CONFIG_API_H
576static int dag_csapi_set_snaplen(libtrace_t *libtrace, int slen) {
577        dag_card_ref_t card_ref = NULL;
578        dag_component_t root = NULL;
579        attr_uuid_t uuid = 0;
580
581        if (slen < 0)
582                slen = 0; 
583
584        card_ref = dag_config_init(FORMAT_DATA->device->dev_name);
585        root = dag_config_get_root_component(card_ref);
586       
587        uuid = dag_component_get_config_attribute_uuid(root, kBooleanAttributeVarlen);
588        dag_config_set_boolean_attribute(card_ref, uuid, true);
589
590        uuid = dag_component_get_config_attribute_uuid(root, kUint32AttributeSnaplength);
591        dag_config_set_uint32_attribute(card_ref, uuid, (uint32_t)slen);
592
593        return 0;
594       
595
596} 
597#endif /* HAVE_DAG_CONFIG_API_H */
598
599/* Configures a DAG input trace */
600static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
601                            void *data)
602{
603        switch(option) {
604        case TRACE_OPTION_META_FREQ:
605                /* This option is used to specify the frequency of DUCK
606                 * updates */
607                DUCK.duck_freq = *(int *)data;
608                return 0;
609        case TRACE_OPTION_SNAPLEN:
610#ifdef HAVE_DAG_CONFIG_API_H
611                return dag_csapi_set_snaplen(libtrace, *(int *)data);
612#else
613                /* Tell the card our new snap length */
614        {
615                char conf_str[4096];
616                snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
617                if (dag_configure(FORMAT_DATA->device->fd,
618                                  conf_str) != 0) {
619                        trace_set_err(libtrace, errno, "Failed to configure "
620                                      "snaplen on DAG card: %s",
621                                      libtrace->uridata);
622                        return -1;
623                }
624        }
625#endif /* HAVE_DAG_CONFIG_API_H */
626
627                return 0;
628        case TRACE_OPTION_PROMISC:
629                /* DAG already operates in a promisc fashion */
630                return -1;
631        case TRACE_OPTION_FILTER:
632                /* We don't yet support pushing filters into DAG
633                 * cards */
634                return -1;
635        case TRACE_OPTION_EVENT_REALTIME:
636        case TRACE_OPTION_REPLAY_SPEEDUP:
637                /* Live capture is always going to be realtime */
638                return -1;
639        case TRACE_OPTION_HASHER:
640                /* Lets just say we did this, it's currently still up to
641                 * the user to configure this correctly. */
642                return 0;
643        case TRACE_OPTION_CONSTANT_ERF_FRAMING:
644                return -1;
645        }
646        return -1;
647}
648
649/* Starts a DAG output trace */
650static int dag_start_output(libtrace_out_t *libtrace)
651{
652        struct timeval zero, nopoll;
653
654        zero.tv_sec = 0;
655        zero.tv_usec = 0;
656        nopoll = zero;
657
658        /* Attach and start the DAG stream */
659        if (dag_attach_stream64(FORMAT_DATA_OUT->device->fd,
660                        FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) {
661                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
662                return -1;
663        }
664
665        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
666                        FORMAT_DATA_OUT->dagstream) < 0) {
667                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
668                return -1;
669        }
670        FORMAT_DATA_OUT->stream_attached = 1;
671
672        /* We don't want the dag card to do any sleeping */
673        dag_set_stream_poll64(FORMAT_DATA_OUT->device->fd,
674                        FORMAT_DATA_OUT->dagstream, 0, &zero,
675                        &nopoll);
676
677        return 0;
678}
679
680static int dag_start_input_stream(libtrace_t *libtrace,
681                                  struct dag_per_stream_t * stream) {
682        struct timeval zero, nopoll;
683        uint8_t *top, *bottom, *starttop;
684        top = bottom = NULL;
685
686        zero.tv_sec = 0;
687        zero.tv_usec = 10000;
688        nopoll = zero;
689
690        /* Attach and start the DAG stream */
691        if (dag_attach_stream64(FORMAT_DATA->device->fd,
692                              stream->dagstream, 0, 0) < 0) {
693                trace_set_err(libtrace, errno, "Cannot attach DAG stream #%u",
694                              stream->dagstream);
695                return -1;
696        }
697
698        if (dag_start_stream(FORMAT_DATA->device->fd,
699                             stream->dagstream) < 0) {
700                trace_set_err(libtrace, errno, "Cannot start DAG stream #%u",
701                              stream->dagstream);
702                return -1;
703        }
704        FORMAT_DATA->stream_attached = 1;
705
706        /* We don't want the dag card to do any sleeping */
707        if (dag_set_stream_poll64(FORMAT_DATA->device->fd,
708                            stream->dagstream, 0, &zero,
709                            &nopoll) < 0) {
710                trace_set_err(libtrace, errno,
711                              "dag_set_stream_poll failed!");
712                return -1;
713        }
714
715        starttop = dag_advance_stream(FORMAT_DATA->device->fd,
716                                      stream->dagstream,
717                                      &bottom);
718
719        /* Should probably flush the memory hole now */
720        top = starttop;
721        while (starttop - bottom > 0) {
722                bottom += (starttop - bottom);
723                top = dag_advance_stream(FORMAT_DATA->device->fd,
724                                         stream->dagstream,
725                                         &bottom);
726        }
727        stream->top = top;
728        stream->bottom = bottom;
729        stream->processed = 0;
730        stream->drops = 0;
731
732        return 0;
733
734}
735
736/* Starts a DAG input trace */
737static int dag_start_input(libtrace_t *libtrace)
738{
739        return dag_start_input_stream(libtrace, FORMAT_DATA_FIRST);
740}
741
742static int dag_pstart_input(libtrace_t *libtrace)
743{
744        char *scan, *tok;
745        uint16_t stream_count = 0, max_streams;
746        int iserror = 0;
747        struct dag_per_stream_t stream_data;
748
749        /* Check we aren't trying to create more threads than the DAG card can
750         * handle */
751        max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd);
752        if (libtrace->perpkt_thread_count > max_streams) {
753                fprintf(stderr,
754                              "WARNING: DAG has only %u streams available, "
755                              "capping total number of threads at this value.",
756                              max_streams);
757                libtrace->perpkt_thread_count = max_streams;
758        }
759
760        /* Get the stream names from the uri */
761        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
762                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
763                              "Format uri doesn't specify the DAG streams");
764                iserror = 1;
765                goto cleanup;
766        }
767
768        scan++;
769
770        tok = strtok(scan, ",");
771        while (tok != NULL) {
772                /* Ensure we haven't specified too many streams */
773                if (stream_count >= libtrace->perpkt_thread_count) {
774                        fprintf(stderr,
775                                      "WARNING: Format uri specifies too many "
776                                      "streams. Maximum is %u, so only using "
777                                      "the first %u from the uri.",
778                                      libtrace->perpkt_thread_count, 
779                                      libtrace->perpkt_thread_count);
780                        break;
781                }
782
783                /* Save the stream details */
784                if (stream_count == 0) {
785                        /* Special case where we update the existing stream
786                         * data structure */
787                        FORMAT_DATA_FIRST->dagstream = (uint16_t)atoi(tok);
788                } else {
789                        memset(&stream_data, 0, sizeof(stream_data));
790                        stream_data.dagstream = (uint16_t)atoi(tok);
791                        libtrace_list_push_back(FORMAT_DATA->per_stream,
792                                                &stream_data);
793                }
794
795                stream_count++;
796                tok = strtok(NULL, ",");
797        }
798
799        if (stream_count < libtrace->perpkt_thread_count) {
800                libtrace->perpkt_thread_count = stream_count;
801        }
802       
803        FORMAT_DATA->stream_attached = 1;
804
805 cleanup:
806        if (iserror) {
807                return -1;
808        } else {
809                return 0;
810        }
811}
812
813/* Pauses a DAG output trace */
814static int dag_pause_output(libtrace_out_t *libtrace)
815{
816        /* Stop and detach the stream */
817        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
818                            FORMAT_DATA_OUT->dagstream) < 0) {
819                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
820                return -1;
821        }
822        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
823                              FORMAT_DATA_OUT->dagstream) < 0) {
824                trace_set_err_out(libtrace, errno,
825                                  "Could not detach DAG stream");
826                return -1;
827        }
828        FORMAT_DATA_OUT->stream_attached = 0;
829        return 0;
830}
831
832/* Pauses a DAG input trace */
833static int dag_pause_input(libtrace_t *libtrace)
834{
835        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
836
837        /* Stop and detach each stream */
838        while (tmp != NULL) {
839                if (dag_stop_stream(FORMAT_DATA->device->fd,
840                                    STREAM_DATA(tmp)->dagstream) < 0) {
841                        trace_set_err(libtrace, errno,
842                                      "Could not stop DAG stream");
843                        return -1;
844                }
845                if (dag_detach_stream(FORMAT_DATA->device->fd,
846                                      STREAM_DATA(tmp)->dagstream) < 0) {
847                        trace_set_err(libtrace, errno,
848                                      "Could not detach DAG stream");
849                        return -1;
850                }
851
852                tmp = tmp->next;
853        }
854
855        FORMAT_DATA->stream_attached = 0;
856        return 0;
857}
858
859
860
861/* Closes a DAG input trace */
862static int dag_fin_input(libtrace_t *libtrace)
863{
864        /* Need the lock, since we're going to be handling the device list */
865        pthread_mutex_lock(&open_dag_mutex);
866
867        /* Detach the stream if we are not paused */
868        if (FORMAT_DATA->stream_attached)
869                dag_pause_input(libtrace);
870        FORMAT_DATA->device->ref_count--;
871
872        /* Close the DAG device if there are no more references to it */
873        if (FORMAT_DATA->device->ref_count == 0)
874                dag_close_device(FORMAT_DATA->device);
875
876        if (DUCK.dummy_duck)
877                trace_destroy_dead(DUCK.dummy_duck);
878
879        /* Clear the list */
880        libtrace_list_deinit(FORMAT_DATA->per_stream);
881        free(libtrace->format_data);
882        pthread_mutex_unlock(&open_dag_mutex);
883        return 0; /* success */
884}
885
886/* Closes a DAG output trace */
887static int dag_fin_output(libtrace_out_t *libtrace)
888{
889
890        /* Commit any outstanding traffic in the txbuffer */
891        if (FORMAT_DATA_OUT->waiting) {
892                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
893                                           FORMAT_DATA_OUT->dagstream,
894                                           FORMAT_DATA_OUT->waiting );
895        }
896
897        /* Wait until the buffer is nearly clear before exiting the program,
898         * as we will lose packets otherwise */
899        dag_tx_get_stream_space64
900                (FORMAT_DATA_OUT->device->fd,
901                 FORMAT_DATA_OUT->dagstream,
902                 dag_get_stream_buffer_size64(FORMAT_DATA_OUT->device->fd,
903                                            FORMAT_DATA_OUT->dagstream) - 8);
904
905        /* Need the lock, since we're going to be handling the device list */
906        pthread_mutex_lock(&open_dag_mutex);
907
908        /* Detach the stream if we are not paused */
909        if (FORMAT_DATA_OUT->stream_attached)
910                dag_pause_output(libtrace);
911        FORMAT_DATA_OUT->device->ref_count --;
912
913        /* Close the DAG device if there are no more references to it */
914        if (FORMAT_DATA_OUT->device->ref_count == 0)
915                dag_close_device(FORMAT_DATA_OUT->device);
916        free(libtrace->format_data);
917        pthread_mutex_unlock(&open_dag_mutex);
918        return 0; /* success */
919}
920
921#ifdef DAGIOC_CARD_DUCK
922#define LIBTRACE_DUCK_IOCTL DAGIOC_CARD_DUCK
923#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_5_0
924#else
925#ifdef DAGIOCDUCK
926#define LIBTRACE_DUCK_IOCTL DAGIOCDUCK
927#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_2_5
928#else
929#warning "DAG appears to be missing DUCK support"
930#endif
931#endif
932
933/* Extracts DUCK information from the DAG card and produces a DUCK packet */
934static int dag_get_duckinfo(libtrace_t *libtrace,
935                                libtrace_packet_t *packet) {
936
937        if (DUCK.duck_freq == 0)
938                return 0;
939
940#ifndef LIBTRACE_DUCK_IOCTL
941        trace_set_err(libtrace, errno, 
942                "Requested DUCK information but unable to determine the correct ioctl for DUCK");
943        DUCK.duck_freq = 0;
944        return -1;
945#endif
946
947        if (DUCK.last_pkt - DUCK.last_duck < DUCK.duck_freq)
948                return 0;
949
950        /* Allocate memory for the DUCK data */
951        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
952            !packet->buffer) {
953                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
954                packet->buf_control = TRACE_CTRL_PACKET;
955                if (!packet->buffer) {
956                        trace_set_err(libtrace, errno,
957                                      "Cannot allocate packet buffer");
958                        return -1;
959                }
960        }
961
962        /* DUCK doesn't have a format header */
963        packet->header = 0;
964        packet->payload = packet->buffer;
965
966        /* No need to check if we can get DUCK or not - we're modern
967         * enough so just grab the DUCK info */
968        if ((ioctl(FORMAT_DATA->device->fd, LIBTRACE_DUCK_IOCTL,
969                   (duckinf_t *)packet->payload) < 0)) {
970                trace_set_err(libtrace, errno, "Error using DUCK ioctl");
971                DUCK.duck_freq = 0;
972                return -1;
973        }
974
975        packet->type = LIBTRACE_DUCK_VERSION;
976
977        /* Set the packet's trace to point at a DUCK trace, so that the
978         * DUCK format functions will be called on the packet rather than the
979         * DAG ones */
980        if (!DUCK.dummy_duck)
981                DUCK.dummy_duck = trace_create_dead("duck:dummy");
982        packet->trace = DUCK.dummy_duck;
983        DUCK.last_duck = DUCK.last_pkt;
984        packet->error = sizeof(duckinf_t);
985        return sizeof(duckinf_t);
986}
987
988/* Determines the amount of data available to read from the DAG card */
989static int dag_available(libtrace_t *libtrace,
990                         struct dag_per_stream_t *stream_data)
991{
992        uint32_t diff = stream_data->top - stream_data->bottom;
993
994        /* If we've processed more than 4MB of data since we last called
995         * dag_advance_stream, then we should call it again to allow the
996         * space occupied by that 4MB to be released */
997        if (diff >= dag_record_size && stream_data->processed < 4 * 1024 * 1024)
998                return diff;
999
1000        /* Update the top and bottom pointers */
1001        stream_data->top = dag_advance_stream(FORMAT_DATA->device->fd,
1002                                              stream_data->dagstream,
1003                                              &(stream_data->bottom));
1004
1005        if (stream_data->top == NULL) {
1006                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
1007                return -1;
1008        }
1009        stream_data->processed = 0;
1010        diff = stream_data->top - stream_data->bottom;
1011        return diff;
1012}
1013
1014/* Returns a pointer to the start of the next complete ERF record */
1015static dag_record_t *dag_get_record(struct dag_per_stream_t *stream_data)
1016{
1017        dag_record_t *erfptr = NULL;
1018        uint16_t size;
1019
1020        erfptr = (dag_record_t *)stream_data->bottom;
1021        if (!erfptr)
1022                return NULL;
1023
1024        size = ntohs(erfptr->rlen);
1025        if (size < dag_record_size) {
1026                fprintf(stderr, "DAG2.5 rlen is invalid (rlen %u, must be at least %u\n",
1027                        size, dag_record_size);
1028                return NULL;
1029        }
1030
1031        /* Make certain we have the full packet available */
1032        if (size > (stream_data->top - stream_data->bottom))
1033                return NULL;
1034
1035        stream_data->bottom += size;
1036        stream_data->processed += size;
1037        return erfptr;
1038}
1039
1040/* Converts a buffer containing a recently read DAG packet record into a
1041 * libtrace packet */
1042static int dag_prepare_packet_stream(libtrace_t *libtrace,
1043                                     struct dag_per_stream_t *stream_data,
1044                                     libtrace_packet_t *packet,
1045                                     void *buffer, libtrace_rt_types_t rt_type,
1046                                     uint32_t flags)
1047{
1048        dag_record_t *erfptr;
1049
1050        /* If the packet previously owned a buffer that is not the buffer
1051         * that contains the new packet data, we're going to need to free the
1052         * old one to avoid memory leaks */
1053        if (packet->buffer != buffer &&
1054            packet->buf_control == TRACE_CTRL_PACKET) {
1055                free(packet->buffer);
1056        }
1057
1058        /* Set the buffer owner appropriately */
1059        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
1060                packet->buf_control = TRACE_CTRL_PACKET;
1061        } else
1062                packet->buf_control = TRACE_CTRL_EXTERNAL;
1063
1064        /* Update the packet pointers and type appropriately */
1065        erfptr = (dag_record_t *)buffer;
1066        packet->buffer = erfptr;
1067        packet->header = erfptr;
1068        packet->type = rt_type;
1069
1070        if (erfptr->flags.rxerror == 1) {
1071                /* rxerror means the payload is corrupt - drop the payload
1072                 * by tweaking rlen */
1073                packet->payload = NULL;
1074                erfptr->rlen = htons(erf_get_framing_length(packet));
1075        } else {
1076                packet->payload = (char*)packet->buffer
1077                        + erf_get_framing_length(packet);
1078        }
1079
1080        if (libtrace->format_data == NULL) {
1081                dag_init_format_data(libtrace);
1082        }
1083
1084        /* Update the dropped packets counter */
1085        /* No loss counter for DSM coloured records - have to use some
1086         * other API */
1087        if (erf_is_color_type(erfptr->type)) {
1088                /* TODO */
1089        } else {
1090                /* Use the ERF loss counter */
1091                if (stream_data->seeninterface[erfptr->flags.iface]
1092                    == 0) {
1093                        stream_data->seeninterface[erfptr->flags.iface]
1094                                = 1;
1095                } else {
1096                        stream_data->drops += ntohs(erfptr->lctr);
1097                }
1098        }
1099
1100        return 0;
1101}
1102
1103static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
1104                              void *buffer, libtrace_rt_types_t rt_type,
1105                              uint32_t flags)
1106{
1107        return dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
1108                                       buffer, rt_type, flags);
1109}
1110
1111/*
1112 * dag_write_packet() at this stage attempts to improve tx performance
1113 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
1114 * has been met. I observed approximately 270% performance increase
1115 * through this relatively naive tweak. No optimisation of buffer sizes
1116 * was attempted.
1117 */
1118
1119/* Pushes an ERF record onto the transmit stream */
1120static int dag_dump_packet(libtrace_out_t *libtrace,
1121                           dag_record_t *erfptr, unsigned int pad,
1122                           void *buffer)
1123{
1124        int size;
1125
1126        /*
1127         * If we've got 0 bytes waiting in the txqueue, assume that we
1128         * haven't requested any space yet, and request some, storing
1129         * the pointer at FORMAT_DATA_OUT->txbuffer.
1130         *
1131         * The amount to request is slightly magical at the moment - it's
1132         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
1133         * the buffer and handle overruns.
1134         */
1135        if (FORMAT_DATA_OUT->waiting == 0) {
1136                FORMAT_DATA_OUT->txbuffer =
1137                        dag_tx_get_stream_space64(FORMAT_DATA_OUT->device->fd,
1138                                                FORMAT_DATA_OUT->dagstream,
1139                                                16908288);
1140        }
1141
1142        /*
1143         * Copy the header separately to the body, as we can't guarantee they
1144         * are in contiguous memory
1145         */
1146        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, erfptr,
1147               (dag_record_size + pad));
1148        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
1149
1150        /*
1151         * Copy our incoming packet into the outgoing buffer, and increment
1152         * our waiting count
1153         */
1154        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
1155        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, buffer,
1156               size);
1157        FORMAT_DATA_OUT->waiting += size;
1158
1159        /*
1160         * If our output buffer has more than 16 Mebibytes in it, commit those
1161         * bytes and reset the waiting count to 0.
1162         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in
1163         * case there is still data in the buffer at program exit.
1164         */
1165        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
1166                FORMAT_DATA_OUT->txbuffer =
1167                        dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
1168                                                   FORMAT_DATA_OUT->dagstream,
1169                                                   FORMAT_DATA_OUT->waiting);
1170                FORMAT_DATA_OUT->waiting = 0;
1171        }
1172
1173        return size + pad + dag_record_size;
1174}
1175
1176/* Attempts to determine a suitable ERF type for a given packet. Returns true
1177 * if one is found, false otherwise */
1178static bool find_compatible_linktype(libtrace_out_t *libtrace,
1179                                     libtrace_packet_t *packet, char *type)
1180{
1181        /* Keep trying to simplify the packet until we can find
1182         * something we can do with it */
1183
1184        do {
1185                *type = libtrace_to_erf_type(trace_get_link_type(packet));
1186
1187                /* Success */
1188                if (*type != (char)-1)
1189                        return true;
1190
1191                if (!demote_packet(packet)) {
1192                        trace_set_err_out(libtrace,
1193                                          TRACE_ERR_NO_CONVERSION,
1194                                          "No erf type for packet (%i)",
1195                                          trace_get_link_type(packet));
1196                        return false;
1197                }
1198
1199        } while(1);
1200
1201        return true;
1202}
1203
1204/* Writes a packet to the provided DAG output trace */
1205static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet)
1206{
1207        /* Check dag can write this type of packet */
1208        if (!dag_can_write(packet)) {
1209                return 0;
1210        }
1211
1212        /* This is heavily borrowed from erf_write_packet(). Yes, CnP
1213         * coding sucks, sorry about that.
1214         */
1215        unsigned int pad = 0;
1216        int numbytes;
1217        void *payload = packet->payload;
1218        dag_record_t *header = (dag_record_t *)packet->header;
1219        char erf_type = 0;
1220
1221        if(!packet->header) {
1222                /* No header, probably an RT packet. Lifted from
1223                 * erf_write_packet(). */
1224                return -1;
1225        }
1226
1227        pad = dag_get_padding(packet);
1228
1229        /*
1230         * If the payload is null, adjust the rlen. Discussion of this is
1231         * attached to erf_write_packet()
1232         */
1233        if (payload == NULL) {
1234                header->rlen = htons(dag_record_size + pad);
1235        }
1236
1237        if (packet->type == TRACE_RT_DATA_ERF) {
1238                numbytes = dag_dump_packet(libtrace, header, pad, payload);
1239        } else {
1240                /* Build up a new packet header from the existing header */
1241
1242                /* Simplify the packet first - if we can't do this, break
1243                 * early */
1244                if (!find_compatible_linktype(libtrace,packet,&erf_type))
1245                        return -1;
1246
1247                dag_record_t erfhdr;
1248
1249                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
1250                payload=packet->payload;
1251                pad = dag_get_padding(packet);
1252
1253                /* Flags. Can't do this */
1254                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
1255                if (trace_get_direction(packet)!=(int)~0U)
1256                        erfhdr.flags.iface = trace_get_direction(packet);
1257
1258                erfhdr.type = erf_type;
1259
1260                /* Packet length (rlen includes format overhead) */
1261                if (trace_get_capture_length(packet) <= 0
1262                       || trace_get_capture_length(packet) > 65536) {
1263                        trace_set_err_out(libtrace, TRACE_ERR_BAD_PACKET,
1264                                "Capture length is out of range in dag_write_packet()");
1265                        return -1;
1266                }
1267                if (erf_get_framing_length(packet) <= 0
1268                       || trace_get_framing_length(packet) > 65536) {
1269                        trace_set_err_out(libtrace, TRACE_ERR_BAD_PACKET,
1270                                "Framing length is out of range in dag_write_packet()");
1271                        return -1;
1272                }
1273                if (trace_get_capture_length(packet) +
1274                       erf_get_framing_length(packet) <= 0
1275                       || trace_get_capture_length(packet) +
1276                       erf_get_framing_length(packet) > 65536) {
1277                        trace_set_err_out(libtrace, TRACE_ERR_BAD_PACKET,
1278                                "Capture + framing length is out of range in dag_write_packet()");
1279                        return -1;
1280                }
1281
1282                erfhdr.rlen = htons(trace_get_capture_length(packet)
1283                                    + erf_get_framing_length(packet));
1284
1285
1286                /* Loss counter. Can't do this */
1287                erfhdr.lctr = 0;
1288                /* Wire length, does not include padding! */
1289                erfhdr.wlen = htons(trace_get_wire_length(packet));
1290
1291                /* Write it out */
1292                numbytes = dag_dump_packet(libtrace, &erfhdr, pad, payload);
1293        }
1294
1295        return numbytes;
1296}
1297
1298/* Reads the next available packet from a DAG card, in a BLOCKING fashion
1299 *
1300 * If DUCK reporting is enabled, the packet returned may be a DUCK update
1301 */
1302static int dag_read_packet_stream(libtrace_t *libtrace,
1303                                struct dag_per_stream_t *stream_data,
1304                                libtrace_thread_t *t, /* Optional */
1305                                libtrace_packet_t *packet)
1306{
1307        int size = 0;
1308        dag_record_t *erfptr = NULL;
1309        struct timeval tv;
1310        int numbytes = 0;
1311        uint32_t flags = 0;
1312        struct timeval maxwait, pollwait;
1313
1314        pollwait.tv_sec = 0;
1315        pollwait.tv_usec = 10000;
1316        maxwait.tv_sec = 0;
1317        maxwait.tv_usec = 250000;
1318
1319        /* Check if we're due for a DUCK report - only report on the first thread */
1320        if (stream_data == FORMAT_DATA_FIRST) {
1321                size = dag_get_duckinfo(libtrace, packet);
1322                if (size != 0)
1323                        return size;
1324        }
1325
1326
1327        /* Don't let anyone try to free our DAG memory hole! */
1328        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1329
1330        /* If the packet buffer is currently owned by libtrace, free it so
1331         * that we can set the packet to point into the DAG memory hole */
1332        if (packet->buf_control == TRACE_CTRL_PACKET) {
1333                free(packet->buffer);
1334                packet->buffer = 0;
1335        }
1336
1337        if (dag_set_stream_poll64(FORMAT_DATA->device->fd, stream_data->dagstream,
1338                                sizeof(dag_record_t), &maxwait,
1339                                &pollwait) == -1) {
1340                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1341                return -1;
1342        }
1343
1344        /* Grab a full ERF record */
1345        do {
1346                numbytes = dag_available(libtrace, stream_data);
1347                if (numbytes < 0)
1348                        return numbytes;
1349                if (numbytes < dag_record_size) {
1350                        /* Check the message queue if we have one to check */
1351                        if (t != NULL &&
1352                            libtrace_message_queue_count(&t->messages) > 0)
1353                                return -2;
1354
1355                        if ((numbytes=is_halted(libtrace)) != -1)
1356                                return numbytes;
1357                        /* Block until we see a packet */
1358                        continue;
1359                }
1360                erfptr = dag_get_record(stream_data);
1361        } while (erfptr == NULL);
1362
1363        packet->trace = libtrace;
1364
1365        /* Prepare the libtrace packet */
1366        if (dag_prepare_packet_stream(libtrace, stream_data, packet, erfptr,
1367                                    TRACE_RT_DATA_ERF, flags))
1368                return -1;
1369
1370        /* Update the DUCK timer - don't re-order this check (false-sharing) */
1371        if (stream_data == FORMAT_DATA_FIRST && DUCK.duck_freq != 0) {
1372                tv = trace_get_timeval(packet);
1373                DUCK.last_pkt = tv.tv_sec;
1374        }
1375
1376        packet->order = erf_get_erf_timestamp(packet);
1377        packet->error = packet->payload ? htons(erfptr->rlen) :
1378                                          erf_get_framing_length(packet);
1379
1380        return packet->error;
1381}
1382
1383static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
1384{
1385        return dag_read_packet_stream(libtrace, FORMAT_DATA_FIRST, NULL, packet);
1386}
1387
1388static int dag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1389                             libtrace_packet_t **packets, size_t nb_packets)
1390{
1391        int ret;
1392        size_t read_packets = 0;
1393        int numbytes = 0;
1394
1395        struct dag_per_stream_t *stream_data =
1396                (struct dag_per_stream_t *)t->format_data;
1397
1398        /* Read as many packets as we can, but read atleast one packet */
1399        do {
1400                ret = dag_read_packet_stream(libtrace, stream_data, t,
1401                                           packets[read_packets]);
1402                if (ret < 0)
1403                        return ret;
1404
1405                read_packets++;
1406
1407                /* Make sure we don't read too many packets..! */
1408                if (read_packets >= nb_packets)
1409                        break;
1410
1411                numbytes = dag_available(libtrace, stream_data);
1412        } while (numbytes >= dag_record_size);
1413
1414        return read_packets;
1415}
1416
1417/* Attempts to read a packet from a DAG card in a NON-BLOCKING fashion. If a
1418 * packet is available, we will return a packet event. Otherwise we will
1419 * return a SLEEP event (as we cannot select on the DAG file descriptor).
1420 */
1421static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
1422                                           libtrace_packet_t *packet)
1423{
1424        libtrace_eventobj_t event = {0,0,0.0,0};
1425        dag_record_t *erfptr = NULL;
1426        int numbytes;
1427        uint32_t flags = 0;
1428        struct timeval minwait, tv;
1429       
1430        minwait.tv_sec = 0;
1431        minwait.tv_usec = 10000;
1432
1433        /* Check if we're meant to provide a DUCK update */
1434        numbytes = dag_get_duckinfo(libtrace, packet);
1435        if (numbytes < 0) {
1436                event.type = TRACE_EVENT_TERMINATE;
1437                return event;
1438        } else if (numbytes > 0) {
1439                event.type = TRACE_EVENT_PACKET;
1440                return event;
1441        }
1442       
1443        if (dag_set_stream_poll64(FORMAT_DATA->device->fd,
1444                                FORMAT_DATA_FIRST->dagstream, 0, &minwait,
1445                                &minwait) == -1) {
1446                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1447                event.type = TRACE_EVENT_TERMINATE;
1448                return event;
1449        }
1450
1451        do {
1452                erfptr = NULL;
1453                numbytes = 0;
1454
1455                /* Need to call dag_available so that the top pointer will get
1456                 * updated, otherwise we'll never see any data! */
1457                numbytes = dag_available(libtrace, FORMAT_DATA_FIRST);
1458
1459                /* May as well not bother calling dag_get_record if
1460                 * dag_available suggests that there's no data */
1461                if (numbytes != 0)
1462                        erfptr = dag_get_record(FORMAT_DATA_FIRST);
1463                if (erfptr == NULL) {
1464                        /* No packet available - sleep for a very short time */
1465                        if (libtrace_halt) {
1466                                event.type = TRACE_EVENT_TERMINATE;
1467                        } else {
1468                                event.type = TRACE_EVENT_SLEEP;
1469                                event.seconds = 0.0001;
1470                        }
1471                        break;
1472                }
1473                if (dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
1474                                            erfptr, TRACE_RT_DATA_ERF, flags)) {
1475                        event.type = TRACE_EVENT_TERMINATE;
1476                        break;
1477                }
1478
1479
1480                event.size = trace_get_capture_length(packet) +
1481                        trace_get_framing_length(packet);
1482
1483                /* XXX trace_read_packet() normally applies the following
1484                 * config options for us, but this function is called via
1485                 * trace_event() so we have to do it ourselves */
1486
1487                if (libtrace->filter) {
1488                        int filtret = trace_apply_filter(libtrace->filter,
1489                                                         packet);
1490                        if (filtret == -1) {
1491                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
1492                                              "Bad BPF Filter");
1493                                event.type = TRACE_EVENT_TERMINATE;
1494                                break;
1495                        }
1496
1497                        if (filtret == 0) {
1498                                /* This packet isn't useful so we want to
1499                                 * immediately see if there is another suitable
1500                                 * one - we definitely DO NOT want to return
1501                                 * a sleep event in this case, like we used to
1502                                 * do! */
1503                                libtrace->filtered_packets ++;
1504                                trace_clear_cache(packet);
1505                                continue;
1506                        }
1507
1508                        event.type = TRACE_EVENT_PACKET;
1509                } else {
1510                        event.type = TRACE_EVENT_PACKET;
1511                }
1512
1513                /* Update the DUCK timer */
1514                tv = trace_get_timeval(packet);
1515                DUCK.last_pkt = tv.tv_sec;
1516               
1517                if (libtrace->snaplen > 0) {
1518                        trace_set_capture_length(packet, libtrace->snaplen);
1519                }
1520                libtrace->accepted_packets ++;
1521                break;
1522        } while(1);
1523
1524        return event;
1525}
1526
1527static void dag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat)
1528{
1529        libtrace_list_node_t *tmp;
1530        if (!libtrace) {
1531                fprintf(stderr, "NULL trace passed into dag_get_statistics()\n");
1532                return;
1533        }
1534        if (!stat) {
1535                trace_set_err(libtrace, TRACE_ERR_STAT, "NULL stat passed into dag_get_statistics()");
1536                return;
1537        }
1538        tmp = FORMAT_DATA_HEAD;
1539
1540        /* Dropped packets */
1541        stat->dropped_valid = 1;
1542        stat->dropped = 0;
1543        while (tmp != NULL) {
1544                stat->dropped += STREAM_DATA(tmp)->drops;
1545                tmp = tmp->next;
1546        }
1547
1548}
1549
1550static void dag_get_thread_statistics(libtrace_t *libtrace, libtrace_thread_t *t,
1551                                       libtrace_stat_t *stat) {
1552        struct dag_per_stream_t *stream_data = t->format_data;
1553        if (!libtrace) {
1554                fprintf(stderr, "NULL trace passed into dag_get_thread_statistics()\n");
1555                return;
1556        }
1557        if (!stat) {
1558                trace_set_err(libtrace, TRACE_ERR_STAT, "NULL stat passed into dag_get_thread_statistics()");
1559                return;
1560        }
1561        stat->dropped_valid = 1;
1562        stat->dropped = stream_data->drops;
1563
1564        stat->filtered_valid = 1;
1565        stat->filtered = 0;
1566}
1567
1568/* Prints some semi-useful help text about the DAG format module */
1569static void dag_help(void) {
1570        printf("dag format module: $Revision: 1755 $\n");
1571        printf("Supported input URIs:\n");
1572        printf("\tdag:/dev/dagn\n");
1573        printf("\n");
1574        printf("\te.g.: dag:/dev/dag0\n");
1575        printf("\n");
1576        printf("Supported output URIs:\n");
1577        printf("\tnone\n");
1578        printf("\n");
1579}
1580
1581static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1582                                bool reader)
1583{
1584        struct dag_per_stream_t *stream_data;
1585        libtrace_list_node_t *node;
1586
1587        if (reader && t->type == THREAD_PERPKT) {
1588                node = libtrace_list_get_index(FORMAT_DATA->per_stream,
1589                                                t->perpkt_num);
1590                if (node == NULL) {
1591                        return -1;
1592                }
1593                stream_data = node->data;
1594
1595                /* Pass the per thread data to the thread */
1596                t->format_data = stream_data;
1597
1598                /* Attach and start the DAG stream */
1599                if (dag_start_input_stream(libtrace, stream_data) < 0)
1600                        return -1;
1601        }
1602
1603        return 0;
1604}
1605
1606static struct libtrace_format_t dag = {
1607        "dag",
1608        "$Id$",
1609        TRACE_FORMAT_ERF,
1610        dag_probe_filename,             /* probe filename */
1611        NULL,                           /* probe magic */
1612        dag_init_input,                 /* init_input */
1613        dag_config_input,               /* config_input */
1614        dag_start_input,                /* start_input */
1615        dag_pause_input,                /* pause_input */
1616        dag_init_output,                /* init_output */
1617        NULL,                           /* config_output */
1618        dag_start_output,               /* start_output */
1619        dag_fin_input,                  /* fin_input */
1620        dag_fin_output,                 /* fin_output */
1621        dag_read_packet,                /* read_packet */
1622        dag_prepare_packet,             /* prepare_packet */
1623        NULL,                           /* fin_packet */
1624        dag_write_packet,               /* write_packet */
1625        NULL,                           /* flush_output */
1626        erf_get_link_type,              /* get_link_type */
1627        erf_get_direction,              /* get_direction */
1628        erf_set_direction,              /* set_direction */
1629        erf_get_erf_timestamp,          /* get_erf_timestamp */
1630        NULL,                           /* get_timeval */
1631        NULL,                           /* get_seconds */
1632        NULL,                           /* get_timespec */
1633        NULL,                           /* seek_erf */
1634        NULL,                           /* seek_timeval */
1635        NULL,                           /* seek_seconds */
1636        erf_get_capture_length,         /* get_capture_length */
1637        erf_get_wire_length,            /* get_wire_length */
1638        erf_get_framing_length,         /* get_framing_length */
1639        erf_set_capture_length,         /* set_capture_length */
1640        NULL,                           /* get_received_packets */
1641        NULL,                           /* get_filtered_packets */
1642        NULL,                           /* get_dropped_packets */
1643        dag_get_statistics,             /* get_statistics */
1644        NULL,                           /* get_fd */
1645        trace_event_dag,                /* trace_event */
1646        dag_help,                       /* help */
1647        NULL,                            /* next pointer */
1648        {true, 0}, /* live packet capture, thread limit TBD */
1649        dag_pstart_input,
1650        dag_pread_packets,
1651        dag_pause_input,
1652        NULL,
1653        dag_pregister_thread,
1654        NULL,
1655        dag_get_thread_statistics       /* get thread stats */
1656};
1657
1658void dag_constructor(void)
1659{
1660        register_format(&dag);
1661}
Note: See TracBrowser for help on using the repository browser.