source: lib/format_dag25.c @ a857389

cachetimestampsdevelopetsiliverc-4.0.3rc-4.0.4ringdecrementfixringperformance
Last change on this file since a857389 was a857389, checked in by Shane Alcock <salcock@…>, 3 years ago

Initial support for ERF provenance records

Update erftypes.h with TYPE_META (27).
Check for ERF_TYPE_MAX rather than some arbitrary type in ERF sanity checks. In Wireshark we recently completely removed these checks as there are only a few types before TYPE_PAD/ERF_TYPE_MAX, but leave them in for now.
Add TRACE_TYPE_ERF_META for provenance record payload.
Continue to use TRACE_RT_DATA_ERF as provenance is a valid ERF record. Note: this means that LIBTRACE_IS_META_PACKET() will currently return FALSE which may confuse some tools. Other places in the code also tend to check for TRACE_TYPE_NONDATA which isn't true here either.
Return zero for wire length of provenance records.
Don't allow snapping them (just return the same value).
Skip provenance records in l2 parsers and trace_get_payload_from_meta().
Return provenance payload for trace_get_packet_meta().

Also add support for a couple of missing ERF_TYPE_ETH_COLOR variants.

  • Property mode set to 100644
File size: 45.3 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26#define _GNU_SOURCE
27
28#include "config.h"
29#include "common.h"
30#include "libtrace.h"
31#include "libtrace_int.h"
32#include "format_helper.h"
33#include "format_erf.h"
34
35#include <assert.h>
36#include <errno.h>
37#include <fcntl.h>
38#include <stdio.h>
39#include <string.h>
40#include <stdlib.h>
41#include <sys/stat.h>
42
43#include <sys/mman.h>
44/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
45 * Windows anyway so we'll worry about this more later :] */
46#include <pthread.h>
47
48
49#ifdef HAVE_DAG_CONFIG_API_H
50#include <dag_config_api.h>
51#endif
52
53#ifdef WIN32
54#  include <io.h>
55#  include <share.h>
56#  define PATH_MAX _MAX_PATH
57#  define snprintf sprintf_s
58#else
59#  include <netdb.h>
60#  ifndef PATH_MAX
61#       define PATH_MAX 4096
62#  endif
63#  include <sys/ioctl.h>
64#endif
65
66/* This format deals with DAG cards that are using drivers from the 2.5 version
67 * onwards, including 3.X.
68 *
69 * DAG is a LIVE capture format.
70 *
71 * This format does support writing, provided the DAG card that you are using
72 * has transmit (Tx) support. Additionally, packets read using this format
73 * are in the ERF format, so can easily be written as ERF traces without
74 * losing any data.
75 */
76
77#define DATA(x) ((struct dag_format_data_t *)x->format_data)
78#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
79#define STREAM_DATA(x) ((struct dag_per_stream_t *)x->data)
80
81#define FORMAT_DATA DATA(libtrace)
82#define FORMAT_DATA_OUT DATA_OUT(libtrace)
83
84#define DUCK FORMAT_DATA->duck
85
86#define FORMAT_DATA_HEAD FORMAT_DATA->per_stream->head
87#define FORMAT_DATA_FIRST ((struct dag_per_stream_t *)FORMAT_DATA_HEAD->data)
88
89static struct libtrace_format_t dag;
90
91/* A DAG device - a DAG device can support multiple streams (and therefore
92 * multiple input traces) so each trace needs to refer to a device */
93struct dag_dev_t {
94        char * dev_name;                /* Device name */
95        int fd;                         /* File descriptor */
96        uint16_t ref_count;             /* Number of input / output traces
97                                           that are using this device */
98        struct dag_dev_t *prev;         /* Pointer to the previous device in
99                                           the device list */
100        struct dag_dev_t *next;         /* Pointer to the next device in the
101                                           device list */
102};
103
104/* "Global" data that is stored for each DAG output trace */
105struct dag_format_data_out_t {
106        /* The DAG device being used for writing */
107        struct dag_dev_t *device;
108        /* The DAG stream that is being written on */
109        unsigned int dagstream;
110        /* Boolean flag indicating whether the stream is currently attached */
111        int stream_attached;
112        /* The amount of data waiting to be transmitted, in bytes */
113        uint64_t waiting;
114        /* A buffer to hold the data to be transmittted */
115        uint8_t *txbuffer;
116};
117
118/* Data that is stored against each input stream */
119struct dag_per_stream_t {
120        /* DAG stream number */
121        uint16_t dagstream;
122        /* Pointer to the last unread byte in the DAG memory */
123        uint8_t *top;
124        /* Pointer to the first unread byte in the DAG memory */
125        uint8_t *bottom;
126        /* Amount of data processed from the bottom pointer */
127        uint32_t processed;
128        /* Number of packets seen by the stream */
129        uint64_t pkt_count;
130        /* Drop count for this particular stream */
131        uint64_t drops;
132        /* Boolean values to indicate if a particular interface has been seen
133         * or not. This is limited to four interfaces, which is enough to
134         * support all current DAG cards */
135        uint8_t seeninterface[4];
136};
137
138/* "Global" data that is stored for each DAG input trace */
139struct dag_format_data_t {
140        /* DAG device */
141        struct dag_dev_t *device;
142        /* Boolean flag indicating whether the trace is currently attached */
143        int stream_attached;
144        /* Data stored against each DAG input stream */
145        libtrace_list_t *per_stream;
146
147        /* Data required for regular DUCK reporting.
148         * We put this on a new cache line otherwise we have a lot of false
149         * sharing caused by updating the last_pkt.
150         * This should only ever be accessed by the first thread stream,
151         * that includes both read and write operations.
152         */
153        struct {
154                /* Timestamp of the last DUCK report */
155                uint32_t last_duck;
156                /* The number of seconds between each DUCK report */
157                uint32_t duck_freq;
158                /* Timestamp of the last packet read from the DAG card */
159                uint32_t last_pkt;
160                /* Dummy trace to ensure DUCK packets are dealt with using the
161                 * DUCK format functions */
162                libtrace_t *dummy_duck;
163        } duck ALIGN_STRUCT(CACHE_LINE_SIZE);
164};
165
166/* To be thread-safe, we're going to need a mutex for operating on the list
167 * of DAG devices */
168pthread_mutex_t open_dag_mutex;
169
170/* The list of DAG devices that have been opened by libtrace.
171 *
172 * We can only open each DAG device once, but we might want to read from
173 * multiple streams. Therefore, we need to maintain a list of devices that we
174 * have opened (with ref counts!) so that we don't try to open a device too
175 * many times or close a device that we're still using */
176struct dag_dev_t *open_dags = NULL;
177
178/* Returns the amount of padding between the ERF header and the start of the
179 * captured packet data */
180static int dag_get_padding(const libtrace_packet_t *packet)
181{
182        /* ERF Ethernet records have a 2 byte padding before the packet itself
183         * so that the IP header is aligned on a 32 bit boundary.
184         */
185        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
186                dag_record_t *erfptr = (dag_record_t *)packet->header;
187                switch(erfptr->type) {
188                        case TYPE_ETH:
189                        case TYPE_COLOR_ETH:
190                        case TYPE_DSM_COLOR_ETH:
191                        case TYPE_COLOR_HASH_ETH:
192                                return 2;
193                        default:                return 0;
194                }
195        }
196        else {
197                switch(trace_get_link_type(packet)) {
198                        case TRACE_TYPE_ETH:    return 2;
199                        default:                return 0;
200                }
201        }
202}
203
204/* Attempts to determine if the given filename refers to a DAG device */
205static int dag_probe_filename(const char *filename)
206{
207        struct stat statbuf;
208        /* Can we stat the file? */
209        if (stat(filename, &statbuf) != 0) {
210                return 0;
211        }
212        /* Is it a character device? */
213        if (!S_ISCHR(statbuf.st_mode)) {
214                return 0;
215        }
216        /* Yeah, it's probably us. */
217        return 1;
218}
219
220/* Initialises the DAG output data structure */
221static void dag_init_format_out_data(libtrace_out_t *libtrace)
222{
223        libtrace->format_data = (struct dag_format_data_out_t *)
224                malloc(sizeof(struct dag_format_data_out_t));
225        // no DUCK on output
226        FORMAT_DATA_OUT->stream_attached = 0;
227        FORMAT_DATA_OUT->device = NULL;
228        FORMAT_DATA_OUT->dagstream = 0;
229        FORMAT_DATA_OUT->waiting = 0;
230
231}
232
233/* Initialises the DAG input data structure */
234static void dag_init_format_data(libtrace_t *libtrace)
235{
236        struct dag_per_stream_t stream_data;
237
238        libtrace->format_data = (struct dag_format_data_t *)
239                malloc(sizeof(struct dag_format_data_t));
240        DUCK.last_duck = 0;
241        DUCK.duck_freq = 0;
242        DUCK.last_pkt = 0;
243        DUCK.dummy_duck = NULL;
244
245        FORMAT_DATA->per_stream =
246                libtrace_list_init(sizeof(stream_data));
247        assert(FORMAT_DATA->per_stream != NULL);
248
249        /* We'll start with just one instance of stream_data, and we'll
250         * add more later if we need them */
251        memset(&stream_data, 0, sizeof(stream_data));
252        libtrace_list_push_back(FORMAT_DATA->per_stream, &stream_data);
253}
254
255/* Determines if there is already an entry for the given DAG device in the
256 * device list and increments the reference count for that device, if found.
257 *
258 * NOTE: This function assumes the open_dag_mutex is held by the caller */
259static struct dag_dev_t *dag_find_open_device(char *dev_name)
260{
261        struct dag_dev_t *dag_dev;
262
263        dag_dev = open_dags;
264
265        /* XXX: Not exactly zippy, but how often are we going to be dealing
266         * with multiple dag cards? */
267        while (dag_dev != NULL) {
268                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
269                        dag_dev->ref_count ++;
270                        return dag_dev;
271                }
272                dag_dev = dag_dev->next;
273        }
274        return NULL;
275}
276
277/* Closes a DAG device and removes it from the device list.
278 *
279 * Attempting to close a DAG device that has a non-zero reference count will
280 * cause an assertion failure!
281 *
282 * NOTE: This function assumes the open_dag_mutex is held by the caller */
283static void dag_close_device(struct dag_dev_t *dev)
284{
285        /* Need to remove from the device list */
286        assert(dev->ref_count == 0);
287
288        if (dev->prev == NULL) {
289                open_dags = dev->next;
290                if (dev->next)
291                        dev->next->prev = NULL;
292        } else {
293                dev->prev->next = dev->next;
294                if (dev->next)
295                        dev->next->prev = dev->prev;
296        }
297
298        dag_close(dev->fd);
299        free(dev);
300}
301
302
303/* Opens a new DAG device for writing and adds it to the DAG device list
304 *
305 * NOTE: this function should only be called when opening a DAG device for
306 * writing - there is little practical difference between this and the
307 * function below that covers the reading case, but we need the output trace
308 * object to report errors properly so the two functions take slightly
309 * different arguments. This is really lame and there should be a much better
310 * way of doing this.
311 *
312 * NOTE: This function assumes the open_dag_mutex is held by the caller
313 */
314static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace,
315                                                char *dev_name)
316{
317        struct stat buf;
318        int fd;
319        struct dag_dev_t *new_dev;
320
321        /* Make sure the device exists */
322        if (stat(dev_name, &buf) == -1) {
323                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
324                return NULL;
325        }
326
327        /* Make sure it is the appropriate type of device */
328        if (S_ISCHR(buf.st_mode)) {
329                /* Try opening the DAG device */
330                if((fd = dag_open(dev_name)) < 0) {
331                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
332                                        dev_name);
333                        return NULL;
334                }
335        } else {
336                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
337                                dev_name);
338                return NULL;
339        }
340
341        /* Add the device to our device list - it is just a doubly linked
342         * list with no inherent ordering; just tack the new one on the front
343         */
344        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
345        new_dev->fd = fd;
346        new_dev->dev_name = dev_name;
347        new_dev->ref_count = 1;
348
349        new_dev->prev = NULL;
350        new_dev->next = open_dags;
351        if (open_dags)
352                open_dags->prev = new_dev;
353
354        open_dags = new_dev;
355
356        return new_dev;
357}
358
359/* Opens a new DAG device for reading and adds it to the DAG device list
360 *
361 * NOTE: this function should only be called when opening a DAG device for
362 * reading - there is little practical difference between this and the
363 * function above that covers the writing case, but we need the input trace
364 * object to report errors properly so the two functions take slightly
365 * different arguments. This is really lame and there should be a much better
366 * way of doing this.
367 *
368 * NOTE: This function assumes the open_dag_mutex is held by the caller */
369static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
370        struct stat buf;
371        int fd;
372        struct dag_dev_t *new_dev;
373
374        /* Make sure the device exists */
375        if (stat(dev_name, &buf) == -1) {
376                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
377                return NULL;
378        }
379
380        /* Make sure it is the appropriate type of device */
381        if (S_ISCHR(buf.st_mode)) {
382                /* Try opening the DAG device */
383                if((fd = dag_open(dev_name)) < 0) {
384                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
385                                      dev_name);
386                        return NULL;
387                }
388        } else {
389                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
390                              dev_name);
391                return NULL;
392        }
393
394        /* Add the device to our device list - it is just a doubly linked
395         * list with no inherent ordering; just tack the new one on the front
396         */
397        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
398        new_dev->fd = fd;
399        new_dev->dev_name = dev_name;
400        new_dev->ref_count = 1;
401
402        new_dev->prev = NULL;
403        new_dev->next = open_dags;
404        if (open_dags)
405                open_dags->prev = new_dev;
406
407        open_dags = new_dev;
408
409        return new_dev;
410}
411
412/* Creates and initialises a DAG output trace */
413static int dag_init_output(libtrace_out_t *libtrace)
414{
415        /* Upon successful creation, the device name is stored against the
416         * device and free when it is free()d */
417        char *dag_dev_name = NULL;
418        char *scan = NULL;
419        struct dag_dev_t *dag_device = NULL;
420        int stream = 1;
421
422        /* XXX I don't know if this is important or not, but this function
423         * isn't present in all of the driver releases that this code is
424         * supposed to support! */
425        /*
426        unsigned long wake_time;
427        dagutil_sleep_get_wake_time(&wake_time,0);
428        */
429
430        dag_init_format_out_data(libtrace);
431        /* Grab the mutex while we're likely to be messing with the device
432         * list */
433        pthread_mutex_lock(&open_dag_mutex);
434
435        /* Specific streams are signified using a comma in the libtrace URI,
436         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
437         *
438         * If no stream is specified, we will write using stream 1 */
439        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
440                dag_dev_name = strdup(libtrace->uridata);
441        } else {
442                dag_dev_name = (char *)strndup(libtrace->uridata,
443                                (size_t)(scan - libtrace->uridata));
444                stream = atoi(++scan);
445        }
446        FORMAT_DATA_OUT->dagstream = stream;
447
448        /* See if our DAG device is already open */
449        dag_device = dag_find_open_device(dag_dev_name);
450
451        if (dag_device == NULL) {
452                /* Device not yet opened - open it ourselves */
453                dag_device = dag_open_output_device(libtrace, dag_dev_name);
454        } else {
455                /* Otherwise, just use the existing one */
456                free(dag_dev_name);
457                dag_dev_name = NULL;
458        }
459
460        /* Make sure we have successfully opened a DAG device */
461        if (dag_device == NULL) {
462                if (dag_dev_name) {
463                        free(dag_dev_name);
464                }
465                pthread_mutex_unlock(&open_dag_mutex);
466                return -1;
467        }
468
469        FORMAT_DATA_OUT->device = dag_device;
470        pthread_mutex_unlock(&open_dag_mutex);
471        return 0;
472}
473
474/* Creates and initialises a DAG input trace */
475static int dag_init_input(libtrace_t *libtrace) {
476        /* Upon successful creation, the device name is stored against the
477         * device and free when it is free()d */
478        char *dag_dev_name = NULL;
479        char *scan = NULL;
480        int stream = 0;
481        struct dag_dev_t *dag_device = NULL;
482
483        dag_init_format_data(libtrace);
484        /* Grab the mutex while we're likely to be messing with the device
485         * list */
486        pthread_mutex_lock(&open_dag_mutex);
487
488
489        /* DAG cards support multiple streams. In a single threaded capture,
490         * these are specified using a comma in the libtrace URI,
491         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
492         *
493         * If no stream is specified, we will read from stream 0 with
494         * one thread
495         */
496        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
497                dag_dev_name = strdup(libtrace->uridata);
498        } else {
499                dag_dev_name = (char *)strndup(libtrace->uridata,
500                                (size_t)(scan - libtrace->uridata));
501                stream = atoi(++scan);
502        }
503
504        FORMAT_DATA_FIRST->dagstream = stream;
505
506        /* See if our DAG device is already open */
507        dag_device = dag_find_open_device(dag_dev_name);
508
509        if (dag_device == NULL) {
510                /* Device not yet opened - open it ourselves */
511                dag_device = dag_open_device(libtrace, dag_dev_name);
512        } else {
513                /* Otherwise, just use the existing one */
514                free(dag_dev_name);
515                dag_dev_name = NULL;
516        }
517
518        /* Make sure we have successfully opened a DAG device */
519        if (dag_device == NULL) {
520                if (dag_dev_name)
521                        free(dag_dev_name);
522                dag_dev_name = NULL;
523                pthread_mutex_unlock(&open_dag_mutex);
524                return -1;
525        }
526
527        FORMAT_DATA->device = dag_device;
528
529        /* See Config_Status_API_Programming_Guide.pdf from the Endace
530           Dag Documentation */
531        /* Check kBooleanAttributeActive is true -- no point capturing
532         * on an interface that's disabled
533         *
534         * The symptom of the port being disabled is that libtrace
535         * will appear to hang. */
536        /* Check kBooleanAttributeFault is false */
537        /* Check kBooleanAttributeLocalFault is false */
538        /* Check kBooleanAttributeLock is true ? */
539        /* Check kBooleanAttributePeerLink ? */
540
541        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based
542           on libtrace promisc attribute?*/
543        /* Set kUint32AttributeSnapLength to the snaplength */
544
545        pthread_mutex_unlock(&open_dag_mutex);
546        return 0;
547}
548
549#ifdef HAVE_DAG_CONFIG_API_H
550static int dag_csapi_set_snaplen(libtrace_t *libtrace, int slen) {
551        dag_card_ref_t card_ref = NULL;
552        dag_component_t root = NULL;
553        attr_uuid_t uuid = 0;
554
555        if (slen < 0)
556                slen = 0; 
557
558        card_ref = dag_config_init(FORMAT_DATA->device->dev_name);
559        root = dag_config_get_root_component(card_ref);
560       
561        uuid = dag_component_get_config_attribute_uuid(root, kBooleanAttributeVarlen);
562        dag_config_set_boolean_attribute(card_ref, uuid, true);
563
564        uuid = dag_component_get_config_attribute_uuid(root, kUint32AttributeSnaplength);
565        dag_config_set_uint32_attribute(card_ref, uuid, (uint32_t)slen);
566
567        return 0;
568       
569
570} 
571#endif /* HAVE_DAG_CONFIG_API_H */
572
573/* Configures a DAG input trace */
574static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
575                            void *data)
576{
577        switch(option) {
578        case TRACE_OPTION_META_FREQ:
579                /* This option is used to specify the frequency of DUCK
580                 * updates */
581                DUCK.duck_freq = *(int *)data;
582                return 0;
583        case TRACE_OPTION_SNAPLEN:
584#ifdef HAVE_DAG_CONFIG_API_H
585                return dag_csapi_set_snaplen(libtrace, *(int *)data);
586#else
587                /* Tell the card our new snap length */
588        {
589                char conf_str[4096];
590                snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
591                if (dag_configure(FORMAT_DATA->device->fd,
592                                  conf_str) != 0) {
593                        trace_set_err(libtrace, errno, "Failed to configure "
594                                      "snaplen on DAG card: %s",
595                                      libtrace->uridata);
596                        return -1;
597                }
598        }
599#endif /* HAVE_DAG_CONFIG_API_H */
600
601                return 0;
602        case TRACE_OPTION_PROMISC:
603                /* DAG already operates in a promisc fashion */
604                return -1;
605        case TRACE_OPTION_FILTER:
606                /* We don't yet support pushing filters into DAG
607                 * cards */
608                return -1;
609        case TRACE_OPTION_EVENT_REALTIME:
610                /* Live capture is always going to be realtime */
611                return -1;
612        case TRACE_OPTION_HASHER:
613                /* Lets just say we did this, it's currently still up to
614                 * the user to configure this correctly. */
615                return 0;
616        }
617        return -1;
618}
619
620/* Starts a DAG output trace */
621static int dag_start_output(libtrace_out_t *libtrace)
622{
623        struct timeval zero, nopoll;
624
625        zero.tv_sec = 0;
626        zero.tv_usec = 0;
627        nopoll = zero;
628
629        /* Attach and start the DAG stream */
630        if (dag_attach_stream64(FORMAT_DATA_OUT->device->fd,
631                        FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) {
632                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
633                return -1;
634        }
635
636        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
637                        FORMAT_DATA_OUT->dagstream) < 0) {
638                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
639                return -1;
640        }
641        FORMAT_DATA_OUT->stream_attached = 1;
642
643        /* We don't want the dag card to do any sleeping */
644        dag_set_stream_poll64(FORMAT_DATA_OUT->device->fd,
645                        FORMAT_DATA_OUT->dagstream, 0, &zero,
646                        &nopoll);
647
648        return 0;
649}
650
651static int dag_start_input_stream(libtrace_t *libtrace,
652                                  struct dag_per_stream_t * stream) {
653        struct timeval zero, nopoll;
654        uint8_t *top, *bottom, *starttop;
655        top = bottom = NULL;
656
657        zero.tv_sec = 0;
658        zero.tv_usec = 10000;
659        nopoll = zero;
660
661        /* Attach and start the DAG stream */
662        if (dag_attach_stream64(FORMAT_DATA->device->fd,
663                              stream->dagstream, 0, 0) < 0) {
664                trace_set_err(libtrace, errno, "Cannot attach DAG stream #%u",
665                              stream->dagstream);
666                return -1;
667        }
668
669        if (dag_start_stream(FORMAT_DATA->device->fd,
670                             stream->dagstream) < 0) {
671                trace_set_err(libtrace, errno, "Cannot start DAG stream #%u",
672                              stream->dagstream);
673                return -1;
674        }
675        FORMAT_DATA->stream_attached = 1;
676
677        /* We don't want the dag card to do any sleeping */
678        if (dag_set_stream_poll64(FORMAT_DATA->device->fd,
679                            stream->dagstream, 0, &zero,
680                            &nopoll) < 0) {
681                trace_set_err(libtrace, errno,
682                              "dag_set_stream_poll failed!");
683                return -1;
684        }
685
686        starttop = dag_advance_stream(FORMAT_DATA->device->fd,
687                                      stream->dagstream,
688                                      &bottom);
689
690        /* Should probably flush the memory hole now */
691        top = starttop;
692        while (starttop - bottom > 0) {
693                bottom += (starttop - bottom);
694                top = dag_advance_stream(FORMAT_DATA->device->fd,
695                                         stream->dagstream,
696                                         &bottom);
697        }
698        stream->top = top;
699        stream->bottom = bottom;
700        stream->processed = 0;
701        stream->drops = 0;
702
703        return 0;
704
705}
706
707/* Starts a DAG input trace */
708static int dag_start_input(libtrace_t *libtrace)
709{
710        return dag_start_input_stream(libtrace, FORMAT_DATA_FIRST);
711}
712
713static int dag_pstart_input(libtrace_t *libtrace)
714{
715        char *scan, *tok;
716        uint16_t stream_count = 0, max_streams;
717        int iserror = 0;
718        struct dag_per_stream_t stream_data;
719
720        /* Check we aren't trying to create more threads than the DAG card can
721         * handle */
722        max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd);
723        if (libtrace->perpkt_thread_count > max_streams) {
724                fprintf(stderr,
725                              "WARNING: DAG has only %u streams available, "
726                              "capping total number of threads at this value.",
727                              max_streams);
728                libtrace->perpkt_thread_count = max_streams;
729        }
730
731        /* Get the stream names from the uri */
732        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
733                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
734                              "Format uri doesn't specify the DAG streams");
735                iserror = 1;
736                goto cleanup;
737        }
738
739        scan++;
740
741        tok = strtok(scan, ",");
742        while (tok != NULL) {
743                /* Ensure we haven't specified too many streams */
744                if (stream_count >= libtrace->perpkt_thread_count) {
745                        fprintf(stderr,
746                                      "WARNING: Format uri specifies too many "
747                                      "streams. Maximum is %u, so only using "
748                                      "the first %u from the uri.",
749                                      libtrace->perpkt_thread_count, 
750                                      libtrace->perpkt_thread_count);
751                        break;
752                }
753
754                /* Save the stream details */
755                if (stream_count == 0) {
756                        /* Special case where we update the existing stream
757                         * data structure */
758                        FORMAT_DATA_FIRST->dagstream = (uint16_t)atoi(tok);
759                } else {
760                        memset(&stream_data, 0, sizeof(stream_data));
761                        stream_data.dagstream = (uint16_t)atoi(tok);
762                        libtrace_list_push_back(FORMAT_DATA->per_stream,
763                                                &stream_data);
764                }
765
766                stream_count++;
767                tok = strtok(NULL, ",");
768        }
769
770        if (stream_count < libtrace->perpkt_thread_count) {
771                libtrace->perpkt_thread_count = stream_count;
772        }
773       
774        FORMAT_DATA->stream_attached = 1;
775
776 cleanup:
777        if (iserror) {
778                return -1;
779        } else {
780                return 0;
781        }
782}
783
784/* Pauses a DAG output trace */
785static int dag_pause_output(libtrace_out_t *libtrace)
786{
787        /* Stop and detach the stream */
788        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
789                            FORMAT_DATA_OUT->dagstream) < 0) {
790                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
791                return -1;
792        }
793        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
794                              FORMAT_DATA_OUT->dagstream) < 0) {
795                trace_set_err_out(libtrace, errno,
796                                  "Could not detach DAG stream");
797                return -1;
798        }
799        FORMAT_DATA_OUT->stream_attached = 0;
800        return 0;
801}
802
803/* Pauses a DAG input trace */
804static int dag_pause_input(libtrace_t *libtrace)
805{
806        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
807
808        /* Stop and detach each stream */
809        while (tmp != NULL) {
810                if (dag_stop_stream(FORMAT_DATA->device->fd,
811                                    STREAM_DATA(tmp)->dagstream) < 0) {
812                        trace_set_err(libtrace, errno,
813                                      "Could not stop DAG stream");
814                        return -1;
815                }
816                if (dag_detach_stream(FORMAT_DATA->device->fd,
817                                      STREAM_DATA(tmp)->dagstream) < 0) {
818                        trace_set_err(libtrace, errno,
819                                      "Could not detach DAG stream");
820                        return -1;
821                }
822
823                tmp = tmp->next;
824        }
825
826        FORMAT_DATA->stream_attached = 0;
827        return 0;
828}
829
830
831
832/* Closes a DAG input trace */
833static int dag_fin_input(libtrace_t *libtrace)
834{
835        /* Need the lock, since we're going to be handling the device list */
836        pthread_mutex_lock(&open_dag_mutex);
837
838        /* Detach the stream if we are not paused */
839        if (FORMAT_DATA->stream_attached)
840                dag_pause_input(libtrace);
841        FORMAT_DATA->device->ref_count--;
842
843        /* Close the DAG device if there are no more references to it */
844        if (FORMAT_DATA->device->ref_count == 0)
845                dag_close_device(FORMAT_DATA->device);
846
847        if (DUCK.dummy_duck)
848                trace_destroy_dead(DUCK.dummy_duck);
849
850        /* Clear the list */
851        libtrace_list_deinit(FORMAT_DATA->per_stream);
852        free(libtrace->format_data);
853        pthread_mutex_unlock(&open_dag_mutex);
854        return 0; /* success */
855}
856
857/* Closes a DAG output trace */
858static int dag_fin_output(libtrace_out_t *libtrace)
859{
860
861        /* Commit any outstanding traffic in the txbuffer */
862        if (FORMAT_DATA_OUT->waiting) {
863                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
864                                           FORMAT_DATA_OUT->dagstream,
865                                           FORMAT_DATA_OUT->waiting );
866        }
867
868        /* Wait until the buffer is nearly clear before exiting the program,
869         * as we will lose packets otherwise */
870        dag_tx_get_stream_space64
871                (FORMAT_DATA_OUT->device->fd,
872                 FORMAT_DATA_OUT->dagstream,
873                 dag_get_stream_buffer_size64(FORMAT_DATA_OUT->device->fd,
874                                            FORMAT_DATA_OUT->dagstream) - 8);
875
876        /* Need the lock, since we're going to be handling the device list */
877        pthread_mutex_lock(&open_dag_mutex);
878
879        /* Detach the stream if we are not paused */
880        if (FORMAT_DATA_OUT->stream_attached)
881                dag_pause_output(libtrace);
882        FORMAT_DATA_OUT->device->ref_count --;
883
884        /* Close the DAG device if there are no more references to it */
885        if (FORMAT_DATA_OUT->device->ref_count == 0)
886                dag_close_device(FORMAT_DATA_OUT->device);
887        free(libtrace->format_data);
888        pthread_mutex_unlock(&open_dag_mutex);
889        return 0; /* success */
890}
891
892#ifdef DAGIOC_CARD_DUCK
893#define LIBTRACE_DUCK_IOCTL DAGIOC_CARD_DUCK
894#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_5_0
895#else
896#ifdef DAGIOCDUCK
897#define LIBTRACE_DUCK_IOCTL DAGIOCDUCK
898#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_2_5
899#else
900#warning "DAG appears to be missing DUCK support"
901#endif
902#endif
903
904/* Extracts DUCK information from the DAG card and produces a DUCK packet */
905static int dag_get_duckinfo(libtrace_t *libtrace,
906                                libtrace_packet_t *packet) {
907
908        if (DUCK.duck_freq == 0)
909                return 0;
910
911#ifndef LIBTRACE_DUCK_IOCTL
912        trace_set_err(libtrace, errno, 
913                "Requested DUCK information but unable to determine the correct ioctl for DUCK");
914        DUCK.duck_freq = 0;
915        return -1;
916#endif
917
918        if (DUCK.last_pkt - DUCK.last_duck < DUCK.duck_freq)
919                return 0;
920
921        /* Allocate memory for the DUCK data */
922        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
923            !packet->buffer) {
924                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
925                packet->buf_control = TRACE_CTRL_PACKET;
926                if (!packet->buffer) {
927                        trace_set_err(libtrace, errno,
928                                      "Cannot allocate packet buffer");
929                        return -1;
930                }
931        }
932
933        /* DUCK doesn't have a format header */
934        packet->header = 0;
935        packet->payload = packet->buffer;
936
937        /* No need to check if we can get DUCK or not - we're modern
938         * enough so just grab the DUCK info */
939        if ((ioctl(FORMAT_DATA->device->fd, LIBTRACE_DUCK_IOCTL,
940                   (duckinf_t *)packet->payload) < 0)) {
941                trace_set_err(libtrace, errno, "Error using DUCK ioctl");
942                DUCK.duck_freq = 0;
943                return -1;
944        }
945
946        packet->type = LIBTRACE_DUCK_VERSION;
947
948        /* Set the packet's trace to point at a DUCK trace, so that the
949         * DUCK format functions will be called on the packet rather than the
950         * DAG ones */
951        if (!DUCK.dummy_duck)
952                DUCK.dummy_duck = trace_create_dead("duck:dummy");
953        packet->trace = DUCK.dummy_duck;
954        DUCK.last_duck = DUCK.last_pkt;
955        packet->error = sizeof(duckinf_t);
956        return sizeof(duckinf_t);
957}
958
959/* Determines the amount of data available to read from the DAG card */
960static int dag_available(libtrace_t *libtrace,
961                         struct dag_per_stream_t *stream_data)
962{
963        uint32_t diff = stream_data->top - stream_data->bottom;
964
965        /* If we've processed more than 4MB of data since we last called
966         * dag_advance_stream, then we should call it again to allow the
967         * space occupied by that 4MB to be released */
968        if (diff >= dag_record_size && stream_data->processed < 4 * 1024 * 1024)
969                return diff;
970
971        /* Update the top and bottom pointers */
972        stream_data->top = dag_advance_stream(FORMAT_DATA->device->fd,
973                                              stream_data->dagstream,
974                                              &(stream_data->bottom));
975
976        if (stream_data->top == NULL) {
977                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
978                return -1;
979        }
980        stream_data->processed = 0;
981        diff = stream_data->top - stream_data->bottom;
982        return diff;
983}
984
985/* Returns a pointer to the start of the next complete ERF record */
986static dag_record_t *dag_get_record(struct dag_per_stream_t *stream_data)
987{
988        dag_record_t *erfptr = NULL;
989        uint16_t size;
990
991        erfptr = (dag_record_t *)stream_data->bottom;
992        if (!erfptr)
993                return NULL;
994
995        size = ntohs(erfptr->rlen);
996        assert( size >= dag_record_size );
997
998        /* Make certain we have the full packet available */
999        if (size > (stream_data->top - stream_data->bottom))
1000                return NULL;
1001
1002        stream_data->bottom += size;
1003        stream_data->processed += size;
1004        return erfptr;
1005}
1006
1007/* Converts a buffer containing a recently read DAG packet record into a
1008 * libtrace packet */
1009static int dag_prepare_packet_stream(libtrace_t *libtrace,
1010                                     struct dag_per_stream_t *stream_data,
1011                                     libtrace_packet_t *packet,
1012                                     void *buffer, libtrace_rt_types_t rt_type,
1013                                     uint32_t flags)
1014{
1015        dag_record_t *erfptr;
1016
1017        /* If the packet previously owned a buffer that is not the buffer
1018         * that contains the new packet data, we're going to need to free the
1019         * old one to avoid memory leaks */
1020        if (packet->buffer != buffer &&
1021            packet->buf_control == TRACE_CTRL_PACKET) {
1022                free(packet->buffer);
1023        }
1024
1025        /* Set the buffer owner appropriately */
1026        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
1027                packet->buf_control = TRACE_CTRL_PACKET;
1028        } else
1029                packet->buf_control = TRACE_CTRL_EXTERNAL;
1030
1031        /* Update the packet pointers and type appropriately */
1032        erfptr = (dag_record_t *)buffer;
1033        packet->buffer = erfptr;
1034        packet->header = erfptr;
1035        packet->type = rt_type;
1036
1037        if (erfptr->flags.rxerror == 1) {
1038                /* rxerror means the payload is corrupt - drop the payload
1039                 * by tweaking rlen */
1040                packet->payload = NULL;
1041                erfptr->rlen = htons(erf_get_framing_length(packet));
1042        } else {
1043                packet->payload = (char*)packet->buffer
1044                        + erf_get_framing_length(packet);
1045        }
1046
1047        if (libtrace->format_data == NULL) {
1048                dag_init_format_data(libtrace);
1049        }
1050
1051        /* Update the dropped packets counter */
1052        /* No loss counter for DSM coloured records - have to use some
1053         * other API */
1054        if (erf_is_color_type(erfptr->type)) {
1055                /* TODO */
1056        } else {
1057                /* Use the ERF loss counter */
1058                if (stream_data->seeninterface[erfptr->flags.iface]
1059                    == 0) {
1060                        stream_data->seeninterface[erfptr->flags.iface]
1061                                = 1;
1062                } else {
1063                        stream_data->drops += ntohs(erfptr->lctr);
1064                }
1065        }
1066
1067        return 0;
1068}
1069
1070static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
1071                              void *buffer, libtrace_rt_types_t rt_type,
1072                              uint32_t flags)
1073{
1074        return dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
1075                                       buffer, rt_type, flags);
1076}
1077
1078/*
1079 * dag_write_packet() at this stage attempts to improve tx performance
1080 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
1081 * has been met. I observed approximately 270% performance increase
1082 * through this relatively naive tweak. No optimisation of buffer sizes
1083 * was attempted.
1084 */
1085
1086/* Pushes an ERF record onto the transmit stream */
1087static int dag_dump_packet(libtrace_out_t *libtrace,
1088                           dag_record_t *erfptr, unsigned int pad,
1089                           void *buffer)
1090{
1091        int size;
1092
1093        /*
1094         * If we've got 0 bytes waiting in the txqueue, assume that we
1095         * haven't requested any space yet, and request some, storing
1096         * the pointer at FORMAT_DATA_OUT->txbuffer.
1097         *
1098         * The amount to request is slightly magical at the moment - it's
1099         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
1100         * the buffer and handle overruns.
1101         */
1102        if (FORMAT_DATA_OUT->waiting == 0) {
1103                FORMAT_DATA_OUT->txbuffer =
1104                        dag_tx_get_stream_space64(FORMAT_DATA_OUT->device->fd,
1105                                                FORMAT_DATA_OUT->dagstream,
1106                                                16908288);
1107        }
1108
1109        /*
1110         * Copy the header separately to the body, as we can't guarantee they
1111         * are in contiguous memory
1112         */
1113        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, erfptr,
1114               (dag_record_size + pad));
1115        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
1116
1117        /*
1118         * Copy our incoming packet into the outgoing buffer, and increment
1119         * our waiting count
1120         */
1121        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
1122        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, buffer,
1123               size);
1124        FORMAT_DATA_OUT->waiting += size;
1125
1126        /*
1127         * If our output buffer has more than 16 Mebibytes in it, commit those
1128         * bytes and reset the waiting count to 0.
1129         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in
1130         * case there is still data in the buffer at program exit.
1131         */
1132        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
1133                FORMAT_DATA_OUT->txbuffer =
1134                        dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
1135                                                   FORMAT_DATA_OUT->dagstream,
1136                                                   FORMAT_DATA_OUT->waiting);
1137                FORMAT_DATA_OUT->waiting = 0;
1138        }
1139
1140        return size + pad + dag_record_size;
1141}
1142
1143/* Attempts to determine a suitable ERF type for a given packet. Returns true
1144 * if one is found, false otherwise */
1145static bool find_compatible_linktype(libtrace_out_t *libtrace,
1146                                     libtrace_packet_t *packet, char *type)
1147{
1148        /* Keep trying to simplify the packet until we can find
1149         * something we can do with it */
1150
1151        do {
1152                *type = libtrace_to_erf_type(trace_get_link_type(packet));
1153
1154                /* Success */
1155                if (*type != (char)-1)
1156                        return true;
1157
1158                if (!demote_packet(packet)) {
1159                        trace_set_err_out(libtrace,
1160                                          TRACE_ERR_NO_CONVERSION,
1161                                          "No erf type for packet (%i)",
1162                                          trace_get_link_type(packet));
1163                        return false;
1164                }
1165
1166        } while(1);
1167
1168        return true;
1169}
1170
1171/* Writes a packet to the provided DAG output trace */
1172static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet)
1173{
1174        /* This is heavily borrowed from erf_write_packet(). Yes, CnP
1175         * coding sucks, sorry about that.
1176         */
1177        unsigned int pad = 0;
1178        int numbytes;
1179        void *payload = packet->payload;
1180        dag_record_t *header = (dag_record_t *)packet->header;
1181        char erf_type = 0;
1182
1183        if(!packet->header) {
1184                /* No header, probably an RT packet. Lifted from
1185                 * erf_write_packet(). */
1186                return -1;
1187        }
1188
1189        if (trace_get_link_type(packet) == TRACE_TYPE_NONDATA)
1190                return 0;
1191
1192        pad = dag_get_padding(packet);
1193
1194        /*
1195         * If the payload is null, adjust the rlen. Discussion of this is
1196         * attached to erf_write_packet()
1197         */
1198        if (payload == NULL) {
1199                header->rlen = htons(dag_record_size + pad);
1200        }
1201
1202        if (packet->type == TRACE_RT_DATA_ERF) {
1203                numbytes = dag_dump_packet(libtrace, header, pad, payload);
1204        } else {
1205                /* Build up a new packet header from the existing header */
1206
1207                /* Simplify the packet first - if we can't do this, break
1208                 * early */
1209                if (!find_compatible_linktype(libtrace,packet,&erf_type))
1210                        return -1;
1211
1212                dag_record_t erfhdr;
1213
1214                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
1215                payload=packet->payload;
1216                pad = dag_get_padding(packet);
1217
1218                /* Flags. Can't do this */
1219                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
1220                if (trace_get_direction(packet)!=(int)~0U)
1221                        erfhdr.flags.iface = trace_get_direction(packet);
1222
1223                erfhdr.type = erf_type;
1224
1225                /* Packet length (rlen includes format overhead) */
1226                assert(trace_get_capture_length(packet) > 0
1227                       && trace_get_capture_length(packet) <= 65536);
1228                assert(erf_get_framing_length(packet) > 0
1229                       && trace_get_framing_length(packet) <= 65536);
1230                assert(trace_get_capture_length(packet) +
1231                       erf_get_framing_length(packet) > 0
1232                       && trace_get_capture_length(packet) +
1233                       erf_get_framing_length(packet) <= 65536);
1234
1235                erfhdr.rlen = htons(trace_get_capture_length(packet)
1236                                    + erf_get_framing_length(packet));
1237
1238
1239                /* Loss counter. Can't do this */
1240                erfhdr.lctr = 0;
1241                /* Wire length, does not include padding! */
1242                erfhdr.wlen = htons(trace_get_wire_length(packet));
1243
1244                /* Write it out */
1245                numbytes = dag_dump_packet(libtrace, &erfhdr, pad, payload);
1246        }
1247
1248        return numbytes;
1249}
1250
1251/* Reads the next available packet from a DAG card, in a BLOCKING fashion
1252 *
1253 * If DUCK reporting is enabled, the packet returned may be a DUCK update
1254 */
1255static int dag_read_packet_stream(libtrace_t *libtrace,
1256                                struct dag_per_stream_t *stream_data,
1257                                libtrace_thread_t *t, /* Optional */
1258                                libtrace_packet_t *packet)
1259{
1260        int size = 0;
1261        dag_record_t *erfptr = NULL;
1262        struct timeval tv;
1263        int numbytes = 0;
1264        uint32_t flags = 0;
1265        struct timeval maxwait, pollwait;
1266
1267        pollwait.tv_sec = 0;
1268        pollwait.tv_usec = 10000;
1269        maxwait.tv_sec = 0;
1270        maxwait.tv_usec = 250000;
1271
1272        /* Check if we're due for a DUCK report - only report on the first thread */
1273        if (stream_data == FORMAT_DATA_FIRST) {
1274                size = dag_get_duckinfo(libtrace, packet);
1275                if (size != 0)
1276                        return size;
1277        }
1278
1279
1280        /* Don't let anyone try to free our DAG memory hole! */
1281        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1282
1283        /* If the packet buffer is currently owned by libtrace, free it so
1284         * that we can set the packet to point into the DAG memory hole */
1285        if (packet->buf_control == TRACE_CTRL_PACKET) {
1286                free(packet->buffer);
1287                packet->buffer = 0;
1288        }
1289
1290        if (dag_set_stream_poll64(FORMAT_DATA->device->fd, stream_data->dagstream,
1291                                sizeof(dag_record_t), &maxwait,
1292                                &pollwait) == -1) {
1293                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1294                return -1;
1295        }
1296
1297        /* Grab a full ERF record */
1298        do {
1299                numbytes = dag_available(libtrace, stream_data);
1300                if (numbytes < 0)
1301                        return numbytes;
1302                if (numbytes < dag_record_size) {
1303                        /* Check the message queue if we have one to check */
1304                        if (t != NULL &&
1305                            libtrace_message_queue_count(&t->messages) > 0)
1306                                return -2;
1307
1308                        if ((numbytes=is_halted(libtrace)) != -1)
1309                                return numbytes;
1310                        /* Block until we see a packet */
1311                        continue;
1312                }
1313                erfptr = dag_get_record(stream_data);
1314        } while (erfptr == NULL);
1315
1316        packet->trace = libtrace;
1317
1318        /* Prepare the libtrace packet */
1319        if (dag_prepare_packet_stream(libtrace, stream_data, packet, erfptr,
1320                                    TRACE_RT_DATA_ERF, flags))
1321                return -1;
1322
1323        /* Update the DUCK timer - don't re-order this check (false-sharing) */
1324        if (stream_data == FORMAT_DATA_FIRST && DUCK.duck_freq != 0) {
1325                tv = trace_get_timeval(packet);
1326                DUCK.last_pkt = tv.tv_sec;
1327        }
1328
1329        packet->order = erf_get_erf_timestamp(packet);
1330        packet->error = packet->payload ? htons(erfptr->rlen) :
1331                                          erf_get_framing_length(packet);
1332
1333        return packet->error;
1334}
1335
1336static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
1337{
1338        return dag_read_packet_stream(libtrace, FORMAT_DATA_FIRST, NULL, packet);
1339}
1340
1341static int dag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1342                             libtrace_packet_t **packets, size_t nb_packets)
1343{
1344        int ret;
1345        size_t read_packets = 0;
1346        int numbytes = 0;
1347
1348        struct dag_per_stream_t *stream_data =
1349                (struct dag_per_stream_t *)t->format_data;
1350
1351        /* Read as many packets as we can, but read atleast one packet */
1352        do {
1353                ret = dag_read_packet_stream(libtrace, stream_data, t,
1354                                           packets[read_packets]);
1355                if (ret < 0)
1356                        return ret;
1357
1358                read_packets++;
1359
1360                /* Make sure we don't read too many packets..! */
1361                if (read_packets >= nb_packets)
1362                        break;
1363
1364                numbytes = dag_available(libtrace, stream_data);
1365        } while (numbytes >= dag_record_size);
1366
1367        return read_packets;
1368}
1369
1370/* Attempts to read a packet from a DAG card in a NON-BLOCKING fashion. If a
1371 * packet is available, we will return a packet event. Otherwise we will
1372 * return a SLEEP event (as we cannot select on the DAG file descriptor).
1373 */
1374static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
1375                                           libtrace_packet_t *packet)
1376{
1377        libtrace_eventobj_t event = {0,0,0.0,0};
1378        dag_record_t *erfptr = NULL;
1379        int numbytes;
1380        uint32_t flags = 0;
1381        struct timeval minwait, tv;
1382       
1383        minwait.tv_sec = 0;
1384        minwait.tv_usec = 10000;
1385
1386        /* Check if we're meant to provide a DUCK update */
1387        numbytes = dag_get_duckinfo(libtrace, packet);
1388        if (numbytes < 0) {
1389                event.type = TRACE_EVENT_TERMINATE;
1390                return event;
1391        } else if (numbytes > 0) {
1392                event.type = TRACE_EVENT_PACKET;
1393                return event;
1394        }
1395       
1396        if (dag_set_stream_poll64(FORMAT_DATA->device->fd,
1397                                FORMAT_DATA_FIRST->dagstream, 0, &minwait,
1398                                &minwait) == -1) {
1399                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1400                event.type = TRACE_EVENT_TERMINATE;
1401                return event;
1402        }
1403
1404        do {
1405                erfptr = NULL;
1406                numbytes = 0;
1407
1408                /* Need to call dag_available so that the top pointer will get
1409                 * updated, otherwise we'll never see any data! */
1410                numbytes = dag_available(libtrace, FORMAT_DATA_FIRST);
1411
1412                /* May as well not bother calling dag_get_record if
1413                 * dag_available suggests that there's no data */
1414                if (numbytes != 0)
1415                        erfptr = dag_get_record(FORMAT_DATA_FIRST);
1416                if (erfptr == NULL) {
1417                        /* No packet available - sleep for a very short time */
1418                        if (libtrace_halt) {
1419                                event.type = TRACE_EVENT_TERMINATE;
1420                        } else {
1421                                event.type = TRACE_EVENT_SLEEP;
1422                                event.seconds = 0.0001;
1423                        }
1424                        break;
1425                }
1426                if (dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
1427                                            erfptr, TRACE_RT_DATA_ERF, flags)) {
1428                        event.type = TRACE_EVENT_TERMINATE;
1429                        break;
1430                }
1431
1432
1433                event.size = trace_get_capture_length(packet) +
1434                        trace_get_framing_length(packet);
1435
1436                /* XXX trace_read_packet() normally applies the following
1437                 * config options for us, but this function is called via
1438                 * trace_event() so we have to do it ourselves */
1439
1440                if (libtrace->filter) {
1441                        int filtret = trace_apply_filter(libtrace->filter,
1442                                                         packet);
1443                        if (filtret == -1) {
1444                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
1445                                              "Bad BPF Filter");
1446                                event.type = TRACE_EVENT_TERMINATE;
1447                                break;
1448                        }
1449
1450                        if (filtret == 0) {
1451                                /* This packet isn't useful so we want to
1452                                 * immediately see if there is another suitable
1453                                 * one - we definitely DO NOT want to return
1454                                 * a sleep event in this case, like we used to
1455                                 * do! */
1456                                libtrace->filtered_packets ++;
1457                                trace_clear_cache(packet);
1458                                continue;
1459                        }
1460
1461                        event.type = TRACE_EVENT_PACKET;
1462                } else {
1463                        event.type = TRACE_EVENT_PACKET;
1464                }
1465
1466                /* Update the DUCK timer */
1467                tv = trace_get_timeval(packet);
1468                DUCK.last_pkt = tv.tv_sec;
1469               
1470                if (libtrace->snaplen > 0) {
1471                        trace_set_capture_length(packet, libtrace->snaplen);
1472                }
1473                libtrace->accepted_packets ++;
1474                break;
1475        } while(1);
1476
1477        return event;
1478}
1479
1480static void dag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat)
1481{
1482        libtrace_list_node_t *tmp;
1483        assert(stat && libtrace);
1484        tmp = FORMAT_DATA_HEAD;
1485
1486        /* Dropped packets */
1487        stat->dropped_valid = 1;
1488        stat->dropped = 0;
1489        while (tmp != NULL) {
1490                stat->dropped += STREAM_DATA(tmp)->drops;
1491                tmp = tmp->next;
1492        }
1493
1494}
1495
1496static void dag_get_thread_statistics(libtrace_t *libtrace, libtrace_thread_t *t,
1497                                       libtrace_stat_t *stat) {
1498        struct dag_per_stream_t *stream_data = t->format_data;
1499        assert(stat && libtrace);
1500
1501        stat->dropped_valid = 1;
1502        stat->dropped = stream_data->drops;
1503
1504        stat->filtered_valid = 1;
1505        stat->filtered = 0;
1506}
1507
1508/* Prints some semi-useful help text about the DAG format module */
1509static void dag_help(void) {
1510        printf("dag format module: $Revision: 1755 $\n");
1511        printf("Supported input URIs:\n");
1512        printf("\tdag:/dev/dagn\n");
1513        printf("\n");
1514        printf("\te.g.: dag:/dev/dag0\n");
1515        printf("\n");
1516        printf("Supported output URIs:\n");
1517        printf("\tnone\n");
1518        printf("\n");
1519}
1520
1521static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1522                                bool reader)
1523{
1524        struct dag_per_stream_t *stream_data;
1525        libtrace_list_node_t *node;
1526
1527        if (reader && t->type == THREAD_PERPKT) {
1528                node = libtrace_list_get_index(FORMAT_DATA->per_stream,
1529                                                t->perpkt_num);
1530                if (node == NULL) {
1531                        return -1;
1532                }
1533                stream_data = node->data;
1534
1535                /* Pass the per thread data to the thread */
1536                t->format_data = stream_data;
1537
1538                /* Attach and start the DAG stream */
1539                if (dag_start_input_stream(libtrace, stream_data) < 0)
1540                        return -1;
1541        }
1542
1543        return 0;
1544}
1545
1546static struct libtrace_format_t dag = {
1547        "dag",
1548        "$Id$",
1549        TRACE_FORMAT_ERF,
1550        dag_probe_filename,             /* probe filename */
1551        NULL,                           /* probe magic */
1552        dag_init_input,                 /* init_input */
1553        dag_config_input,               /* config_input */
1554        dag_start_input,                /* start_input */
1555        dag_pause_input,                /* pause_input */
1556        dag_init_output,                /* init_output */
1557        NULL,                           /* config_output */
1558        dag_start_output,               /* start_output */
1559        dag_fin_input,                  /* fin_input */
1560        dag_fin_output,                 /* fin_output */
1561        dag_read_packet,                /* read_packet */
1562        dag_prepare_packet,             /* prepare_packet */
1563        NULL,                           /* fin_packet */
1564        dag_write_packet,               /* write_packet */
1565        erf_get_link_type,              /* get_link_type */
1566        erf_get_direction,              /* get_direction */
1567        erf_set_direction,              /* set_direction */
1568        erf_get_erf_timestamp,          /* get_erf_timestamp */
1569        NULL,                           /* get_timeval */
1570        NULL,                           /* get_seconds */
1571        NULL,                           /* get_timespec */
1572        NULL,                           /* seek_erf */
1573        NULL,                           /* seek_timeval */
1574        NULL,                           /* seek_seconds */
1575        erf_get_capture_length,         /* get_capture_length */
1576        erf_get_wire_length,            /* get_wire_length */
1577        erf_get_framing_length,         /* get_framing_length */
1578        erf_set_capture_length,         /* set_capture_length */
1579        NULL,                           /* get_received_packets */
1580        NULL,                           /* get_filtered_packets */
1581        NULL,                           /* get_dropped_packets */
1582        dag_get_statistics,             /* get_statistics */
1583        NULL,                           /* get_fd */
1584        trace_event_dag,                /* trace_event */
1585        dag_help,                       /* help */
1586        NULL,                            /* next pointer */
1587        {true, 0}, /* live packet capture, thread limit TBD */
1588        dag_pstart_input,
1589        dag_pread_packets,
1590        dag_pause_input,
1591        NULL,
1592        dag_pregister_thread,
1593        NULL,
1594        dag_get_thread_statistics       /* get thread stats */
1595};
1596
1597void dag_constructor(void)
1598{
1599        register_format(&dag);
1600}
Note: See TracBrowser for help on using the repository browser.