source: lib/format_dag25.c @ 8c5c550

develop
Last change on this file since 8c5c550 was 8c5c550, checked in by Shane Alcock <salcock@…>, 23 months ago

Fix build errors introduced recently in format_dag25.c

  • Property mode set to 100644
File size: 46.8 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26#define _GNU_SOURCE
27
28#include "config.h"
29#include "common.h"
30#include "libtrace.h"
31#include "libtrace_int.h"
32#include "format_helper.h"
33#include "format_erf.h"
34
35#include <errno.h>
36#include <fcntl.h>
37#include <stdio.h>
38#include <string.h>
39#include <stdlib.h>
40#include <sys/stat.h>
41
42#include <sys/mman.h>
43/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
44 * Windows anyway so we'll worry about this more later :] */
45#include <pthread.h>
46
47
48#ifdef HAVE_DAG_CONFIG_API_H
49#include <dag_config_api.h>
50#endif
51
52#ifdef WIN32
53#  include <io.h>
54#  include <share.h>
55#  define PATH_MAX _MAX_PATH
56#  define snprintf sprintf_s
57#else
58#  include <netdb.h>
59#  ifndef PATH_MAX
60#       define PATH_MAX 4096
61#  endif
62#  include <sys/ioctl.h>
63#endif
64
65/* This format deals with DAG cards that are using drivers from the 2.5 version
66 * onwards, including 3.X.
67 *
68 * DAG is a LIVE capture format.
69 *
70 * This format does support writing, provided the DAG card that you are using
71 * has transmit (Tx) support. Additionally, packets read using this format
72 * are in the ERF format, so can easily be written as ERF traces without
73 * losing any data.
74 */
75
76#define DATA(x) ((struct dag_format_data_t *)x->format_data)
77#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
78#define STREAM_DATA(x) ((struct dag_per_stream_t *)x->data)
79
80#define FORMAT_DATA DATA(libtrace)
81#define FORMAT_DATA_OUT DATA_OUT(libtrace)
82
83#define DUCK FORMAT_DATA->duck
84
85#define FORMAT_DATA_HEAD FORMAT_DATA->per_stream->head
86#define FORMAT_DATA_FIRST ((struct dag_per_stream_t *)FORMAT_DATA_HEAD->data)
87
88static struct libtrace_format_t dag;
89
90/* A DAG device - a DAG device can support multiple streams (and therefore
91 * multiple input traces) so each trace needs to refer to a device */
92struct dag_dev_t {
93        char * dev_name;                /* Device name */
94        int fd;                         /* File descriptor */
95        uint16_t ref_count;             /* Number of input / output traces
96                                           that are using this device */
97        struct dag_dev_t *prev;         /* Pointer to the previous device in
98                                           the device list */
99        struct dag_dev_t *next;         /* Pointer to the next device in the
100                                           device list */
101};
102
103/* "Global" data that is stored for each DAG output trace */
104struct dag_format_data_out_t {
105        /* The DAG device being used for writing */
106        struct dag_dev_t *device;
107        /* The DAG stream that is being written on */
108        unsigned int dagstream;
109        /* Boolean flag indicating whether the stream is currently attached */
110        int stream_attached;
111        /* The amount of data waiting to be transmitted, in bytes */
112        uint64_t waiting;
113        /* A buffer to hold the data to be transmittted */
114        uint8_t *txbuffer;
115};
116
117/* Data that is stored against each input stream */
118struct dag_per_stream_t {
119        /* DAG stream number */
120        uint16_t dagstream;
121        /* Pointer to the last unread byte in the DAG memory */
122        uint8_t *top;
123        /* Pointer to the first unread byte in the DAG memory */
124        uint8_t *bottom;
125        /* Amount of data processed from the bottom pointer */
126        uint32_t processed;
127        /* Number of packets seen by the stream */
128        uint64_t pkt_count;
129        /* Drop count for this particular stream */
130        uint64_t drops;
131        /* Boolean values to indicate if a particular interface has been seen
132         * or not. This is limited to four interfaces, which is enough to
133         * support all current DAG cards */
134        uint8_t seeninterface[4];
135};
136
137/* "Global" data that is stored for each DAG input trace */
138struct dag_format_data_t {
139        /* DAG device */
140        struct dag_dev_t *device;
141        /* Boolean flag indicating whether the trace is currently attached */
142        int stream_attached;
143        /* Data stored against each DAG input stream */
144        libtrace_list_t *per_stream;
145
146        /* Data required for regular DUCK reporting.
147         * We put this on a new cache line otherwise we have a lot of false
148         * sharing caused by updating the last_pkt.
149         * This should only ever be accessed by the first thread stream,
150         * that includes both read and write operations.
151         */
152        struct {
153                /* Timestamp of the last DUCK report */
154                uint32_t last_duck;
155                /* The number of seconds between each DUCK report */
156                uint32_t duck_freq;
157                /* Timestamp of the last packet read from the DAG card */
158                uint32_t last_pkt;
159                /* Dummy trace to ensure DUCK packets are dealt with using the
160                 * DUCK format functions */
161                libtrace_t *dummy_duck;
162        } duck ALIGN_STRUCT(CACHE_LINE_SIZE);
163};
164
165/* To be thread-safe, we're going to need a mutex for operating on the list
166 * of DAG devices */
167pthread_mutex_t open_dag_mutex;
168
169/* The list of DAG devices that have been opened by libtrace.
170 *
171 * We can only open each DAG device once, but we might want to read from
172 * multiple streams. Therefore, we need to maintain a list of devices that we
173 * have opened (with ref counts!) so that we don't try to open a device too
174 * many times or close a device that we're still using */
175struct dag_dev_t *open_dags = NULL;
176
177/* Returns the amount of padding between the ERF header and the start of the
178 * captured packet data */
179static int dag_get_padding(const libtrace_packet_t *packet)
180{
181        /* ERF Ethernet records have a 2 byte padding before the packet itself
182         * so that the IP header is aligned on a 32 bit boundary.
183         */
184        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
185                dag_record_t *erfptr = (dag_record_t *)packet->header;
186                switch(erfptr->type) {
187                        case TYPE_ETH:
188                        case TYPE_COLOR_ETH:
189                        case TYPE_DSM_COLOR_ETH:
190                        case TYPE_COLOR_HASH_ETH:
191                                return 2;
192                        default:                return 0;
193                }
194        }
195        else {
196                switch(trace_get_link_type(packet)) {
197                        case TRACE_TYPE_ETH:    return 2;
198                        default:                return 0;
199                }
200        }
201}
202
203/* Attempts to determine if the given filename refers to a DAG device */
204static int dag_probe_filename(const char *filename)
205{
206        struct stat statbuf;
207        /* Can we stat the file? */
208        if (stat(filename, &statbuf) != 0) {
209                return 0;
210        }
211        /* Is it a character device? */
212        if (!S_ISCHR(statbuf.st_mode)) {
213                return 0;
214        }
215        /* Yeah, it's probably us. */
216        return 1;
217}
218
219/* Initialises the DAG output data structure */
220static void dag_init_format_out_data(libtrace_out_t *libtrace)
221{
222        libtrace->format_data = (struct dag_format_data_out_t *)
223                malloc(sizeof(struct dag_format_data_out_t));
224        // no DUCK on output
225        FORMAT_DATA_OUT->stream_attached = 0;
226        FORMAT_DATA_OUT->device = NULL;
227        FORMAT_DATA_OUT->dagstream = 0;
228        FORMAT_DATA_OUT->waiting = 0;
229
230}
231
232/* Initialises the DAG input data structure */
233static void dag_init_format_data(libtrace_t *libtrace)
234{
235        struct dag_per_stream_t stream_data;
236
237        libtrace->format_data = (struct dag_format_data_t *)
238                malloc(sizeof(struct dag_format_data_t));
239        DUCK.last_duck = 0;
240        DUCK.duck_freq = 0;
241        DUCK.last_pkt = 0;
242        DUCK.dummy_duck = NULL;
243
244        FORMAT_DATA->per_stream =
245                libtrace_list_init(sizeof(stream_data));
246        if (FORMAT_DATA->per_stream == NULL) {
247                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
248                        "Unable to create list for stream in dag_init_format_data()");
249                return;
250        }
251
252        /* We'll start with just one instance of stream_data, and we'll
253         * add more later if we need them */
254        memset(&stream_data, 0, sizeof(stream_data));
255        libtrace_list_push_back(FORMAT_DATA->per_stream, &stream_data);
256}
257
258/* Determines if there is already an entry for the given DAG device in the
259 * device list and increments the reference count for that device, if found.
260 *
261 * NOTE: This function assumes the open_dag_mutex is held by the caller */
262static struct dag_dev_t *dag_find_open_device(char *dev_name)
263{
264        struct dag_dev_t *dag_dev;
265
266        dag_dev = open_dags;
267
268        /* XXX: Not exactly zippy, but how often are we going to be dealing
269         * with multiple dag cards? */
270        while (dag_dev != NULL) {
271                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
272                        dag_dev->ref_count ++;
273                        return dag_dev;
274                }
275                dag_dev = dag_dev->next;
276        }
277        return NULL;
278}
279
280/* Closes a DAG device and removes it from the device list.
281 *
282 * Attempting to close a DAG device that has a non-zero reference count will
283 * cause an assertion failure!
284 *
285 * NOTE: This function assumes the open_dag_mutex is held by the caller */
286static void dag_close_device(struct dag_dev_t *dev)
287{
288        /* Need to remove from the device list */
289        if (dev->ref_count != 0) {
290                fprintf(stderr, "Cannot close DAG device with non zero reference in dag_close_device()\n");
291                return;
292        }
293
294        if (dev->prev == NULL) {
295                open_dags = dev->next;
296                if (dev->next)
297                        dev->next->prev = NULL;
298        } else {
299                dev->prev->next = dev->next;
300                if (dev->next)
301                        dev->next->prev = dev->prev;
302        }
303
304        dag_close(dev->fd);
305        free(dev);
306}
307
308
309/* Opens a new DAG device for writing and adds it to the DAG device list
310 *
311 * NOTE: this function should only be called when opening a DAG device for
312 * writing - there is little practical difference between this and the
313 * function below that covers the reading case, but we need the output trace
314 * object to report errors properly so the two functions take slightly
315 * different arguments. This is really lame and there should be a much better
316 * way of doing this.
317 *
318 * NOTE: This function assumes the open_dag_mutex is held by the caller
319 */
320static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace,
321                                                char *dev_name)
322{
323        struct stat buf;
324        int fd;
325        struct dag_dev_t *new_dev;
326
327        /* Make sure the device exists */
328        if (stat(dev_name, &buf) == -1) {
329                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
330                return NULL;
331        }
332
333        /* Make sure it is the appropriate type of device */
334        if (S_ISCHR(buf.st_mode)) {
335                /* Try opening the DAG device */
336                if((fd = dag_open(dev_name)) < 0) {
337                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
338                                        dev_name);
339                        return NULL;
340                }
341        } else {
342                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
343                                dev_name);
344                return NULL;
345        }
346
347        /* Add the device to our device list - it is just a doubly linked
348         * list with no inherent ordering; just tack the new one on the front
349         */
350        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
351        new_dev->fd = fd;
352        new_dev->dev_name = dev_name;
353        new_dev->ref_count = 1;
354
355        new_dev->prev = NULL;
356        new_dev->next = open_dags;
357        if (open_dags)
358                open_dags->prev = new_dev;
359
360        open_dags = new_dev;
361
362        return new_dev;
363}
364
365/* Opens a new DAG device for reading and adds it to the DAG device list
366 *
367 * NOTE: this function should only be called when opening a DAG device for
368 * reading - there is little practical difference between this and the
369 * function above that covers the writing case, but we need the input trace
370 * object to report errors properly so the two functions take slightly
371 * different arguments. This is really lame and there should be a much better
372 * way of doing this.
373 *
374 * NOTE: This function assumes the open_dag_mutex is held by the caller */
375static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
376        struct stat buf;
377        int fd;
378        struct dag_dev_t *new_dev;
379
380        /* Make sure the device exists */
381        if (stat(dev_name, &buf) == -1) {
382                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
383                return NULL;
384        }
385
386        /* Make sure it is the appropriate type of device */
387        if (S_ISCHR(buf.st_mode)) {
388                /* Try opening the DAG device */
389                if((fd = dag_open(dev_name)) < 0) {
390                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
391                                      dev_name);
392                        return NULL;
393                }
394        } else {
395                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
396                              dev_name);
397                return NULL;
398        }
399
400        /* Add the device to our device list - it is just a doubly linked
401         * list with no inherent ordering; just tack the new one on the front
402         */
403        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
404        new_dev->fd = fd;
405        new_dev->dev_name = dev_name;
406        new_dev->ref_count = 1;
407
408        new_dev->prev = NULL;
409        new_dev->next = open_dags;
410        if (open_dags)
411                open_dags->prev = new_dev;
412
413        open_dags = new_dev;
414
415        return new_dev;
416}
417
418/* Creates and initialises a DAG output trace */
419static int dag_init_output(libtrace_out_t *libtrace)
420{
421        /* Upon successful creation, the device name is stored against the
422         * device and free when it is free()d */
423        char *dag_dev_name = NULL;
424        char *scan = NULL;
425        struct dag_dev_t *dag_device = NULL;
426        int stream = 1;
427
428        /* XXX I don't know if this is important or not, but this function
429         * isn't present in all of the driver releases that this code is
430         * supposed to support! */
431        /*
432        unsigned long wake_time;
433        dagutil_sleep_get_wake_time(&wake_time,0);
434        */
435
436        dag_init_format_out_data(libtrace);
437        /* Grab the mutex while we're likely to be messing with the device
438         * list */
439        pthread_mutex_lock(&open_dag_mutex);
440
441        /* Specific streams are signified using a comma in the libtrace URI,
442         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
443         *
444         * If no stream is specified, we will write using stream 1 */
445        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
446                dag_dev_name = strdup(libtrace->uridata);
447        } else {
448                dag_dev_name = (char *)strndup(libtrace->uridata,
449                                (size_t)(scan - libtrace->uridata));
450                stream = atoi(++scan);
451        }
452        FORMAT_DATA_OUT->dagstream = stream;
453
454        /* See if our DAG device is already open */
455        dag_device = dag_find_open_device(dag_dev_name);
456
457        if (dag_device == NULL) {
458                /* Device not yet opened - open it ourselves */
459                dag_device = dag_open_output_device(libtrace, dag_dev_name);
460        } else {
461                /* Otherwise, just use the existing one */
462                free(dag_dev_name);
463                dag_dev_name = NULL;
464        }
465
466        /* Make sure we have successfully opened a DAG device */
467        if (dag_device == NULL) {
468                if (dag_dev_name) {
469                        free(dag_dev_name);
470                }
471                pthread_mutex_unlock(&open_dag_mutex);
472                return -1;
473        }
474
475        FORMAT_DATA_OUT->device = dag_device;
476        pthread_mutex_unlock(&open_dag_mutex);
477        return 0;
478}
479
480/* Creates and initialises a DAG input trace */
481static int dag_init_input(libtrace_t *libtrace) {
482        /* Upon successful creation, the device name is stored against the
483         * device and free when it is free()d */
484        char *dag_dev_name = NULL;
485        char *scan = NULL;
486        int stream = 0;
487        struct dag_dev_t *dag_device = NULL;
488
489        dag_init_format_data(libtrace);
490        /* Grab the mutex while we're likely to be messing with the device
491         * list */
492        pthread_mutex_lock(&open_dag_mutex);
493
494
495        /* DAG cards support multiple streams. In a single threaded capture,
496         * these are specified using a comma in the libtrace URI,
497         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
498         *
499         * If no stream is specified, we will read from stream 0 with
500         * one thread
501         */
502        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
503                dag_dev_name = strdup(libtrace->uridata);
504        } else {
505                dag_dev_name = (char *)strndup(libtrace->uridata,
506                                (size_t)(scan - libtrace->uridata));
507                stream = atoi(++scan);
508        }
509
510        FORMAT_DATA_FIRST->dagstream = stream;
511
512        /* See if our DAG device is already open */
513        dag_device = dag_find_open_device(dag_dev_name);
514
515        if (dag_device == NULL) {
516                /* Device not yet opened - open it ourselves */
517                dag_device = dag_open_device(libtrace, dag_dev_name);
518        } else {
519                /* Otherwise, just use the existing one */
520                free(dag_dev_name);
521                dag_dev_name = NULL;
522        }
523
524        /* Make sure we have successfully opened a DAG device */
525        if (dag_device == NULL) {
526                if (dag_dev_name)
527                        free(dag_dev_name);
528                dag_dev_name = NULL;
529                pthread_mutex_unlock(&open_dag_mutex);
530                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Unable to open DAG device %s", dag_dev_name);
531                return -1;
532        }
533
534        FORMAT_DATA->device = dag_device;
535
536        /* See Config_Status_API_Programming_Guide.pdf from the Endace
537           Dag Documentation */
538        /* Check kBooleanAttributeActive is true -- no point capturing
539         * on an interface that's disabled
540         *
541         * The symptom of the port being disabled is that libtrace
542         * will appear to hang. */
543        /* Check kBooleanAttributeFault is false */
544        /* Check kBooleanAttributeLocalFault is false */
545        /* Check kBooleanAttributeLock is true ? */
546        /* Check kBooleanAttributePeerLink ? */
547
548        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based
549           on libtrace promisc attribute?*/
550        /* Set kUint32AttributeSnapLength to the snaplength */
551
552        pthread_mutex_unlock(&open_dag_mutex);
553        return 0;
554}
555
556#ifdef HAVE_DAG_CONFIG_API_H
557static int dag_csapi_set_snaplen(libtrace_t *libtrace, int slen) {
558        dag_card_ref_t card_ref = NULL;
559        dag_component_t root = NULL;
560        attr_uuid_t uuid = 0;
561
562        if (slen < 0)
563                slen = 0; 
564
565        card_ref = dag_config_init(FORMAT_DATA->device->dev_name);
566        root = dag_config_get_root_component(card_ref);
567       
568        uuid = dag_component_get_config_attribute_uuid(root, kBooleanAttributeVarlen);
569        dag_config_set_boolean_attribute(card_ref, uuid, true);
570
571        uuid = dag_component_get_config_attribute_uuid(root, kUint32AttributeSnaplength);
572        dag_config_set_uint32_attribute(card_ref, uuid, (uint32_t)slen);
573
574        return 0;
575       
576
577} 
578#endif /* HAVE_DAG_CONFIG_API_H */
579
580/* Configures a DAG input trace */
581static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
582                            void *data)
583{
584        switch(option) {
585        case TRACE_OPTION_META_FREQ:
586                /* This option is used to specify the frequency of DUCK
587                 * updates */
588                DUCK.duck_freq = *(int *)data;
589                return 0;
590        case TRACE_OPTION_SNAPLEN:
591#ifdef HAVE_DAG_CONFIG_API_H
592                return dag_csapi_set_snaplen(libtrace, *(int *)data);
593#else
594                /* Tell the card our new snap length */
595        {
596                char conf_str[4096];
597                snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
598                if (dag_configure(FORMAT_DATA->device->fd,
599                                  conf_str) != 0) {
600                        trace_set_err(libtrace, errno, "Failed to configure "
601                                      "snaplen on DAG card: %s",
602                                      libtrace->uridata);
603                        return -1;
604                }
605        }
606#endif /* HAVE_DAG_CONFIG_API_H */
607
608                return 0;
609        case TRACE_OPTION_PROMISC:
610                /* DAG already operates in a promisc fashion */
611                return -1;
612        case TRACE_OPTION_FILTER:
613                /* We don't yet support pushing filters into DAG
614                 * cards */
615                return -1;
616        case TRACE_OPTION_EVENT_REALTIME:
617        case TRACE_OPTION_REPLAY_SPEEDUP:
618                /* Live capture is always going to be realtime */
619                return -1;
620        case TRACE_OPTION_HASHER:
621                /* Lets just say we did this, it's currently still up to
622                 * the user to configure this correctly. */
623                return 0;
624        case TRACE_OPTION_CONSTANT_ERF_FRAMING:
625                return -1;
626        }
627        return -1;
628}
629
630/* Starts a DAG output trace */
631static int dag_start_output(libtrace_out_t *libtrace)
632{
633        struct timeval zero, nopoll;
634
635        zero.tv_sec = 0;
636        zero.tv_usec = 0;
637        nopoll = zero;
638
639        /* Attach and start the DAG stream */
640        if (dag_attach_stream64(FORMAT_DATA_OUT->device->fd,
641                        FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) {
642                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
643                return -1;
644        }
645
646        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
647                        FORMAT_DATA_OUT->dagstream) < 0) {
648                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
649                return -1;
650        }
651        FORMAT_DATA_OUT->stream_attached = 1;
652
653        /* We don't want the dag card to do any sleeping */
654        dag_set_stream_poll64(FORMAT_DATA_OUT->device->fd,
655                        FORMAT_DATA_OUT->dagstream, 0, &zero,
656                        &nopoll);
657
658        return 0;
659}
660
661static int dag_start_input_stream(libtrace_t *libtrace,
662                                  struct dag_per_stream_t * stream) {
663        struct timeval zero, nopoll;
664        uint8_t *top, *bottom, *starttop;
665        top = bottom = NULL;
666
667        zero.tv_sec = 0;
668        zero.tv_usec = 10000;
669        nopoll = zero;
670
671        /* Attach and start the DAG stream */
672        if (dag_attach_stream64(FORMAT_DATA->device->fd,
673                              stream->dagstream, 0, 0) < 0) {
674                trace_set_err(libtrace, errno, "Cannot attach DAG stream #%u",
675                              stream->dagstream);
676                return -1;
677        }
678
679        if (dag_start_stream(FORMAT_DATA->device->fd,
680                             stream->dagstream) < 0) {
681                trace_set_err(libtrace, errno, "Cannot start DAG stream #%u",
682                              stream->dagstream);
683                return -1;
684        }
685        FORMAT_DATA->stream_attached = 1;
686
687        /* We don't want the dag card to do any sleeping */
688        if (dag_set_stream_poll64(FORMAT_DATA->device->fd,
689                            stream->dagstream, 0, &zero,
690                            &nopoll) < 0) {
691                trace_set_err(libtrace, errno,
692                              "dag_set_stream_poll failed!");
693                return -1;
694        }
695
696        starttop = dag_advance_stream(FORMAT_DATA->device->fd,
697                                      stream->dagstream,
698                                      &bottom);
699
700        /* Should probably flush the memory hole now */
701        top = starttop;
702        while (starttop - bottom > 0) {
703                bottom += (starttop - bottom);
704                top = dag_advance_stream(FORMAT_DATA->device->fd,
705                                         stream->dagstream,
706                                         &bottom);
707        }
708        stream->top = top;
709        stream->bottom = bottom;
710        stream->processed = 0;
711        stream->drops = 0;
712
713        return 0;
714
715}
716
717/* Starts a DAG input trace */
718static int dag_start_input(libtrace_t *libtrace)
719{
720        return dag_start_input_stream(libtrace, FORMAT_DATA_FIRST);
721}
722
723static int dag_pstart_input(libtrace_t *libtrace)
724{
725        char *scan, *tok;
726        uint16_t stream_count = 0, max_streams;
727        int iserror = 0;
728        struct dag_per_stream_t stream_data;
729
730        /* Check we aren't trying to create more threads than the DAG card can
731         * handle */
732        max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd);
733        if (libtrace->perpkt_thread_count > max_streams) {
734                fprintf(stderr,
735                              "WARNING: DAG has only %u streams available, "
736                              "capping total number of threads at this value.",
737                              max_streams);
738                libtrace->perpkt_thread_count = max_streams;
739        }
740
741        /* Get the stream names from the uri */
742        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
743                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
744                              "Format uri doesn't specify the DAG streams");
745                iserror = 1;
746                goto cleanup;
747        }
748
749        scan++;
750
751        tok = strtok(scan, ",");
752        while (tok != NULL) {
753                /* Ensure we haven't specified too many streams */
754                if (stream_count >= libtrace->perpkt_thread_count) {
755                        fprintf(stderr,
756                                      "WARNING: Format uri specifies too many "
757                                      "streams. Maximum is %u, so only using "
758                                      "the first %u from the uri.",
759                                      libtrace->perpkt_thread_count, 
760                                      libtrace->perpkt_thread_count);
761                        break;
762                }
763
764                /* Save the stream details */
765                if (stream_count == 0) {
766                        /* Special case where we update the existing stream
767                         * data structure */
768                        FORMAT_DATA_FIRST->dagstream = (uint16_t)atoi(tok);
769                } else {
770                        memset(&stream_data, 0, sizeof(stream_data));
771                        stream_data.dagstream = (uint16_t)atoi(tok);
772                        libtrace_list_push_back(FORMAT_DATA->per_stream,
773                                                &stream_data);
774                }
775
776                stream_count++;
777                tok = strtok(NULL, ",");
778        }
779
780        if (stream_count < libtrace->perpkt_thread_count) {
781                libtrace->perpkt_thread_count = stream_count;
782        }
783       
784        FORMAT_DATA->stream_attached = 1;
785
786 cleanup:
787        if (iserror) {
788                return -1;
789        } else {
790                return 0;
791        }
792}
793
794/* Pauses a DAG output trace */
795static int dag_pause_output(libtrace_out_t *libtrace)
796{
797        /* Stop and detach the stream */
798        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
799                            FORMAT_DATA_OUT->dagstream) < 0) {
800                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
801                return -1;
802        }
803        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
804                              FORMAT_DATA_OUT->dagstream) < 0) {
805                trace_set_err_out(libtrace, errno,
806                                  "Could not detach DAG stream");
807                return -1;
808        }
809        FORMAT_DATA_OUT->stream_attached = 0;
810        return 0;
811}
812
813/* Pauses a DAG input trace */
814static int dag_pause_input(libtrace_t *libtrace)
815{
816        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
817
818        /* Stop and detach each stream */
819        while (tmp != NULL) {
820                if (dag_stop_stream(FORMAT_DATA->device->fd,
821                                    STREAM_DATA(tmp)->dagstream) < 0) {
822                        trace_set_err(libtrace, errno,
823                                      "Could not stop DAG stream");
824                        return -1;
825                }
826                if (dag_detach_stream(FORMAT_DATA->device->fd,
827                                      STREAM_DATA(tmp)->dagstream) < 0) {
828                        trace_set_err(libtrace, errno,
829                                      "Could not detach DAG stream");
830                        return -1;
831                }
832
833                tmp = tmp->next;
834        }
835
836        FORMAT_DATA->stream_attached = 0;
837        return 0;
838}
839
840
841
842/* Closes a DAG input trace */
843static int dag_fin_input(libtrace_t *libtrace)
844{
845        /* Need the lock, since we're going to be handling the device list */
846        pthread_mutex_lock(&open_dag_mutex);
847
848        /* Detach the stream if we are not paused */
849        if (FORMAT_DATA->stream_attached)
850                dag_pause_input(libtrace);
851        FORMAT_DATA->device->ref_count--;
852
853        /* Close the DAG device if there are no more references to it */
854        if (FORMAT_DATA->device->ref_count == 0)
855                dag_close_device(FORMAT_DATA->device);
856
857        if (DUCK.dummy_duck)
858                trace_destroy_dead(DUCK.dummy_duck);
859
860        /* Clear the list */
861        libtrace_list_deinit(FORMAT_DATA->per_stream);
862        free(libtrace->format_data);
863        pthread_mutex_unlock(&open_dag_mutex);
864        return 0; /* success */
865}
866
867/* Closes a DAG output trace */
868static int dag_fin_output(libtrace_out_t *libtrace)
869{
870
871        /* Commit any outstanding traffic in the txbuffer */
872        if (FORMAT_DATA_OUT->waiting) {
873                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
874                                           FORMAT_DATA_OUT->dagstream,
875                                           FORMAT_DATA_OUT->waiting );
876        }
877
878        /* Wait until the buffer is nearly clear before exiting the program,
879         * as we will lose packets otherwise */
880        dag_tx_get_stream_space64
881                (FORMAT_DATA_OUT->device->fd,
882                 FORMAT_DATA_OUT->dagstream,
883                 dag_get_stream_buffer_size64(FORMAT_DATA_OUT->device->fd,
884                                            FORMAT_DATA_OUT->dagstream) - 8);
885
886        /* Need the lock, since we're going to be handling the device list */
887        pthread_mutex_lock(&open_dag_mutex);
888
889        /* Detach the stream if we are not paused */
890        if (FORMAT_DATA_OUT->stream_attached)
891                dag_pause_output(libtrace);
892        FORMAT_DATA_OUT->device->ref_count --;
893
894        /* Close the DAG device if there are no more references to it */
895        if (FORMAT_DATA_OUT->device->ref_count == 0)
896                dag_close_device(FORMAT_DATA_OUT->device);
897        free(libtrace->format_data);
898        pthread_mutex_unlock(&open_dag_mutex);
899        return 0; /* success */
900}
901
902#ifdef DAGIOC_CARD_DUCK
903#define LIBTRACE_DUCK_IOCTL DAGIOC_CARD_DUCK
904#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_5_0
905#else
906#ifdef DAGIOCDUCK
907#define LIBTRACE_DUCK_IOCTL DAGIOCDUCK
908#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_2_5
909#else
910#warning "DAG appears to be missing DUCK support"
911#endif
912#endif
913
914/* Extracts DUCK information from the DAG card and produces a DUCK packet */
915static int dag_get_duckinfo(libtrace_t *libtrace,
916                                libtrace_packet_t *packet) {
917
918        if (DUCK.duck_freq == 0)
919                return 0;
920
921#ifndef LIBTRACE_DUCK_IOCTL
922        trace_set_err(libtrace, errno, 
923                "Requested DUCK information but unable to determine the correct ioctl for DUCK");
924        DUCK.duck_freq = 0;
925        return -1;
926#endif
927
928        if (DUCK.last_pkt - DUCK.last_duck < DUCK.duck_freq)
929                return 0;
930
931        /* Allocate memory for the DUCK data */
932        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
933            !packet->buffer) {
934                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
935                packet->buf_control = TRACE_CTRL_PACKET;
936                if (!packet->buffer) {
937                        trace_set_err(libtrace, errno,
938                                      "Cannot allocate packet buffer");
939                        return -1;
940                }
941        }
942
943        /* DUCK doesn't have a format header */
944        packet->header = 0;
945        packet->payload = packet->buffer;
946
947        /* No need to check if we can get DUCK or not - we're modern
948         * enough so just grab the DUCK info */
949        if ((ioctl(FORMAT_DATA->device->fd, LIBTRACE_DUCK_IOCTL,
950                   (duckinf_t *)packet->payload) < 0)) {
951                trace_set_err(libtrace, errno, "Error using DUCK ioctl");
952                DUCK.duck_freq = 0;
953                return -1;
954        }
955
956        packet->type = LIBTRACE_DUCK_VERSION;
957
958        /* Set the packet's trace to point at a DUCK trace, so that the
959         * DUCK format functions will be called on the packet rather than the
960         * DAG ones */
961        if (!DUCK.dummy_duck)
962                DUCK.dummy_duck = trace_create_dead("duck:dummy");
963        packet->trace = DUCK.dummy_duck;
964        DUCK.last_duck = DUCK.last_pkt;
965        packet->error = sizeof(duckinf_t);
966        return sizeof(duckinf_t);
967}
968
969/* Determines the amount of data available to read from the DAG card */
970static int dag_available(libtrace_t *libtrace,
971                         struct dag_per_stream_t *stream_data)
972{
973        uint32_t diff = stream_data->top - stream_data->bottom;
974
975        /* If we've processed more than 4MB of data since we last called
976         * dag_advance_stream, then we should call it again to allow the
977         * space occupied by that 4MB to be released */
978        if (diff >= dag_record_size && stream_data->processed < 4 * 1024 * 1024)
979                return diff;
980
981        /* Update the top and bottom pointers */
982        stream_data->top = dag_advance_stream(FORMAT_DATA->device->fd,
983                                              stream_data->dagstream,
984                                              &(stream_data->bottom));
985
986        if (stream_data->top == NULL) {
987                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
988                return -1;
989        }
990        stream_data->processed = 0;
991        diff = stream_data->top - stream_data->bottom;
992        return diff;
993}
994
995/* Returns a pointer to the start of the next complete ERF record */
996static dag_record_t *dag_get_record(struct dag_per_stream_t *stream_data)
997{
998        dag_record_t *erfptr = NULL;
999        uint16_t size;
1000
1001        erfptr = (dag_record_t *)stream_data->bottom;
1002        if (!erfptr)
1003                return NULL;
1004
1005        size = ntohs(erfptr->rlen);
1006        if (size < dag_record_size) {
1007                fprintf(stderr, "DAG2.5 rlen is invalid (rlen %u, must be at least %u\n",
1008                        size, dag_record_size);
1009                return NULL;
1010        }
1011
1012        /* Make certain we have the full packet available */
1013        if (size > (stream_data->top - stream_data->bottom))
1014                return NULL;
1015
1016        stream_data->bottom += size;
1017        stream_data->processed += size;
1018        return erfptr;
1019}
1020
1021/* Converts a buffer containing a recently read DAG packet record into a
1022 * libtrace packet */
1023static int dag_prepare_packet_stream(libtrace_t *libtrace,
1024                                     struct dag_per_stream_t *stream_data,
1025                                     libtrace_packet_t *packet,
1026                                     void *buffer, libtrace_rt_types_t rt_type,
1027                                     uint32_t flags)
1028{
1029        dag_record_t *erfptr;
1030
1031        /* If the packet previously owned a buffer that is not the buffer
1032         * that contains the new packet data, we're going to need to free the
1033         * old one to avoid memory leaks */
1034        if (packet->buffer != buffer &&
1035            packet->buf_control == TRACE_CTRL_PACKET) {
1036                free(packet->buffer);
1037        }
1038
1039        /* Set the buffer owner appropriately */
1040        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
1041                packet->buf_control = TRACE_CTRL_PACKET;
1042        } else
1043                packet->buf_control = TRACE_CTRL_EXTERNAL;
1044
1045        /* Update the packet pointers and type appropriately */
1046        erfptr = (dag_record_t *)buffer;
1047        packet->buffer = erfptr;
1048        packet->header = erfptr;
1049        packet->type = rt_type;
1050
1051        if (erfptr->flags.rxerror == 1) {
1052                /* rxerror means the payload is corrupt - drop the payload
1053                 * by tweaking rlen */
1054                packet->payload = NULL;
1055                erfptr->rlen = htons(erf_get_framing_length(packet));
1056        } else {
1057                packet->payload = (char*)packet->buffer
1058                        + erf_get_framing_length(packet);
1059        }
1060
1061        if (libtrace->format_data == NULL) {
1062                dag_init_format_data(libtrace);
1063        }
1064
1065        /* Update the dropped packets counter */
1066        /* No loss counter for DSM coloured records - have to use some
1067         * other API */
1068        if (erf_is_color_type(erfptr->type)) {
1069                /* TODO */
1070        } else {
1071                /* Use the ERF loss counter */
1072                if (stream_data->seeninterface[erfptr->flags.iface]
1073                    == 0) {
1074                        stream_data->seeninterface[erfptr->flags.iface]
1075                                = 1;
1076                } else {
1077                        stream_data->drops += ntohs(erfptr->lctr);
1078                }
1079        }
1080
1081        return 0;
1082}
1083
1084static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
1085                              void *buffer, libtrace_rt_types_t rt_type,
1086                              uint32_t flags)
1087{
1088        return dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
1089                                       buffer, rt_type, flags);
1090}
1091
1092/*
1093 * dag_write_packet() at this stage attempts to improve tx performance
1094 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
1095 * has been met. I observed approximately 270% performance increase
1096 * through this relatively naive tweak. No optimisation of buffer sizes
1097 * was attempted.
1098 */
1099
1100/* Pushes an ERF record onto the transmit stream */
1101static int dag_dump_packet(libtrace_out_t *libtrace,
1102                           dag_record_t *erfptr, unsigned int pad,
1103                           void *buffer)
1104{
1105        int size;
1106
1107        /*
1108         * If we've got 0 bytes waiting in the txqueue, assume that we
1109         * haven't requested any space yet, and request some, storing
1110         * the pointer at FORMAT_DATA_OUT->txbuffer.
1111         *
1112         * The amount to request is slightly magical at the moment - it's
1113         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
1114         * the buffer and handle overruns.
1115         */
1116        if (FORMAT_DATA_OUT->waiting == 0) {
1117                FORMAT_DATA_OUT->txbuffer =
1118                        dag_tx_get_stream_space64(FORMAT_DATA_OUT->device->fd,
1119                                                FORMAT_DATA_OUT->dagstream,
1120                                                16908288);
1121        }
1122
1123        /*
1124         * Copy the header separately to the body, as we can't guarantee they
1125         * are in contiguous memory
1126         */
1127        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, erfptr,
1128               (dag_record_size + pad));
1129        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
1130
1131        /*
1132         * Copy our incoming packet into the outgoing buffer, and increment
1133         * our waiting count
1134         */
1135        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
1136        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, buffer,
1137               size);
1138        FORMAT_DATA_OUT->waiting += size;
1139
1140        /*
1141         * If our output buffer has more than 16 Mebibytes in it, commit those
1142         * bytes and reset the waiting count to 0.
1143         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in
1144         * case there is still data in the buffer at program exit.
1145         */
1146        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
1147                FORMAT_DATA_OUT->txbuffer =
1148                        dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
1149                                                   FORMAT_DATA_OUT->dagstream,
1150                                                   FORMAT_DATA_OUT->waiting);
1151                FORMAT_DATA_OUT->waiting = 0;
1152        }
1153
1154        return size + pad + dag_record_size;
1155}
1156
1157/* Attempts to determine a suitable ERF type for a given packet. Returns true
1158 * if one is found, false otherwise */
1159static bool find_compatible_linktype(libtrace_out_t *libtrace,
1160                                     libtrace_packet_t *packet, char *type)
1161{
1162        /* Keep trying to simplify the packet until we can find
1163         * something we can do with it */
1164
1165        do {
1166                *type = libtrace_to_erf_type(trace_get_link_type(packet));
1167
1168                /* Success */
1169                if (*type != (char)-1)
1170                        return true;
1171
1172                if (!demote_packet(packet)) {
1173                        trace_set_err_out(libtrace,
1174                                          TRACE_ERR_NO_CONVERSION,
1175                                          "No erf type for packet (%i)",
1176                                          trace_get_link_type(packet));
1177                        return false;
1178                }
1179
1180        } while(1);
1181
1182        return true;
1183}
1184
1185/* Writes a packet to the provided DAG output trace */
1186static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet)
1187{
1188        /* This is heavily borrowed from erf_write_packet(). Yes, CnP
1189         * coding sucks, sorry about that.
1190         */
1191        unsigned int pad = 0;
1192        int numbytes;
1193        void *payload = packet->payload;
1194        dag_record_t *header = (dag_record_t *)packet->header;
1195        char erf_type = 0;
1196
1197        if(!packet->header) {
1198                /* No header, probably an RT packet. Lifted from
1199                 * erf_write_packet(). */
1200                return -1;
1201        }
1202
1203        if (trace_get_link_type(packet) == TRACE_TYPE_NONDATA)
1204                return 0;
1205
1206        pad = dag_get_padding(packet);
1207
1208        /*
1209         * If the payload is null, adjust the rlen. Discussion of this is
1210         * attached to erf_write_packet()
1211         */
1212        if (payload == NULL) {
1213                header->rlen = htons(dag_record_size + pad);
1214        }
1215
1216        if (packet->type == TRACE_RT_DATA_ERF) {
1217                numbytes = dag_dump_packet(libtrace, header, pad, payload);
1218        } else {
1219                /* Build up a new packet header from the existing header */
1220
1221                /* Simplify the packet first - if we can't do this, break
1222                 * early */
1223                if (!find_compatible_linktype(libtrace,packet,&erf_type))
1224                        return -1;
1225
1226                dag_record_t erfhdr;
1227
1228                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
1229                payload=packet->payload;
1230                pad = dag_get_padding(packet);
1231
1232                /* Flags. Can't do this */
1233                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
1234                if (trace_get_direction(packet)!=(int)~0U)
1235                        erfhdr.flags.iface = trace_get_direction(packet);
1236
1237                erfhdr.type = erf_type;
1238
1239                /* Packet length (rlen includes format overhead) */
1240                if (trace_get_capture_length(packet) <= 0
1241                       || trace_get_capture_length(packet) > 65536) {
1242                        trace_set_err_out(libtrace, TRACE_ERR_BAD_PACKET,
1243                                "Capture length is out of range in dag_write_packet()");
1244                        return -1;
1245                }
1246                if (erf_get_framing_length(packet) <= 0
1247                       || trace_get_framing_length(packet) > 65536) {
1248                        trace_set_err_out(libtrace, TRACE_ERR_BAD_PACKET,
1249                                "Framing length is out of range in dag_write_packet()");
1250                        return -1;
1251                }
1252                if (trace_get_capture_length(packet) +
1253                       erf_get_framing_length(packet) <= 0
1254                       || trace_get_capture_length(packet) +
1255                       erf_get_framing_length(packet) > 65536) {
1256                        trace_set_err_out(libtrace, TRACE_ERR_BAD_PACKET,
1257                                "Capture + framing length is out of range in dag_write_packet()");
1258                        return -1;
1259                }
1260
1261                erfhdr.rlen = htons(trace_get_capture_length(packet)
1262                                    + erf_get_framing_length(packet));
1263
1264
1265                /* Loss counter. Can't do this */
1266                erfhdr.lctr = 0;
1267                /* Wire length, does not include padding! */
1268                erfhdr.wlen = htons(trace_get_wire_length(packet));
1269
1270                /* Write it out */
1271                numbytes = dag_dump_packet(libtrace, &erfhdr, pad, payload);
1272        }
1273
1274        return numbytes;
1275}
1276
1277/* Reads the next available packet from a DAG card, in a BLOCKING fashion
1278 *
1279 * If DUCK reporting is enabled, the packet returned may be a DUCK update
1280 */
1281static int dag_read_packet_stream(libtrace_t *libtrace,
1282                                struct dag_per_stream_t *stream_data,
1283                                libtrace_thread_t *t, /* Optional */
1284                                libtrace_packet_t *packet)
1285{
1286        int size = 0;
1287        dag_record_t *erfptr = NULL;
1288        struct timeval tv;
1289        int numbytes = 0;
1290        uint32_t flags = 0;
1291        struct timeval maxwait, pollwait;
1292
1293        pollwait.tv_sec = 0;
1294        pollwait.tv_usec = 10000;
1295        maxwait.tv_sec = 0;
1296        maxwait.tv_usec = 250000;
1297
1298        /* Check if we're due for a DUCK report - only report on the first thread */
1299        if (stream_data == FORMAT_DATA_FIRST) {
1300                size = dag_get_duckinfo(libtrace, packet);
1301                if (size != 0)
1302                        return size;
1303        }
1304
1305
1306        /* Don't let anyone try to free our DAG memory hole! */
1307        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1308
1309        /* If the packet buffer is currently owned by libtrace, free it so
1310         * that we can set the packet to point into the DAG memory hole */
1311        if (packet->buf_control == TRACE_CTRL_PACKET) {
1312                free(packet->buffer);
1313                packet->buffer = 0;
1314        }
1315
1316        if (dag_set_stream_poll64(FORMAT_DATA->device->fd, stream_data->dagstream,
1317                                sizeof(dag_record_t), &maxwait,
1318                                &pollwait) == -1) {
1319                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1320                return -1;
1321        }
1322
1323        /* Grab a full ERF record */
1324        do {
1325                numbytes = dag_available(libtrace, stream_data);
1326                if (numbytes < 0)
1327                        return numbytes;
1328                if (numbytes < dag_record_size) {
1329                        /* Check the message queue if we have one to check */
1330                        if (t != NULL &&
1331                            libtrace_message_queue_count(&t->messages) > 0)
1332                                return -2;
1333
1334                        if ((numbytes=is_halted(libtrace)) != -1)
1335                                return numbytes;
1336                        /* Block until we see a packet */
1337                        continue;
1338                }
1339                erfptr = dag_get_record(stream_data);
1340        } while (erfptr == NULL);
1341
1342        packet->trace = libtrace;
1343
1344        /* Prepare the libtrace packet */
1345        if (dag_prepare_packet_stream(libtrace, stream_data, packet, erfptr,
1346                                    TRACE_RT_DATA_ERF, flags))
1347                return -1;
1348
1349        /* Update the DUCK timer - don't re-order this check (false-sharing) */
1350        if (stream_data == FORMAT_DATA_FIRST && DUCK.duck_freq != 0) {
1351                tv = trace_get_timeval(packet);
1352                DUCK.last_pkt = tv.tv_sec;
1353        }
1354
1355        packet->order = erf_get_erf_timestamp(packet);
1356        packet->error = packet->payload ? htons(erfptr->rlen) :
1357                                          erf_get_framing_length(packet);
1358
1359        return packet->error;
1360}
1361
1362static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
1363{
1364        return dag_read_packet_stream(libtrace, FORMAT_DATA_FIRST, NULL, packet);
1365}
1366
1367static int dag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1368                             libtrace_packet_t **packets, size_t nb_packets)
1369{
1370        int ret;
1371        size_t read_packets = 0;
1372        int numbytes = 0;
1373
1374        struct dag_per_stream_t *stream_data =
1375                (struct dag_per_stream_t *)t->format_data;
1376
1377        /* Read as many packets as we can, but read atleast one packet */
1378        do {
1379                ret = dag_read_packet_stream(libtrace, stream_data, t,
1380                                           packets[read_packets]);
1381                if (ret < 0)
1382                        return ret;
1383
1384                read_packets++;
1385
1386                /* Make sure we don't read too many packets..! */
1387                if (read_packets >= nb_packets)
1388                        break;
1389
1390                numbytes = dag_available(libtrace, stream_data);
1391        } while (numbytes >= dag_record_size);
1392
1393        return read_packets;
1394}
1395
1396/* Attempts to read a packet from a DAG card in a NON-BLOCKING fashion. If a
1397 * packet is available, we will return a packet event. Otherwise we will
1398 * return a SLEEP event (as we cannot select on the DAG file descriptor).
1399 */
1400static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
1401                                           libtrace_packet_t *packet)
1402{
1403        libtrace_eventobj_t event = {0,0,0.0,0};
1404        dag_record_t *erfptr = NULL;
1405        int numbytes;
1406        uint32_t flags = 0;
1407        struct timeval minwait, tv;
1408       
1409        minwait.tv_sec = 0;
1410        minwait.tv_usec = 10000;
1411
1412        /* Check if we're meant to provide a DUCK update */
1413        numbytes = dag_get_duckinfo(libtrace, packet);
1414        if (numbytes < 0) {
1415                event.type = TRACE_EVENT_TERMINATE;
1416                return event;
1417        } else if (numbytes > 0) {
1418                event.type = TRACE_EVENT_PACKET;
1419                return event;
1420        }
1421       
1422        if (dag_set_stream_poll64(FORMAT_DATA->device->fd,
1423                                FORMAT_DATA_FIRST->dagstream, 0, &minwait,
1424                                &minwait) == -1) {
1425                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1426                event.type = TRACE_EVENT_TERMINATE;
1427                return event;
1428        }
1429
1430        do {
1431                erfptr = NULL;
1432                numbytes = 0;
1433
1434                /* Need to call dag_available so that the top pointer will get
1435                 * updated, otherwise we'll never see any data! */
1436                numbytes = dag_available(libtrace, FORMAT_DATA_FIRST);
1437
1438                /* May as well not bother calling dag_get_record if
1439                 * dag_available suggests that there's no data */
1440                if (numbytes != 0)
1441                        erfptr = dag_get_record(FORMAT_DATA_FIRST);
1442                if (erfptr == NULL) {
1443                        /* No packet available - sleep for a very short time */
1444                        if (libtrace_halt) {
1445                                event.type = TRACE_EVENT_TERMINATE;
1446                        } else {
1447                                event.type = TRACE_EVENT_SLEEP;
1448                                event.seconds = 0.0001;
1449                        }
1450                        break;
1451                }
1452                if (dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
1453                                            erfptr, TRACE_RT_DATA_ERF, flags)) {
1454                        event.type = TRACE_EVENT_TERMINATE;
1455                        break;
1456                }
1457
1458
1459                event.size = trace_get_capture_length(packet) +
1460                        trace_get_framing_length(packet);
1461
1462                /* XXX trace_read_packet() normally applies the following
1463                 * config options for us, but this function is called via
1464                 * trace_event() so we have to do it ourselves */
1465
1466                if (libtrace->filter) {
1467                        int filtret = trace_apply_filter(libtrace->filter,
1468                                                         packet);
1469                        if (filtret == -1) {
1470                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
1471                                              "Bad BPF Filter");
1472                                event.type = TRACE_EVENT_TERMINATE;
1473                                break;
1474                        }
1475
1476                        if (filtret == 0) {
1477                                /* This packet isn't useful so we want to
1478                                 * immediately see if there is another suitable
1479                                 * one - we definitely DO NOT want to return
1480                                 * a sleep event in this case, like we used to
1481                                 * do! */
1482                                libtrace->filtered_packets ++;
1483                                trace_clear_cache(packet);
1484                                continue;
1485                        }
1486
1487                        event.type = TRACE_EVENT_PACKET;
1488                } else {
1489                        event.type = TRACE_EVENT_PACKET;
1490                }
1491
1492                /* Update the DUCK timer */
1493                tv = trace_get_timeval(packet);
1494                DUCK.last_pkt = tv.tv_sec;
1495               
1496                if (libtrace->snaplen > 0) {
1497                        trace_set_capture_length(packet, libtrace->snaplen);
1498                }
1499                libtrace->accepted_packets ++;
1500                break;
1501        } while(1);
1502
1503        return event;
1504}
1505
1506static void dag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat)
1507{
1508        libtrace_list_node_t *tmp;
1509        if (!libtrace) {
1510                fprintf(stderr, "NULL trace passed into dag_get_statistics()\n");
1511                return;
1512        }
1513        if (!stat) {
1514                trace_set_err(libtrace, TRACE_ERR_STAT, "NULL stat passed into dag_get_statistics()");
1515                return;
1516        }
1517        tmp = FORMAT_DATA_HEAD;
1518
1519        /* Dropped packets */
1520        stat->dropped_valid = 1;
1521        stat->dropped = 0;
1522        while (tmp != NULL) {
1523                stat->dropped += STREAM_DATA(tmp)->drops;
1524                tmp = tmp->next;
1525        }
1526
1527}
1528
1529static void dag_get_thread_statistics(libtrace_t *libtrace, libtrace_thread_t *t,
1530                                       libtrace_stat_t *stat) {
1531        struct dag_per_stream_t *stream_data = t->format_data;
1532        if (!libtrace) {
1533                fprintf(stderr, "NULL trace passed into dag_get_thread_statistics()\n");
1534                return;
1535        }
1536        if (!stat) {
1537                trace_set_err(libtrace, TRACE_ERR_STAT, "NULL stat passed into dag_get_thread_statistics()");
1538                return;
1539        }
1540        stat->dropped_valid = 1;
1541        stat->dropped = stream_data->drops;
1542
1543        stat->filtered_valid = 1;
1544        stat->filtered = 0;
1545}
1546
1547/* Prints some semi-useful help text about the DAG format module */
1548static void dag_help(void) {
1549        printf("dag format module: $Revision: 1755 $\n");
1550        printf("Supported input URIs:\n");
1551        printf("\tdag:/dev/dagn\n");
1552        printf("\n");
1553        printf("\te.g.: dag:/dev/dag0\n");
1554        printf("\n");
1555        printf("Supported output URIs:\n");
1556        printf("\tnone\n");
1557        printf("\n");
1558}
1559
1560static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1561                                bool reader)
1562{
1563        struct dag_per_stream_t *stream_data;
1564        libtrace_list_node_t *node;
1565
1566        if (reader && t->type == THREAD_PERPKT) {
1567                node = libtrace_list_get_index(FORMAT_DATA->per_stream,
1568                                                t->perpkt_num);
1569                if (node == NULL) {
1570                        return -1;
1571                }
1572                stream_data = node->data;
1573
1574                /* Pass the per thread data to the thread */
1575                t->format_data = stream_data;
1576
1577                /* Attach and start the DAG stream */
1578                if (dag_start_input_stream(libtrace, stream_data) < 0)
1579                        return -1;
1580        }
1581
1582        return 0;
1583}
1584
1585static struct libtrace_format_t dag = {
1586        "dag",
1587        "$Id$",
1588        TRACE_FORMAT_ERF,
1589        dag_probe_filename,             /* probe filename */
1590        NULL,                           /* probe magic */
1591        dag_init_input,                 /* init_input */
1592        dag_config_input,               /* config_input */
1593        dag_start_input,                /* start_input */
1594        dag_pause_input,                /* pause_input */
1595        dag_init_output,                /* init_output */
1596        NULL,                           /* config_output */
1597        dag_start_output,               /* start_output */
1598        dag_fin_input,                  /* fin_input */
1599        dag_fin_output,                 /* fin_output */
1600        dag_read_packet,                /* read_packet */
1601        dag_prepare_packet,             /* prepare_packet */
1602        NULL,                           /* fin_packet */
1603        dag_write_packet,               /* write_packet */
1604        NULL,                           /* flush_output */
1605        erf_get_link_type,              /* get_link_type */
1606        erf_get_direction,              /* get_direction */
1607        erf_set_direction,              /* set_direction */
1608        erf_get_erf_timestamp,          /* get_erf_timestamp */
1609        NULL,                           /* get_timeval */
1610        NULL,                           /* get_seconds */
1611        NULL,                           /* get_timespec */
1612        NULL,                           /* seek_erf */
1613        NULL,                           /* seek_timeval */
1614        NULL,                           /* seek_seconds */
1615        erf_get_capture_length,         /* get_capture_length */
1616        erf_get_wire_length,            /* get_wire_length */
1617        erf_get_framing_length,         /* get_framing_length */
1618        erf_set_capture_length,         /* set_capture_length */
1619        NULL,                           /* get_received_packets */
1620        NULL,                           /* get_filtered_packets */
1621        NULL,                           /* get_dropped_packets */
1622        dag_get_statistics,             /* get_statistics */
1623        NULL,                           /* get_fd */
1624        trace_event_dag,                /* trace_event */
1625        dag_help,                       /* help */
1626        NULL,                            /* next pointer */
1627        {true, 0}, /* live packet capture, thread limit TBD */
1628        dag_pstart_input,
1629        dag_pread_packets,
1630        dag_pause_input,
1631        NULL,
1632        dag_pregister_thread,
1633        NULL,
1634        dag_get_thread_statistics       /* get thread stats */
1635};
1636
1637void dag_constructor(void)
1638{
1639        register_format(&dag);
1640}
Note: See TracBrowser for help on using the repository browser.