source: lib/format_dag25.c @ 9a6bdbc

develop
Last change on this file since 9a6bdbc was 9a6bdbc, checked in by Jacob Van Walraven <jcv9@…>, 22 months ago

Added can_write functions to each output format, Fixed pcapng_get_header_type incorrectly flipping type bytes

  • Property mode set to 100644
File size: 47.1 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26#define _GNU_SOURCE
27
28#include "config.h"
29#include "common.h"
30#include "libtrace.h"
31#include "libtrace_int.h"
32#include "format_helper.h"
33#include "format_erf.h"
34
35#include <errno.h>
36#include <fcntl.h>
37#include <stdio.h>
38#include <string.h>
39#include <stdlib.h>
40#include <sys/stat.h>
41
42#include <sys/mman.h>
43/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
44 * Windows anyway so we'll worry about this more later :] */
45#include <pthread.h>
46
47
48#ifdef HAVE_DAG_CONFIG_API_H
49#include <dag_config_api.h>
50#endif
51
52#ifdef WIN32
53#  include <io.h>
54#  include <share.h>
55#  define PATH_MAX _MAX_PATH
56#  define snprintf sprintf_s
57#else
58#  include <netdb.h>
59#  ifndef PATH_MAX
60#       define PATH_MAX 4096
61#  endif
62#  include <sys/ioctl.h>
63#endif
64
65/* This format deals with DAG cards that are using drivers from the 2.5 version
66 * onwards, including 3.X.
67 *
68 * DAG is a LIVE capture format.
69 *
70 * This format does support writing, provided the DAG card that you are using
71 * has transmit (Tx) support. Additionally, packets read using this format
72 * are in the ERF format, so can easily be written as ERF traces without
73 * losing any data.
74 */
75
76#define DATA(x) ((struct dag_format_data_t *)x->format_data)
77#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
78#define STREAM_DATA(x) ((struct dag_per_stream_t *)x->data)
79
80#define FORMAT_DATA DATA(libtrace)
81#define FORMAT_DATA_OUT DATA_OUT(libtrace)
82
83#define DUCK FORMAT_DATA->duck
84
85#define FORMAT_DATA_HEAD FORMAT_DATA->per_stream->head
86#define FORMAT_DATA_FIRST ((struct dag_per_stream_t *)FORMAT_DATA_HEAD->data)
87
88static struct libtrace_format_t dag;
89
90/* A DAG device - a DAG device can support multiple streams (and therefore
91 * multiple input traces) so each trace needs to refer to a device */
92struct dag_dev_t {
93        char * dev_name;                /* Device name */
94        int fd;                         /* File descriptor */
95        uint16_t ref_count;             /* Number of input / output traces
96                                           that are using this device */
97        struct dag_dev_t *prev;         /* Pointer to the previous device in
98                                           the device list */
99        struct dag_dev_t *next;         /* Pointer to the next device in the
100                                           device list */
101};
102
103/* "Global" data that is stored for each DAG output trace */
104struct dag_format_data_out_t {
105        /* The DAG device being used for writing */
106        struct dag_dev_t *device;
107        /* The DAG stream that is being written on */
108        unsigned int dagstream;
109        /* Boolean flag indicating whether the stream is currently attached */
110        int stream_attached;
111        /* The amount of data waiting to be transmitted, in bytes */
112        uint64_t waiting;
113        /* A buffer to hold the data to be transmittted */
114        uint8_t *txbuffer;
115};
116
117/* Data that is stored against each input stream */
118struct dag_per_stream_t {
119        /* DAG stream number */
120        uint16_t dagstream;
121        /* Pointer to the last unread byte in the DAG memory */
122        uint8_t *top;
123        /* Pointer to the first unread byte in the DAG memory */
124        uint8_t *bottom;
125        /* Amount of data processed from the bottom pointer */
126        uint32_t processed;
127        /* Number of packets seen by the stream */
128        uint64_t pkt_count;
129        /* Drop count for this particular stream */
130        uint64_t drops;
131        /* Boolean values to indicate if a particular interface has been seen
132         * or not. This is limited to four interfaces, which is enough to
133         * support all current DAG cards */
134        uint8_t seeninterface[4];
135};
136
137/* "Global" data that is stored for each DAG input trace */
138struct dag_format_data_t {
139        /* DAG device */
140        struct dag_dev_t *device;
141        /* Boolean flag indicating whether the trace is currently attached */
142        int stream_attached;
143        /* Data stored against each DAG input stream */
144        libtrace_list_t *per_stream;
145
146        /* Data required for regular DUCK reporting.
147         * We put this on a new cache line otherwise we have a lot of false
148         * sharing caused by updating the last_pkt.
149         * This should only ever be accessed by the first thread stream,
150         * that includes both read and write operations.
151         */
152        struct {
153                /* Timestamp of the last DUCK report */
154                uint32_t last_duck;
155                /* The number of seconds between each DUCK report */
156                uint32_t duck_freq;
157                /* Timestamp of the last packet read from the DAG card */
158                uint32_t last_pkt;
159                /* Dummy trace to ensure DUCK packets are dealt with using the
160                 * DUCK format functions */
161                libtrace_t *dummy_duck;
162        } duck ALIGN_STRUCT(CACHE_LINE_SIZE);
163};
164
165/* To be thread-safe, we're going to need a mutex for operating on the list
166 * of DAG devices */
167pthread_mutex_t open_dag_mutex;
168
169/* The list of DAG devices that have been opened by libtrace.
170 *
171 * We can only open each DAG device once, but we might want to read from
172 * multiple streams. Therefore, we need to maintain a list of devices that we
173 * have opened (with ref counts!) so that we don't try to open a device too
174 * many times or close a device that we're still using */
175struct dag_dev_t *open_dags = NULL;
176
177static bool dag_can_write(libtrace_packet_t *packet) {
178        /* Get the linktype */
179        libtrace_linktype_t ltype = trace_get_link_type(packet);
180
181        if (ltype == TRACE_TYPE_ERF_META
182                || ltype == TRACE_TYPE_NONDATA) {
183
184                return false;
185        }
186
187        return true;
188}
189
190/* Returns the amount of padding between the ERF header and the start of the
191 * captured packet data */
192static int dag_get_padding(const libtrace_packet_t *packet)
193{
194        /* ERF Ethernet records have a 2 byte padding before the packet itself
195         * so that the IP header is aligned on a 32 bit boundary.
196         */
197        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
198                dag_record_t *erfptr = (dag_record_t *)packet->header;
199                switch(erfptr->type) {
200                        case TYPE_ETH:
201                        case TYPE_COLOR_ETH:
202                        case TYPE_DSM_COLOR_ETH:
203                        case TYPE_COLOR_HASH_ETH:
204                                return 2;
205                        default:                return 0;
206                }
207        }
208        else {
209                switch(trace_get_link_type(packet)) {
210                        case TRACE_TYPE_ETH:    return 2;
211                        default:                return 0;
212                }
213        }
214}
215
216/* Attempts to determine if the given filename refers to a DAG device */
217static int dag_probe_filename(const char *filename)
218{
219        struct stat statbuf;
220        /* Can we stat the file? */
221        if (stat(filename, &statbuf) != 0) {
222                return 0;
223        }
224        /* Is it a character device? */
225        if (!S_ISCHR(statbuf.st_mode)) {
226                return 0;
227        }
228        /* Yeah, it's probably us. */
229        return 1;
230}
231
232/* Initialises the DAG output data structure */
233static void dag_init_format_out_data(libtrace_out_t *libtrace)
234{
235        libtrace->format_data = (struct dag_format_data_out_t *)
236                malloc(sizeof(struct dag_format_data_out_t));
237        // no DUCK on output
238        FORMAT_DATA_OUT->stream_attached = 0;
239        FORMAT_DATA_OUT->device = NULL;
240        FORMAT_DATA_OUT->dagstream = 0;
241        FORMAT_DATA_OUT->waiting = 0;
242
243}
244
245/* Initialises the DAG input data structure */
246static void dag_init_format_data(libtrace_t *libtrace)
247{
248        struct dag_per_stream_t stream_data;
249
250        libtrace->format_data = (struct dag_format_data_t *)
251                malloc(sizeof(struct dag_format_data_t));
252        DUCK.last_duck = 0;
253        DUCK.duck_freq = 0;
254        DUCK.last_pkt = 0;
255        DUCK.dummy_duck = NULL;
256
257        FORMAT_DATA->per_stream =
258                libtrace_list_init(sizeof(stream_data));
259        if (FORMAT_DATA->per_stream == NULL) {
260                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
261                        "Unable to create list for stream in dag_init_format_data()");
262                return;
263        }
264
265        /* We'll start with just one instance of stream_data, and we'll
266         * add more later if we need them */
267        memset(&stream_data, 0, sizeof(stream_data));
268        libtrace_list_push_back(FORMAT_DATA->per_stream, &stream_data);
269}
270
271/* Determines if there is already an entry for the given DAG device in the
272 * device list and increments the reference count for that device, if found.
273 *
274 * NOTE: This function assumes the open_dag_mutex is held by the caller */
275static struct dag_dev_t *dag_find_open_device(char *dev_name)
276{
277        struct dag_dev_t *dag_dev;
278
279        dag_dev = open_dags;
280
281        /* XXX: Not exactly zippy, but how often are we going to be dealing
282         * with multiple dag cards? */
283        while (dag_dev != NULL) {
284                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
285                        dag_dev->ref_count ++;
286                        return dag_dev;
287                }
288                dag_dev = dag_dev->next;
289        }
290        return NULL;
291}
292
293/* Closes a DAG device and removes it from the device list.
294 *
295 * Attempting to close a DAG device that has a non-zero reference count will
296 * cause an assertion failure!
297 *
298 * NOTE: This function assumes the open_dag_mutex is held by the caller */
299static void dag_close_device(struct dag_dev_t *dev)
300{
301        /* Need to remove from the device list */
302        if (dev->ref_count != 0) {
303                fprintf(stderr, "Cannot close DAG device with non zero reference in dag_close_device()\n");
304                return;
305        }
306
307        if (dev->prev == NULL) {
308                open_dags = dev->next;
309                if (dev->next)
310                        dev->next->prev = NULL;
311        } else {
312                dev->prev->next = dev->next;
313                if (dev->next)
314                        dev->next->prev = dev->prev;
315        }
316
317        dag_close(dev->fd);
318        free(dev);
319}
320
321
322/* Opens a new DAG device for writing and adds it to the DAG device list
323 *
324 * NOTE: this function should only be called when opening a DAG device for
325 * writing - there is little practical difference between this and the
326 * function below that covers the reading case, but we need the output trace
327 * object to report errors properly so the two functions take slightly
328 * different arguments. This is really lame and there should be a much better
329 * way of doing this.
330 *
331 * NOTE: This function assumes the open_dag_mutex is held by the caller
332 */
333static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace,
334                                                char *dev_name)
335{
336        struct stat buf;
337        int fd;
338        struct dag_dev_t *new_dev;
339
340        /* Make sure the device exists */
341        if (stat(dev_name, &buf) == -1) {
342                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
343                return NULL;
344        }
345
346        /* Make sure it is the appropriate type of device */
347        if (S_ISCHR(buf.st_mode)) {
348                /* Try opening the DAG device */
349                if((fd = dag_open(dev_name)) < 0) {
350                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
351                                        dev_name);
352                        return NULL;
353                }
354        } else {
355                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
356                                dev_name);
357                return NULL;
358        }
359
360        /* Add the device to our device list - it is just a doubly linked
361         * list with no inherent ordering; just tack the new one on the front
362         */
363        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
364        new_dev->fd = fd;
365        new_dev->dev_name = dev_name;
366        new_dev->ref_count = 1;
367
368        new_dev->prev = NULL;
369        new_dev->next = open_dags;
370        if (open_dags)
371                open_dags->prev = new_dev;
372
373        open_dags = new_dev;
374
375        return new_dev;
376}
377
378/* Opens a new DAG device for reading and adds it to the DAG device list
379 *
380 * NOTE: this function should only be called when opening a DAG device for
381 * reading - there is little practical difference between this and the
382 * function above that covers the writing case, but we need the input trace
383 * object to report errors properly so the two functions take slightly
384 * different arguments. This is really lame and there should be a much better
385 * way of doing this.
386 *
387 * NOTE: This function assumes the open_dag_mutex is held by the caller */
388static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
389        struct stat buf;
390        int fd;
391        struct dag_dev_t *new_dev;
392
393        /* Make sure the device exists */
394        if (stat(dev_name, &buf) == -1) {
395                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
396                return NULL;
397        }
398
399        /* Make sure it is the appropriate type of device */
400        if (S_ISCHR(buf.st_mode)) {
401                /* Try opening the DAG device */
402                if((fd = dag_open(dev_name)) < 0) {
403                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
404                                      dev_name);
405                        return NULL;
406                }
407        } else {
408                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
409                              dev_name);
410                return NULL;
411        }
412
413        /* Add the device to our device list - it is just a doubly linked
414         * list with no inherent ordering; just tack the new one on the front
415         */
416        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
417        new_dev->fd = fd;
418        new_dev->dev_name = dev_name;
419        new_dev->ref_count = 1;
420
421        new_dev->prev = NULL;
422        new_dev->next = open_dags;
423        if (open_dags)
424                open_dags->prev = new_dev;
425
426        open_dags = new_dev;
427
428        return new_dev;
429}
430
431/* Creates and initialises a DAG output trace */
432static int dag_init_output(libtrace_out_t *libtrace)
433{
434        /* Upon successful creation, the device name is stored against the
435         * device and free when it is free()d */
436        char *dag_dev_name = NULL;
437        char *scan = NULL;
438        struct dag_dev_t *dag_device = NULL;
439        int stream = 1;
440
441        /* XXX I don't know if this is important or not, but this function
442         * isn't present in all of the driver releases that this code is
443         * supposed to support! */
444        /*
445        unsigned long wake_time;
446        dagutil_sleep_get_wake_time(&wake_time,0);
447        */
448
449        dag_init_format_out_data(libtrace);
450        /* Grab the mutex while we're likely to be messing with the device
451         * list */
452        pthread_mutex_lock(&open_dag_mutex);
453
454        /* Specific streams are signified using a comma in the libtrace URI,
455         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
456         *
457         * If no stream is specified, we will write using stream 1 */
458        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
459                dag_dev_name = strdup(libtrace->uridata);
460        } else {
461                dag_dev_name = (char *)strndup(libtrace->uridata,
462                                (size_t)(scan - libtrace->uridata));
463                stream = atoi(++scan);
464        }
465        FORMAT_DATA_OUT->dagstream = stream;
466
467        /* See if our DAG device is already open */
468        dag_device = dag_find_open_device(dag_dev_name);
469
470        if (dag_device == NULL) {
471                /* Device not yet opened - open it ourselves */
472                dag_device = dag_open_output_device(libtrace, dag_dev_name);
473        } else {
474                /* Otherwise, just use the existing one */
475                free(dag_dev_name);
476                dag_dev_name = NULL;
477        }
478
479        /* Make sure we have successfully opened a DAG device */
480        if (dag_device == NULL) {
481                if (dag_dev_name) {
482                        free(dag_dev_name);
483                }
484                pthread_mutex_unlock(&open_dag_mutex);
485                return -1;
486        }
487
488        FORMAT_DATA_OUT->device = dag_device;
489        pthread_mutex_unlock(&open_dag_mutex);
490        return 0;
491}
492
493/* Creates and initialises a DAG input trace */
494static int dag_init_input(libtrace_t *libtrace) {
495        /* Upon successful creation, the device name is stored against the
496         * device and free when it is free()d */
497        char *dag_dev_name = NULL;
498        char *scan = NULL;
499        int stream = 0;
500        struct dag_dev_t *dag_device = NULL;
501
502        dag_init_format_data(libtrace);
503        /* Grab the mutex while we're likely to be messing with the device
504         * list */
505        pthread_mutex_lock(&open_dag_mutex);
506
507
508        /* DAG cards support multiple streams. In a single threaded capture,
509         * these are specified using a comma in the libtrace URI,
510         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
511         *
512         * If no stream is specified, we will read from stream 0 with
513         * one thread
514         */
515        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
516                dag_dev_name = strdup(libtrace->uridata);
517        } else {
518                dag_dev_name = (char *)strndup(libtrace->uridata,
519                                (size_t)(scan - libtrace->uridata));
520                stream = atoi(++scan);
521        }
522
523        FORMAT_DATA_FIRST->dagstream = stream;
524
525        /* See if our DAG device is already open */
526        dag_device = dag_find_open_device(dag_dev_name);
527
528        if (dag_device == NULL) {
529                /* Device not yet opened - open it ourselves */
530                dag_device = dag_open_device(libtrace, dag_dev_name);
531        } else {
532                /* Otherwise, just use the existing one */
533                free(dag_dev_name);
534                dag_dev_name = NULL;
535        }
536
537        /* Make sure we have successfully opened a DAG device */
538        if (dag_device == NULL) {
539                if (dag_dev_name)
540                        free(dag_dev_name);
541                dag_dev_name = NULL;
542                pthread_mutex_unlock(&open_dag_mutex);
543                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Unable to open DAG device %s", dag_dev_name);
544                return -1;
545        }
546
547        FORMAT_DATA->device = dag_device;
548
549        /* See Config_Status_API_Programming_Guide.pdf from the Endace
550           Dag Documentation */
551        /* Check kBooleanAttributeActive is true -- no point capturing
552         * on an interface that's disabled
553         *
554         * The symptom of the port being disabled is that libtrace
555         * will appear to hang. */
556        /* Check kBooleanAttributeFault is false */
557        /* Check kBooleanAttributeLocalFault is false */
558        /* Check kBooleanAttributeLock is true ? */
559        /* Check kBooleanAttributePeerLink ? */
560
561        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based
562           on libtrace promisc attribute?*/
563        /* Set kUint32AttributeSnapLength to the snaplength */
564
565        pthread_mutex_unlock(&open_dag_mutex);
566        return 0;
567}
568
569#ifdef HAVE_DAG_CONFIG_API_H
570static int dag_csapi_set_snaplen(libtrace_t *libtrace, int slen) {
571        dag_card_ref_t card_ref = NULL;
572        dag_component_t root = NULL;
573        attr_uuid_t uuid = 0;
574
575        if (slen < 0)
576                slen = 0; 
577
578        card_ref = dag_config_init(FORMAT_DATA->device->dev_name);
579        root = dag_config_get_root_component(card_ref);
580       
581        uuid = dag_component_get_config_attribute_uuid(root, kBooleanAttributeVarlen);
582        dag_config_set_boolean_attribute(card_ref, uuid, true);
583
584        uuid = dag_component_get_config_attribute_uuid(root, kUint32AttributeSnaplength);
585        dag_config_set_uint32_attribute(card_ref, uuid, (uint32_t)slen);
586
587        return 0;
588       
589
590} 
591#endif /* HAVE_DAG_CONFIG_API_H */
592
593/* Configures a DAG input trace */
594static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
595                            void *data)
596{
597        switch(option) {
598        case TRACE_OPTION_META_FREQ:
599                /* This option is used to specify the frequency of DUCK
600                 * updates */
601                DUCK.duck_freq = *(int *)data;
602                return 0;
603        case TRACE_OPTION_SNAPLEN:
604#ifdef HAVE_DAG_CONFIG_API_H
605                return dag_csapi_set_snaplen(libtrace, *(int *)data);
606#else
607                /* Tell the card our new snap length */
608        {
609                char conf_str[4096];
610                snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
611                if (dag_configure(FORMAT_DATA->device->fd,
612                                  conf_str) != 0) {
613                        trace_set_err(libtrace, errno, "Failed to configure "
614                                      "snaplen on DAG card: %s",
615                                      libtrace->uridata);
616                        return -1;
617                }
618        }
619#endif /* HAVE_DAG_CONFIG_API_H */
620
621                return 0;
622        case TRACE_OPTION_PROMISC:
623                /* DAG already operates in a promisc fashion */
624                return -1;
625        case TRACE_OPTION_FILTER:
626                /* We don't yet support pushing filters into DAG
627                 * cards */
628                return -1;
629        case TRACE_OPTION_EVENT_REALTIME:
630        case TRACE_OPTION_REPLAY_SPEEDUP:
631                /* Live capture is always going to be realtime */
632                return -1;
633        case TRACE_OPTION_HASHER:
634                /* Lets just say we did this, it's currently still up to
635                 * the user to configure this correctly. */
636                return 0;
637        case TRACE_OPTION_CONSTANT_ERF_FRAMING:
638                return -1;
639        }
640        return -1;
641}
642
643/* Starts a DAG output trace */
644static int dag_start_output(libtrace_out_t *libtrace)
645{
646        struct timeval zero, nopoll;
647
648        zero.tv_sec = 0;
649        zero.tv_usec = 0;
650        nopoll = zero;
651
652        /* Attach and start the DAG stream */
653        if (dag_attach_stream64(FORMAT_DATA_OUT->device->fd,
654                        FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) {
655                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
656                return -1;
657        }
658
659        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
660                        FORMAT_DATA_OUT->dagstream) < 0) {
661                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
662                return -1;
663        }
664        FORMAT_DATA_OUT->stream_attached = 1;
665
666        /* We don't want the dag card to do any sleeping */
667        dag_set_stream_poll64(FORMAT_DATA_OUT->device->fd,
668                        FORMAT_DATA_OUT->dagstream, 0, &zero,
669                        &nopoll);
670
671        return 0;
672}
673
674static int dag_start_input_stream(libtrace_t *libtrace,
675                                  struct dag_per_stream_t * stream) {
676        struct timeval zero, nopoll;
677        uint8_t *top, *bottom, *starttop;
678        top = bottom = NULL;
679
680        zero.tv_sec = 0;
681        zero.tv_usec = 10000;
682        nopoll = zero;
683
684        /* Attach and start the DAG stream */
685        if (dag_attach_stream64(FORMAT_DATA->device->fd,
686                              stream->dagstream, 0, 0) < 0) {
687                trace_set_err(libtrace, errno, "Cannot attach DAG stream #%u",
688                              stream->dagstream);
689                return -1;
690        }
691
692        if (dag_start_stream(FORMAT_DATA->device->fd,
693                             stream->dagstream) < 0) {
694                trace_set_err(libtrace, errno, "Cannot start DAG stream #%u",
695                              stream->dagstream);
696                return -1;
697        }
698        FORMAT_DATA->stream_attached = 1;
699
700        /* We don't want the dag card to do any sleeping */
701        if (dag_set_stream_poll64(FORMAT_DATA->device->fd,
702                            stream->dagstream, 0, &zero,
703                            &nopoll) < 0) {
704                trace_set_err(libtrace, errno,
705                              "dag_set_stream_poll failed!");
706                return -1;
707        }
708
709        starttop = dag_advance_stream(FORMAT_DATA->device->fd,
710                                      stream->dagstream,
711                                      &bottom);
712
713        /* Should probably flush the memory hole now */
714        top = starttop;
715        while (starttop - bottom > 0) {
716                bottom += (starttop - bottom);
717                top = dag_advance_stream(FORMAT_DATA->device->fd,
718                                         stream->dagstream,
719                                         &bottom);
720        }
721        stream->top = top;
722        stream->bottom = bottom;
723        stream->processed = 0;
724        stream->drops = 0;
725
726        return 0;
727
728}
729
730/* Starts a DAG input trace */
731static int dag_start_input(libtrace_t *libtrace)
732{
733        return dag_start_input_stream(libtrace, FORMAT_DATA_FIRST);
734}
735
736static int dag_pstart_input(libtrace_t *libtrace)
737{
738        char *scan, *tok;
739        uint16_t stream_count = 0, max_streams;
740        int iserror = 0;
741        struct dag_per_stream_t stream_data;
742
743        /* Check we aren't trying to create more threads than the DAG card can
744         * handle */
745        max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd);
746        if (libtrace->perpkt_thread_count > max_streams) {
747                fprintf(stderr,
748                              "WARNING: DAG has only %u streams available, "
749                              "capping total number of threads at this value.",
750                              max_streams);
751                libtrace->perpkt_thread_count = max_streams;
752        }
753
754        /* Get the stream names from the uri */
755        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
756                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
757                              "Format uri doesn't specify the DAG streams");
758                iserror = 1;
759                goto cleanup;
760        }
761
762        scan++;
763
764        tok = strtok(scan, ",");
765        while (tok != NULL) {
766                /* Ensure we haven't specified too many streams */
767                if (stream_count >= libtrace->perpkt_thread_count) {
768                        fprintf(stderr,
769                                      "WARNING: Format uri specifies too many "
770                                      "streams. Maximum is %u, so only using "
771                                      "the first %u from the uri.",
772                                      libtrace->perpkt_thread_count, 
773                                      libtrace->perpkt_thread_count);
774                        break;
775                }
776
777                /* Save the stream details */
778                if (stream_count == 0) {
779                        /* Special case where we update the existing stream
780                         * data structure */
781                        FORMAT_DATA_FIRST->dagstream = (uint16_t)atoi(tok);
782                } else {
783                        memset(&stream_data, 0, sizeof(stream_data));
784                        stream_data.dagstream = (uint16_t)atoi(tok);
785                        libtrace_list_push_back(FORMAT_DATA->per_stream,
786                                                &stream_data);
787                }
788
789                stream_count++;
790                tok = strtok(NULL, ",");
791        }
792
793        if (stream_count < libtrace->perpkt_thread_count) {
794                libtrace->perpkt_thread_count = stream_count;
795        }
796       
797        FORMAT_DATA->stream_attached = 1;
798
799 cleanup:
800        if (iserror) {
801                return -1;
802        } else {
803                return 0;
804        }
805}
806
807/* Pauses a DAG output trace */
808static int dag_pause_output(libtrace_out_t *libtrace)
809{
810        /* Stop and detach the stream */
811        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
812                            FORMAT_DATA_OUT->dagstream) < 0) {
813                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
814                return -1;
815        }
816        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
817                              FORMAT_DATA_OUT->dagstream) < 0) {
818                trace_set_err_out(libtrace, errno,
819                                  "Could not detach DAG stream");
820                return -1;
821        }
822        FORMAT_DATA_OUT->stream_attached = 0;
823        return 0;
824}
825
826/* Pauses a DAG input trace */
827static int dag_pause_input(libtrace_t *libtrace)
828{
829        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
830
831        /* Stop and detach each stream */
832        while (tmp != NULL) {
833                if (dag_stop_stream(FORMAT_DATA->device->fd,
834                                    STREAM_DATA(tmp)->dagstream) < 0) {
835                        trace_set_err(libtrace, errno,
836                                      "Could not stop DAG stream");
837                        return -1;
838                }
839                if (dag_detach_stream(FORMAT_DATA->device->fd,
840                                      STREAM_DATA(tmp)->dagstream) < 0) {
841                        trace_set_err(libtrace, errno,
842                                      "Could not detach DAG stream");
843                        return -1;
844                }
845
846                tmp = tmp->next;
847        }
848
849        FORMAT_DATA->stream_attached = 0;
850        return 0;
851}
852
853
854
855/* Closes a DAG input trace */
856static int dag_fin_input(libtrace_t *libtrace)
857{
858        /* Need the lock, since we're going to be handling the device list */
859        pthread_mutex_lock(&open_dag_mutex);
860
861        /* Detach the stream if we are not paused */
862        if (FORMAT_DATA->stream_attached)
863                dag_pause_input(libtrace);
864        FORMAT_DATA->device->ref_count--;
865
866        /* Close the DAG device if there are no more references to it */
867        if (FORMAT_DATA->device->ref_count == 0)
868                dag_close_device(FORMAT_DATA->device);
869
870        if (DUCK.dummy_duck)
871                trace_destroy_dead(DUCK.dummy_duck);
872
873        /* Clear the list */
874        libtrace_list_deinit(FORMAT_DATA->per_stream);
875        free(libtrace->format_data);
876        pthread_mutex_unlock(&open_dag_mutex);
877        return 0; /* success */
878}
879
880/* Closes a DAG output trace */
881static int dag_fin_output(libtrace_out_t *libtrace)
882{
883
884        /* Commit any outstanding traffic in the txbuffer */
885        if (FORMAT_DATA_OUT->waiting) {
886                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
887                                           FORMAT_DATA_OUT->dagstream,
888                                           FORMAT_DATA_OUT->waiting );
889        }
890
891        /* Wait until the buffer is nearly clear before exiting the program,
892         * as we will lose packets otherwise */
893        dag_tx_get_stream_space64
894                (FORMAT_DATA_OUT->device->fd,
895                 FORMAT_DATA_OUT->dagstream,
896                 dag_get_stream_buffer_size64(FORMAT_DATA_OUT->device->fd,
897                                            FORMAT_DATA_OUT->dagstream) - 8);
898
899        /* Need the lock, since we're going to be handling the device list */
900        pthread_mutex_lock(&open_dag_mutex);
901
902        /* Detach the stream if we are not paused */
903        if (FORMAT_DATA_OUT->stream_attached)
904                dag_pause_output(libtrace);
905        FORMAT_DATA_OUT->device->ref_count --;
906
907        /* Close the DAG device if there are no more references to it */
908        if (FORMAT_DATA_OUT->device->ref_count == 0)
909                dag_close_device(FORMAT_DATA_OUT->device);
910        free(libtrace->format_data);
911        pthread_mutex_unlock(&open_dag_mutex);
912        return 0; /* success */
913}
914
915#ifdef DAGIOC_CARD_DUCK
916#define LIBTRACE_DUCK_IOCTL DAGIOC_CARD_DUCK
917#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_5_0
918#else
919#ifdef DAGIOCDUCK
920#define LIBTRACE_DUCK_IOCTL DAGIOCDUCK
921#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_2_5
922#else
923#warning "DAG appears to be missing DUCK support"
924#endif
925#endif
926
927/* Extracts DUCK information from the DAG card and produces a DUCK packet */
928static int dag_get_duckinfo(libtrace_t *libtrace,
929                                libtrace_packet_t *packet) {
930
931        if (DUCK.duck_freq == 0)
932                return 0;
933
934#ifndef LIBTRACE_DUCK_IOCTL
935        trace_set_err(libtrace, errno, 
936                "Requested DUCK information but unable to determine the correct ioctl for DUCK");
937        DUCK.duck_freq = 0;
938        return -1;
939#endif
940
941        if (DUCK.last_pkt - DUCK.last_duck < DUCK.duck_freq)
942                return 0;
943
944        /* Allocate memory for the DUCK data */
945        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
946            !packet->buffer) {
947                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
948                packet->buf_control = TRACE_CTRL_PACKET;
949                if (!packet->buffer) {
950                        trace_set_err(libtrace, errno,
951                                      "Cannot allocate packet buffer");
952                        return -1;
953                }
954        }
955
956        /* DUCK doesn't have a format header */
957        packet->header = 0;
958        packet->payload = packet->buffer;
959
960        /* No need to check if we can get DUCK or not - we're modern
961         * enough so just grab the DUCK info */
962        if ((ioctl(FORMAT_DATA->device->fd, LIBTRACE_DUCK_IOCTL,
963                   (duckinf_t *)packet->payload) < 0)) {
964                trace_set_err(libtrace, errno, "Error using DUCK ioctl");
965                DUCK.duck_freq = 0;
966                return -1;
967        }
968
969        packet->type = LIBTRACE_DUCK_VERSION;
970
971        /* Set the packet's trace to point at a DUCK trace, so that the
972         * DUCK format functions will be called on the packet rather than the
973         * DAG ones */
974        if (!DUCK.dummy_duck)
975                DUCK.dummy_duck = trace_create_dead("duck:dummy");
976        packet->trace = DUCK.dummy_duck;
977        DUCK.last_duck = DUCK.last_pkt;
978        packet->error = sizeof(duckinf_t);
979        return sizeof(duckinf_t);
980}
981
982/* Determines the amount of data available to read from the DAG card */
983static int dag_available(libtrace_t *libtrace,
984                         struct dag_per_stream_t *stream_data)
985{
986        uint32_t diff = stream_data->top - stream_data->bottom;
987
988        /* If we've processed more than 4MB of data since we last called
989         * dag_advance_stream, then we should call it again to allow the
990         * space occupied by that 4MB to be released */
991        if (diff >= dag_record_size && stream_data->processed < 4 * 1024 * 1024)
992                return diff;
993
994        /* Update the top and bottom pointers */
995        stream_data->top = dag_advance_stream(FORMAT_DATA->device->fd,
996                                              stream_data->dagstream,
997                                              &(stream_data->bottom));
998
999        if (stream_data->top == NULL) {
1000                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
1001                return -1;
1002        }
1003        stream_data->processed = 0;
1004        diff = stream_data->top - stream_data->bottom;
1005        return diff;
1006}
1007
1008/* Returns a pointer to the start of the next complete ERF record */
1009static dag_record_t *dag_get_record(struct dag_per_stream_t *stream_data)
1010{
1011        dag_record_t *erfptr = NULL;
1012        uint16_t size;
1013
1014        erfptr = (dag_record_t *)stream_data->bottom;
1015        if (!erfptr)
1016                return NULL;
1017
1018        size = ntohs(erfptr->rlen);
1019        if (size < dag_record_size) {
1020                fprintf(stderr, "DAG2.5 rlen is invalid (rlen %u, must be at least %u\n",
1021                        size, dag_record_size);
1022                return NULL;
1023        }
1024
1025        /* Make certain we have the full packet available */
1026        if (size > (stream_data->top - stream_data->bottom))
1027                return NULL;
1028
1029        stream_data->bottom += size;
1030        stream_data->processed += size;
1031        return erfptr;
1032}
1033
1034/* Converts a buffer containing a recently read DAG packet record into a
1035 * libtrace packet */
1036static int dag_prepare_packet_stream(libtrace_t *libtrace,
1037                                     struct dag_per_stream_t *stream_data,
1038                                     libtrace_packet_t *packet,
1039                                     void *buffer, libtrace_rt_types_t rt_type,
1040                                     uint32_t flags)
1041{
1042        dag_record_t *erfptr;
1043
1044        /* If the packet previously owned a buffer that is not the buffer
1045         * that contains the new packet data, we're going to need to free the
1046         * old one to avoid memory leaks */
1047        if (packet->buffer != buffer &&
1048            packet->buf_control == TRACE_CTRL_PACKET) {
1049                free(packet->buffer);
1050        }
1051
1052        /* Set the buffer owner appropriately */
1053        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
1054                packet->buf_control = TRACE_CTRL_PACKET;
1055        } else
1056                packet->buf_control = TRACE_CTRL_EXTERNAL;
1057
1058        /* Update the packet pointers and type appropriately */
1059        erfptr = (dag_record_t *)buffer;
1060        packet->buffer = erfptr;
1061        packet->header = erfptr;
1062        packet->type = rt_type;
1063
1064        if (erfptr->flags.rxerror == 1) {
1065                /* rxerror means the payload is corrupt - drop the payload
1066                 * by tweaking rlen */
1067                packet->payload = NULL;
1068                erfptr->rlen = htons(erf_get_framing_length(packet));
1069        } else {
1070                packet->payload = (char*)packet->buffer
1071                        + erf_get_framing_length(packet);
1072        }
1073
1074        if (libtrace->format_data == NULL) {
1075                dag_init_format_data(libtrace);
1076        }
1077
1078        /* Update the dropped packets counter */
1079        /* No loss counter for DSM coloured records - have to use some
1080         * other API */
1081        if (erf_is_color_type(erfptr->type)) {
1082                /* TODO */
1083        } else {
1084                /* Use the ERF loss counter */
1085                if (stream_data->seeninterface[erfptr->flags.iface]
1086                    == 0) {
1087                        stream_data->seeninterface[erfptr->flags.iface]
1088                                = 1;
1089                } else {
1090                        stream_data->drops += ntohs(erfptr->lctr);
1091                }
1092        }
1093
1094        return 0;
1095}
1096
1097static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
1098                              void *buffer, libtrace_rt_types_t rt_type,
1099                              uint32_t flags)
1100{
1101        return dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
1102                                       buffer, rt_type, flags);
1103}
1104
1105/*
1106 * dag_write_packet() at this stage attempts to improve tx performance
1107 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
1108 * has been met. I observed approximately 270% performance increase
1109 * through this relatively naive tweak. No optimisation of buffer sizes
1110 * was attempted.
1111 */
1112
1113/* Pushes an ERF record onto the transmit stream */
1114static int dag_dump_packet(libtrace_out_t *libtrace,
1115                           dag_record_t *erfptr, unsigned int pad,
1116                           void *buffer)
1117{
1118        int size;
1119
1120        /*
1121         * If we've got 0 bytes waiting in the txqueue, assume that we
1122         * haven't requested any space yet, and request some, storing
1123         * the pointer at FORMAT_DATA_OUT->txbuffer.
1124         *
1125         * The amount to request is slightly magical at the moment - it's
1126         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
1127         * the buffer and handle overruns.
1128         */
1129        if (FORMAT_DATA_OUT->waiting == 0) {
1130                FORMAT_DATA_OUT->txbuffer =
1131                        dag_tx_get_stream_space64(FORMAT_DATA_OUT->device->fd,
1132                                                FORMAT_DATA_OUT->dagstream,
1133                                                16908288);
1134        }
1135
1136        /*
1137         * Copy the header separately to the body, as we can't guarantee they
1138         * are in contiguous memory
1139         */
1140        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, erfptr,
1141               (dag_record_size + pad));
1142        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
1143
1144        /*
1145         * Copy our incoming packet into the outgoing buffer, and increment
1146         * our waiting count
1147         */
1148        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
1149        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, buffer,
1150               size);
1151        FORMAT_DATA_OUT->waiting += size;
1152
1153        /*
1154         * If our output buffer has more than 16 Mebibytes in it, commit those
1155         * bytes and reset the waiting count to 0.
1156         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in
1157         * case there is still data in the buffer at program exit.
1158         */
1159        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
1160                FORMAT_DATA_OUT->txbuffer =
1161                        dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
1162                                                   FORMAT_DATA_OUT->dagstream,
1163                                                   FORMAT_DATA_OUT->waiting);
1164                FORMAT_DATA_OUT->waiting = 0;
1165        }
1166
1167        return size + pad + dag_record_size;
1168}
1169
1170/* Attempts to determine a suitable ERF type for a given packet. Returns true
1171 * if one is found, false otherwise */
1172static bool find_compatible_linktype(libtrace_out_t *libtrace,
1173                                     libtrace_packet_t *packet, char *type)
1174{
1175        /* Keep trying to simplify the packet until we can find
1176         * something we can do with it */
1177
1178        do {
1179                *type = libtrace_to_erf_type(trace_get_link_type(packet));
1180
1181                /* Success */
1182                if (*type != (char)-1)
1183                        return true;
1184
1185                if (!demote_packet(packet)) {
1186                        trace_set_err_out(libtrace,
1187                                          TRACE_ERR_NO_CONVERSION,
1188                                          "No erf type for packet (%i)",
1189                                          trace_get_link_type(packet));
1190                        return false;
1191                }
1192
1193        } while(1);
1194
1195        return true;
1196}
1197
1198/* Writes a packet to the provided DAG output trace */
1199static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet)
1200{
1201        /* Check dag can write this type of packet */
1202        if (!dag_can_write(packet)) {
1203                return 0;
1204        }
1205
1206        /* This is heavily borrowed from erf_write_packet(). Yes, CnP
1207         * coding sucks, sorry about that.
1208         */
1209        unsigned int pad = 0;
1210        int numbytes;
1211        void *payload = packet->payload;
1212        dag_record_t *header = (dag_record_t *)packet->header;
1213        char erf_type = 0;
1214
1215        if(!packet->header) {
1216                /* No header, probably an RT packet. Lifted from
1217                 * erf_write_packet(). */
1218                return -1;
1219        }
1220
1221        pad = dag_get_padding(packet);
1222
1223        /*
1224         * If the payload is null, adjust the rlen. Discussion of this is
1225         * attached to erf_write_packet()
1226         */
1227        if (payload == NULL) {
1228                header->rlen = htons(dag_record_size + pad);
1229        }
1230
1231        if (packet->type == TRACE_RT_DATA_ERF) {
1232                numbytes = dag_dump_packet(libtrace, header, pad, payload);
1233        } else {
1234                /* Build up a new packet header from the existing header */
1235
1236                /* Simplify the packet first - if we can't do this, break
1237                 * early */
1238                if (!find_compatible_linktype(libtrace,packet,&erf_type))
1239                        return -1;
1240
1241                dag_record_t erfhdr;
1242
1243                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
1244                payload=packet->payload;
1245                pad = dag_get_padding(packet);
1246
1247                /* Flags. Can't do this */
1248                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
1249                if (trace_get_direction(packet)!=(int)~0U)
1250                        erfhdr.flags.iface = trace_get_direction(packet);
1251
1252                erfhdr.type = erf_type;
1253
1254                /* Packet length (rlen includes format overhead) */
1255                if (trace_get_capture_length(packet) <= 0
1256                       || trace_get_capture_length(packet) > 65536) {
1257                        trace_set_err_out(libtrace, TRACE_ERR_BAD_PACKET,
1258                                "Capture length is out of range in dag_write_packet()");
1259                        return -1;
1260                }
1261                if (erf_get_framing_length(packet) <= 0
1262                       || trace_get_framing_length(packet) > 65536) {
1263                        trace_set_err_out(libtrace, TRACE_ERR_BAD_PACKET,
1264                                "Framing length is out of range in dag_write_packet()");
1265                        return -1;
1266                }
1267                if (trace_get_capture_length(packet) +
1268                       erf_get_framing_length(packet) <= 0
1269                       || trace_get_capture_length(packet) +
1270                       erf_get_framing_length(packet) > 65536) {
1271                        trace_set_err_out(libtrace, TRACE_ERR_BAD_PACKET,
1272                                "Capture + framing length is out of range in dag_write_packet()");
1273                        return -1;
1274                }
1275
1276                erfhdr.rlen = htons(trace_get_capture_length(packet)
1277                                    + erf_get_framing_length(packet));
1278
1279
1280                /* Loss counter. Can't do this */
1281                erfhdr.lctr = 0;
1282                /* Wire length, does not include padding! */
1283                erfhdr.wlen = htons(trace_get_wire_length(packet));
1284
1285                /* Write it out */
1286                numbytes = dag_dump_packet(libtrace, &erfhdr, pad, payload);
1287        }
1288
1289        return numbytes;
1290}
1291
1292/* Reads the next available packet from a DAG card, in a BLOCKING fashion
1293 *
1294 * If DUCK reporting is enabled, the packet returned may be a DUCK update
1295 */
1296static int dag_read_packet_stream(libtrace_t *libtrace,
1297                                struct dag_per_stream_t *stream_data,
1298                                libtrace_thread_t *t, /* Optional */
1299                                libtrace_packet_t *packet)
1300{
1301        int size = 0;
1302        dag_record_t *erfptr = NULL;
1303        struct timeval tv;
1304        int numbytes = 0;
1305        uint32_t flags = 0;
1306        struct timeval maxwait, pollwait;
1307
1308        pollwait.tv_sec = 0;
1309        pollwait.tv_usec = 10000;
1310        maxwait.tv_sec = 0;
1311        maxwait.tv_usec = 250000;
1312
1313        /* Check if we're due for a DUCK report - only report on the first thread */
1314        if (stream_data == FORMAT_DATA_FIRST) {
1315                size = dag_get_duckinfo(libtrace, packet);
1316                if (size != 0)
1317                        return size;
1318        }
1319
1320
1321        /* Don't let anyone try to free our DAG memory hole! */
1322        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1323
1324        /* If the packet buffer is currently owned by libtrace, free it so
1325         * that we can set the packet to point into the DAG memory hole */
1326        if (packet->buf_control == TRACE_CTRL_PACKET) {
1327                free(packet->buffer);
1328                packet->buffer = 0;
1329        }
1330
1331        if (dag_set_stream_poll64(FORMAT_DATA->device->fd, stream_data->dagstream,
1332                                sizeof(dag_record_t), &maxwait,
1333                                &pollwait) == -1) {
1334                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1335                return -1;
1336        }
1337
1338        /* Grab a full ERF record */
1339        do {
1340                numbytes = dag_available(libtrace, stream_data);
1341                if (numbytes < 0)
1342                        return numbytes;
1343                if (numbytes < dag_record_size) {
1344                        /* Check the message queue if we have one to check */
1345                        if (t != NULL &&
1346                            libtrace_message_queue_count(&t->messages) > 0)
1347                                return -2;
1348
1349                        if ((numbytes=is_halted(libtrace)) != -1)
1350                                return numbytes;
1351                        /* Block until we see a packet */
1352                        continue;
1353                }
1354                erfptr = dag_get_record(stream_data);
1355        } while (erfptr == NULL);
1356
1357        packet->trace = libtrace;
1358
1359        /* Prepare the libtrace packet */
1360        if (dag_prepare_packet_stream(libtrace, stream_data, packet, erfptr,
1361                                    TRACE_RT_DATA_ERF, flags))
1362                return -1;
1363
1364        /* Update the DUCK timer - don't re-order this check (false-sharing) */
1365        if (stream_data == FORMAT_DATA_FIRST && DUCK.duck_freq != 0) {
1366                tv = trace_get_timeval(packet);
1367                DUCK.last_pkt = tv.tv_sec;
1368        }
1369
1370        packet->order = erf_get_erf_timestamp(packet);
1371        packet->error = packet->payload ? htons(erfptr->rlen) :
1372                                          erf_get_framing_length(packet);
1373
1374        return packet->error;
1375}
1376
1377static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
1378{
1379        return dag_read_packet_stream(libtrace, FORMAT_DATA_FIRST, NULL, packet);
1380}
1381
1382static int dag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1383                             libtrace_packet_t **packets, size_t nb_packets)
1384{
1385        int ret;
1386        size_t read_packets = 0;
1387        int numbytes = 0;
1388
1389        struct dag_per_stream_t *stream_data =
1390                (struct dag_per_stream_t *)t->format_data;
1391
1392        /* Read as many packets as we can, but read atleast one packet */
1393        do {
1394                ret = dag_read_packet_stream(libtrace, stream_data, t,
1395                                           packets[read_packets]);
1396                if (ret < 0)
1397                        return ret;
1398
1399                read_packets++;
1400
1401                /* Make sure we don't read too many packets..! */
1402                if (read_packets >= nb_packets)
1403                        break;
1404
1405                numbytes = dag_available(libtrace, stream_data);
1406        } while (numbytes >= dag_record_size);
1407
1408        return read_packets;
1409}
1410
1411/* Attempts to read a packet from a DAG card in a NON-BLOCKING fashion. If a
1412 * packet is available, we will return a packet event. Otherwise we will
1413 * return a SLEEP event (as we cannot select on the DAG file descriptor).
1414 */
1415static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
1416                                           libtrace_packet_t *packet)
1417{
1418        libtrace_eventobj_t event = {0,0,0.0,0};
1419        dag_record_t *erfptr = NULL;
1420        int numbytes;
1421        uint32_t flags = 0;
1422        struct timeval minwait, tv;
1423       
1424        minwait.tv_sec = 0;
1425        minwait.tv_usec = 10000;
1426
1427        /* Check if we're meant to provide a DUCK update */
1428        numbytes = dag_get_duckinfo(libtrace, packet);
1429        if (numbytes < 0) {
1430                event.type = TRACE_EVENT_TERMINATE;
1431                return event;
1432        } else if (numbytes > 0) {
1433                event.type = TRACE_EVENT_PACKET;
1434                return event;
1435        }
1436       
1437        if (dag_set_stream_poll64(FORMAT_DATA->device->fd,
1438                                FORMAT_DATA_FIRST->dagstream, 0, &minwait,
1439                                &minwait) == -1) {
1440                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1441                event.type = TRACE_EVENT_TERMINATE;
1442                return event;
1443        }
1444
1445        do {
1446                erfptr = NULL;
1447                numbytes = 0;
1448
1449                /* Need to call dag_available so that the top pointer will get
1450                 * updated, otherwise we'll never see any data! */
1451                numbytes = dag_available(libtrace, FORMAT_DATA_FIRST);
1452
1453                /* May as well not bother calling dag_get_record if
1454                 * dag_available suggests that there's no data */
1455                if (numbytes != 0)
1456                        erfptr = dag_get_record(FORMAT_DATA_FIRST);
1457                if (erfptr == NULL) {
1458                        /* No packet available - sleep for a very short time */
1459                        if (libtrace_halt) {
1460                                event.type = TRACE_EVENT_TERMINATE;
1461                        } else {
1462                                event.type = TRACE_EVENT_SLEEP;
1463                                event.seconds = 0.0001;
1464                        }
1465                        break;
1466                }
1467                if (dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
1468                                            erfptr, TRACE_RT_DATA_ERF, flags)) {
1469                        event.type = TRACE_EVENT_TERMINATE;
1470                        break;
1471                }
1472
1473
1474                event.size = trace_get_capture_length(packet) +
1475                        trace_get_framing_length(packet);
1476
1477                /* XXX trace_read_packet() normally applies the following
1478                 * config options for us, but this function is called via
1479                 * trace_event() so we have to do it ourselves */
1480
1481                if (libtrace->filter) {
1482                        int filtret = trace_apply_filter(libtrace->filter,
1483                                                         packet);
1484                        if (filtret == -1) {
1485                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
1486                                              "Bad BPF Filter");
1487                                event.type = TRACE_EVENT_TERMINATE;
1488                                break;
1489                        }
1490
1491                        if (filtret == 0) {
1492                                /* This packet isn't useful so we want to
1493                                 * immediately see if there is another suitable
1494                                 * one - we definitely DO NOT want to return
1495                                 * a sleep event in this case, like we used to
1496                                 * do! */
1497                                libtrace->filtered_packets ++;
1498                                trace_clear_cache(packet);
1499                                continue;
1500                        }
1501
1502                        event.type = TRACE_EVENT_PACKET;
1503                } else {
1504                        event.type = TRACE_EVENT_PACKET;
1505                }
1506
1507                /* Update the DUCK timer */
1508                tv = trace_get_timeval(packet);
1509                DUCK.last_pkt = tv.tv_sec;
1510               
1511                if (libtrace->snaplen > 0) {
1512                        trace_set_capture_length(packet, libtrace->snaplen);
1513                }
1514                libtrace->accepted_packets ++;
1515                break;
1516        } while(1);
1517
1518        return event;
1519}
1520
1521static void dag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat)
1522{
1523        libtrace_list_node_t *tmp;
1524        if (!libtrace) {
1525                fprintf(stderr, "NULL trace passed into dag_get_statistics()\n");
1526                return;
1527        }
1528        if (!stat) {
1529                trace_set_err(libtrace, TRACE_ERR_STAT, "NULL stat passed into dag_get_statistics()");
1530                return;
1531        }
1532        tmp = FORMAT_DATA_HEAD;
1533
1534        /* Dropped packets */
1535        stat->dropped_valid = 1;
1536        stat->dropped = 0;
1537        while (tmp != NULL) {
1538                stat->dropped += STREAM_DATA(tmp)->drops;
1539                tmp = tmp->next;
1540        }
1541
1542}
1543
1544static void dag_get_thread_statistics(libtrace_t *libtrace, libtrace_thread_t *t,
1545                                       libtrace_stat_t *stat) {
1546        struct dag_per_stream_t *stream_data = t->format_data;
1547        if (!libtrace) {
1548                fprintf(stderr, "NULL trace passed into dag_get_thread_statistics()\n");
1549                return;
1550        }
1551        if (!stat) {
1552                trace_set_err(libtrace, TRACE_ERR_STAT, "NULL stat passed into dag_get_thread_statistics()");
1553                return;
1554        }
1555        stat->dropped_valid = 1;
1556        stat->dropped = stream_data->drops;
1557
1558        stat->filtered_valid = 1;
1559        stat->filtered = 0;
1560}
1561
1562/* Prints some semi-useful help text about the DAG format module */
1563static void dag_help(void) {
1564        printf("dag format module: $Revision: 1755 $\n");
1565        printf("Supported input URIs:\n");
1566        printf("\tdag:/dev/dagn\n");
1567        printf("\n");
1568        printf("\te.g.: dag:/dev/dag0\n");
1569        printf("\n");
1570        printf("Supported output URIs:\n");
1571        printf("\tnone\n");
1572        printf("\n");
1573}
1574
1575static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1576                                bool reader)
1577{
1578        struct dag_per_stream_t *stream_data;
1579        libtrace_list_node_t *node;
1580
1581        if (reader && t->type == THREAD_PERPKT) {
1582                node = libtrace_list_get_index(FORMAT_DATA->per_stream,
1583                                                t->perpkt_num);
1584                if (node == NULL) {
1585                        return -1;
1586                }
1587                stream_data = node->data;
1588
1589                /* Pass the per thread data to the thread */
1590                t->format_data = stream_data;
1591
1592                /* Attach and start the DAG stream */
1593                if (dag_start_input_stream(libtrace, stream_data) < 0)
1594                        return -1;
1595        }
1596
1597        return 0;
1598}
1599
1600static struct libtrace_format_t dag = {
1601        "dag",
1602        "$Id$",
1603        TRACE_FORMAT_ERF,
1604        dag_probe_filename,             /* probe filename */
1605        NULL,                           /* probe magic */
1606        dag_init_input,                 /* init_input */
1607        dag_config_input,               /* config_input */
1608        dag_start_input,                /* start_input */
1609        dag_pause_input,                /* pause_input */
1610        dag_init_output,                /* init_output */
1611        NULL,                           /* config_output */
1612        dag_start_output,               /* start_output */
1613        dag_fin_input,                  /* fin_input */
1614        dag_fin_output,                 /* fin_output */
1615        dag_read_packet,                /* read_packet */
1616        dag_prepare_packet,             /* prepare_packet */
1617        NULL,                           /* fin_packet */
1618        dag_write_packet,               /* write_packet */
1619        NULL,                           /* flush_output */
1620        erf_get_link_type,              /* get_link_type */
1621        erf_get_direction,              /* get_direction */
1622        erf_set_direction,              /* set_direction */
1623        erf_get_erf_timestamp,          /* get_erf_timestamp */
1624        NULL,                           /* get_timeval */
1625        NULL,                           /* get_seconds */
1626        NULL,                           /* get_timespec */
1627        NULL,                           /* seek_erf */
1628        NULL,                           /* seek_timeval */
1629        NULL,                           /* seek_seconds */
1630        erf_get_capture_length,         /* get_capture_length */
1631        erf_get_wire_length,            /* get_wire_length */
1632        erf_get_framing_length,         /* get_framing_length */
1633        erf_set_capture_length,         /* set_capture_length */
1634        NULL,                           /* get_received_packets */
1635        NULL,                           /* get_filtered_packets */
1636        NULL,                           /* get_dropped_packets */
1637        dag_get_statistics,             /* get_statistics */
1638        NULL,                           /* get_fd */
1639        trace_event_dag,                /* trace_event */
1640        dag_help,                       /* help */
1641        NULL,                            /* next pointer */
1642        {true, 0}, /* live packet capture, thread limit TBD */
1643        dag_pstart_input,
1644        dag_pread_packets,
1645        dag_pause_input,
1646        NULL,
1647        dag_pregister_thread,
1648        NULL,
1649        dag_get_thread_statistics       /* get thread stats */
1650};
1651
1652void dag_constructor(void)
1653{
1654        register_format(&dag);
1655}
Note: See TracBrowser for help on using the repository browser.