source: lib/format_dag25.c @ f6f3ae5

develop
Last change on this file since f6f3ae5 was f6f3ae5, checked in by Jacob Van Walraven <jcv9@…>, 2 years ago

Assertion cleanup

  • Property mode set to 100644
File size: 47.2 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26#define _GNU_SOURCE
27
28#include "config.h"
29#include "common.h"
30#include "libtrace.h"
31#include "libtrace_int.h"
32#include "format_helper.h"
33#include "format_erf.h"
34
35#include <assert.h>
36#include <errno.h>
37#include <fcntl.h>
38#include <stdio.h>
39#include <string.h>
40#include <stdlib.h>
41#include <sys/stat.h>
42
43#include <sys/mman.h>
44/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
45 * Windows anyway so we'll worry about this more later :] */
46#include <pthread.h>
47
48
49#ifdef HAVE_DAG_CONFIG_API_H
50#include <dag_config_api.h>
51#endif
52
53#ifdef WIN32
54#  include <io.h>
55#  include <share.h>
56#  define PATH_MAX _MAX_PATH
57#  define snprintf sprintf_s
58#else
59#  include <netdb.h>
60#  ifndef PATH_MAX
61#       define PATH_MAX 4096
62#  endif
63#  include <sys/ioctl.h>
64#endif
65
66/* This format deals with DAG cards that are using drivers from the 2.5 version
67 * onwards, including 3.X.
68 *
69 * DAG is a LIVE capture format.
70 *
71 * This format does support writing, provided the DAG card that you are using
72 * has transmit (Tx) support. Additionally, packets read using this format
73 * are in the ERF format, so can easily be written as ERF traces without
74 * losing any data.
75 */
76
77#define DATA(x) ((struct dag_format_data_t *)x->format_data)
78#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
79#define STREAM_DATA(x) ((struct dag_per_stream_t *)x->data)
80
81#define FORMAT_DATA DATA(libtrace)
82#define FORMAT_DATA_OUT DATA_OUT(libtrace)
83
84#define DUCK FORMAT_DATA->duck
85
86#define FORMAT_DATA_HEAD FORMAT_DATA->per_stream->head
87#define FORMAT_DATA_FIRST ((struct dag_per_stream_t *)FORMAT_DATA_HEAD->data)
88
89static struct libtrace_format_t dag;
90
91/* A DAG device - a DAG device can support multiple streams (and therefore
92 * multiple input traces) so each trace needs to refer to a device */
93struct dag_dev_t {
94        char * dev_name;                /* Device name */
95        int fd;                         /* File descriptor */
96        uint16_t ref_count;             /* Number of input / output traces
97                                           that are using this device */
98        struct dag_dev_t *prev;         /* Pointer to the previous device in
99                                           the device list */
100        struct dag_dev_t *next;         /* Pointer to the next device in the
101                                           device list */
102};
103
104/* "Global" data that is stored for each DAG output trace */
105struct dag_format_data_out_t {
106        /* The DAG device being used for writing */
107        struct dag_dev_t *device;
108        /* The DAG stream that is being written on */
109        unsigned int dagstream;
110        /* Boolean flag indicating whether the stream is currently attached */
111        int stream_attached;
112        /* The amount of data waiting to be transmitted, in bytes */
113        uint64_t waiting;
114        /* A buffer to hold the data to be transmittted */
115        uint8_t *txbuffer;
116};
117
118/* Data that is stored against each input stream */
119struct dag_per_stream_t {
120        /* DAG stream number */
121        uint16_t dagstream;
122        /* Pointer to the last unread byte in the DAG memory */
123        uint8_t *top;
124        /* Pointer to the first unread byte in the DAG memory */
125        uint8_t *bottom;
126        /* Amount of data processed from the bottom pointer */
127        uint32_t processed;
128        /* Number of packets seen by the stream */
129        uint64_t pkt_count;
130        /* Drop count for this particular stream */
131        uint64_t drops;
132        /* Boolean values to indicate if a particular interface has been seen
133         * or not. This is limited to four interfaces, which is enough to
134         * support all current DAG cards */
135        uint8_t seeninterface[4];
136};
137
138/* "Global" data that is stored for each DAG input trace */
139struct dag_format_data_t {
140        /* DAG device */
141        struct dag_dev_t *device;
142        /* Boolean flag indicating whether the trace is currently attached */
143        int stream_attached;
144        /* Data stored against each DAG input stream */
145        libtrace_list_t *per_stream;
146
147        /* Data required for regular DUCK reporting.
148         * We put this on a new cache line otherwise we have a lot of false
149         * sharing caused by updating the last_pkt.
150         * This should only ever be accessed by the first thread stream,
151         * that includes both read and write operations.
152         */
153        struct {
154                /* Timestamp of the last DUCK report */
155                uint32_t last_duck;
156                /* The number of seconds between each DUCK report */
157                uint32_t duck_freq;
158                /* Timestamp of the last packet read from the DAG card */
159                uint32_t last_pkt;
160                /* Dummy trace to ensure DUCK packets are dealt with using the
161                 * DUCK format functions */
162                libtrace_t *dummy_duck;
163        } duck ALIGN_STRUCT(CACHE_LINE_SIZE);
164};
165
166/* To be thread-safe, we're going to need a mutex for operating on the list
167 * of DAG devices */
168pthread_mutex_t open_dag_mutex;
169
170/* The list of DAG devices that have been opened by libtrace.
171 *
172 * We can only open each DAG device once, but we might want to read from
173 * multiple streams. Therefore, we need to maintain a list of devices that we
174 * have opened (with ref counts!) so that we don't try to open a device too
175 * many times or close a device that we're still using */
176struct dag_dev_t *open_dags = NULL;
177
178/* Returns the amount of padding between the ERF header and the start of the
179 * captured packet data */
180static int dag_get_padding(const libtrace_packet_t *packet)
181{
182        /* ERF Ethernet records have a 2 byte padding before the packet itself
183         * so that the IP header is aligned on a 32 bit boundary.
184         */
185        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
186                dag_record_t *erfptr = (dag_record_t *)packet->header;
187                switch(erfptr->type) {
188                        case TYPE_ETH:
189                        case TYPE_COLOR_ETH:
190                        case TYPE_DSM_COLOR_ETH:
191                        case TYPE_COLOR_HASH_ETH:
192                                return 2;
193                        default:                return 0;
194                }
195        }
196        else {
197                switch(trace_get_link_type(packet)) {
198                        case TRACE_TYPE_ETH:    return 2;
199                        default:                return 0;
200                }
201        }
202}
203
204/* Attempts to determine if the given filename refers to a DAG device */
205static int dag_probe_filename(const char *filename)
206{
207        struct stat statbuf;
208        /* Can we stat the file? */
209        if (stat(filename, &statbuf) != 0) {
210                return 0;
211        }
212        /* Is it a character device? */
213        if (!S_ISCHR(statbuf.st_mode)) {
214                return 0;
215        }
216        /* Yeah, it's probably us. */
217        return 1;
218}
219
220/* Initialises the DAG output data structure */
221static void dag_init_format_out_data(libtrace_out_t *libtrace)
222{
223        libtrace->format_data = (struct dag_format_data_out_t *)
224                malloc(sizeof(struct dag_format_data_out_t));
225        // no DUCK on output
226        FORMAT_DATA_OUT->stream_attached = 0;
227        FORMAT_DATA_OUT->device = NULL;
228        FORMAT_DATA_OUT->dagstream = 0;
229        FORMAT_DATA_OUT->waiting = 0;
230
231}
232
233/* Initialises the DAG input data structure */
234static void dag_init_format_data(libtrace_t *libtrace)
235{
236        struct dag_per_stream_t stream_data;
237
238        libtrace->format_data = (struct dag_format_data_t *)
239                malloc(sizeof(struct dag_format_data_t));
240        DUCK.last_duck = 0;
241        DUCK.duck_freq = 0;
242        DUCK.last_pkt = 0;
243        DUCK.dummy_duck = NULL;
244
245        FORMAT_DATA->per_stream =
246                libtrace_list_init(sizeof(stream_data));
247        /*assert(FORMAT_DATA->per_stream != NULL);*/
248        if (FORMAT_DATA->per_stream == NULL) {
249                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
250                        "Unable to create list for stream in dag_init_format_data()");
251                return -1;
252        }
253
254        /* We'll start with just one instance of stream_data, and we'll
255         * add more later if we need them */
256        memset(&stream_data, 0, sizeof(stream_data));
257        libtrace_list_push_back(FORMAT_DATA->per_stream, &stream_data);
258}
259
260/* Determines if there is already an entry for the given DAG device in the
261 * device list and increments the reference count for that device, if found.
262 *
263 * NOTE: This function assumes the open_dag_mutex is held by the caller */
264static struct dag_dev_t *dag_find_open_device(char *dev_name)
265{
266        struct dag_dev_t *dag_dev;
267
268        dag_dev = open_dags;
269
270        /* XXX: Not exactly zippy, but how often are we going to be dealing
271         * with multiple dag cards? */
272        while (dag_dev != NULL) {
273                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
274                        dag_dev->ref_count ++;
275                        return dag_dev;
276                }
277                dag_dev = dag_dev->next;
278        }
279        return NULL;
280}
281
282/* Closes a DAG device and removes it from the device list.
283 *
284 * Attempting to close a DAG device that has a non-zero reference count will
285 * cause an assertion failure!
286 *
287 * NOTE: This function assumes the open_dag_mutex is held by the caller */
288static void dag_close_device(struct dag_dev_t *dev)
289{
290        /* Need to remove from the device list */
291        /*assert(dev->ref_count == 0);*/
292        if (dev->ref_count != 0) {
293                fprintf(stderr, "Cannot close DAG device with non zero reference in dag_close_device()\n");
294                return;
295        }
296
297        if (dev->prev == NULL) {
298                open_dags = dev->next;
299                if (dev->next)
300                        dev->next->prev = NULL;
301        } else {
302                dev->prev->next = dev->next;
303                if (dev->next)
304                        dev->next->prev = dev->prev;
305        }
306
307        dag_close(dev->fd);
308        free(dev);
309}
310
311
312/* Opens a new DAG device for writing and adds it to the DAG device list
313 *
314 * NOTE: this function should only be called when opening a DAG device for
315 * writing - there is little practical difference between this and the
316 * function below that covers the reading case, but we need the output trace
317 * object to report errors properly so the two functions take slightly
318 * different arguments. This is really lame and there should be a much better
319 * way of doing this.
320 *
321 * NOTE: This function assumes the open_dag_mutex is held by the caller
322 */
323static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace,
324                                                char *dev_name)
325{
326        struct stat buf;
327        int fd;
328        struct dag_dev_t *new_dev;
329
330        /* Make sure the device exists */
331        if (stat(dev_name, &buf) == -1) {
332                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
333                return NULL;
334        }
335
336        /* Make sure it is the appropriate type of device */
337        if (S_ISCHR(buf.st_mode)) {
338                /* Try opening the DAG device */
339                if((fd = dag_open(dev_name)) < 0) {
340                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
341                                        dev_name);
342                        return NULL;
343                }
344        } else {
345                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
346                                dev_name);
347                return NULL;
348        }
349
350        /* Add the device to our device list - it is just a doubly linked
351         * list with no inherent ordering; just tack the new one on the front
352         */
353        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
354        new_dev->fd = fd;
355        new_dev->dev_name = dev_name;
356        new_dev->ref_count = 1;
357
358        new_dev->prev = NULL;
359        new_dev->next = open_dags;
360        if (open_dags)
361                open_dags->prev = new_dev;
362
363        open_dags = new_dev;
364
365        return new_dev;
366}
367
368/* Opens a new DAG device for reading and adds it to the DAG device list
369 *
370 * NOTE: this function should only be called when opening a DAG device for
371 * reading - there is little practical difference between this and the
372 * function above that covers the writing case, but we need the input trace
373 * object to report errors properly so the two functions take slightly
374 * different arguments. This is really lame and there should be a much better
375 * way of doing this.
376 *
377 * NOTE: This function assumes the open_dag_mutex is held by the caller */
378static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
379        struct stat buf;
380        int fd;
381        struct dag_dev_t *new_dev;
382
383        /* Make sure the device exists */
384        if (stat(dev_name, &buf) == -1) {
385                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
386                return NULL;
387        }
388
389        /* Make sure it is the appropriate type of device */
390        if (S_ISCHR(buf.st_mode)) {
391                /* Try opening the DAG device */
392                if((fd = dag_open(dev_name)) < 0) {
393                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
394                                      dev_name);
395                        return NULL;
396                }
397        } else {
398                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
399                              dev_name);
400                return NULL;
401        }
402
403        /* Add the device to our device list - it is just a doubly linked
404         * list with no inherent ordering; just tack the new one on the front
405         */
406        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
407        new_dev->fd = fd;
408        new_dev->dev_name = dev_name;
409        new_dev->ref_count = 1;
410
411        new_dev->prev = NULL;
412        new_dev->next = open_dags;
413        if (open_dags)
414                open_dags->prev = new_dev;
415
416        open_dags = new_dev;
417
418        return new_dev;
419}
420
421/* Creates and initialises a DAG output trace */
422static int dag_init_output(libtrace_out_t *libtrace)
423{
424        /* Upon successful creation, the device name is stored against the
425         * device and free when it is free()d */
426        char *dag_dev_name = NULL;
427        char *scan = NULL;
428        struct dag_dev_t *dag_device = NULL;
429        int stream = 1;
430
431        /* XXX I don't know if this is important or not, but this function
432         * isn't present in all of the driver releases that this code is
433         * supposed to support! */
434        /*
435        unsigned long wake_time;
436        dagutil_sleep_get_wake_time(&wake_time,0);
437        */
438
439        dag_init_format_out_data(libtrace);
440        /* Grab the mutex while we're likely to be messing with the device
441         * list */
442        pthread_mutex_lock(&open_dag_mutex);
443
444        /* Specific streams are signified using a comma in the libtrace URI,
445         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
446         *
447         * If no stream is specified, we will write using stream 1 */
448        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
449                dag_dev_name = strdup(libtrace->uridata);
450        } else {
451                dag_dev_name = (char *)strndup(libtrace->uridata,
452                                (size_t)(scan - libtrace->uridata));
453                stream = atoi(++scan);
454        }
455        FORMAT_DATA_OUT->dagstream = stream;
456
457        /* See if our DAG device is already open */
458        dag_device = dag_find_open_device(dag_dev_name);
459
460        if (dag_device == NULL) {
461                /* Device not yet opened - open it ourselves */
462                dag_device = dag_open_output_device(libtrace, dag_dev_name);
463        } else {
464                /* Otherwise, just use the existing one */
465                free(dag_dev_name);
466                dag_dev_name = NULL;
467        }
468
469        /* Make sure we have successfully opened a DAG device */
470        if (dag_device == NULL) {
471                if (dag_dev_name) {
472                        free(dag_dev_name);
473                }
474                pthread_mutex_unlock(&open_dag_mutex);
475                return -1;
476        }
477
478        FORMAT_DATA_OUT->device = dag_device;
479        pthread_mutex_unlock(&open_dag_mutex);
480        return 0;
481}
482
483/* Creates and initialises a DAG input trace */
484static int dag_init_input(libtrace_t *libtrace) {
485        /* Upon successful creation, the device name is stored against the
486         * device and free when it is free()d */
487        char *dag_dev_name = NULL;
488        char *scan = NULL;
489        int stream = 0;
490        struct dag_dev_t *dag_device = NULL;
491
492        dag_init_format_data(libtrace);
493        /* Grab the mutex while we're likely to be messing with the device
494         * list */
495        pthread_mutex_lock(&open_dag_mutex);
496
497
498        /* DAG cards support multiple streams. In a single threaded capture,
499         * these are specified using a comma in the libtrace URI,
500         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
501         *
502         * If no stream is specified, we will read from stream 0 with
503         * one thread
504         */
505        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
506                dag_dev_name = strdup(libtrace->uridata);
507        } else {
508                dag_dev_name = (char *)strndup(libtrace->uridata,
509                                (size_t)(scan - libtrace->uridata));
510                stream = atoi(++scan);
511        }
512
513        FORMAT_DATA_FIRST->dagstream = stream;
514
515        /* See if our DAG device is already open */
516        dag_device = dag_find_open_device(dag_dev_name);
517
518        if (dag_device == NULL) {
519                /* Device not yet opened - open it ourselves */
520                dag_device = dag_open_device(libtrace, dag_dev_name);
521        } else {
522                /* Otherwise, just use the existing one */
523                free(dag_dev_name);
524                dag_dev_name = NULL;
525        }
526
527        /* Make sure we have successfully opened a DAG device */
528        if (dag_device == NULL) {
529                if (dag_dev_name)
530                        free(dag_dev_name);
531                dag_dev_name = NULL;
532                pthread_mutex_unlock(&open_dag_mutex);
533                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Unable to open DAG device %s", dag_dev_name);
534                return -1;
535        }
536
537        FORMAT_DATA->device = dag_device;
538
539        /* See Config_Status_API_Programming_Guide.pdf from the Endace
540           Dag Documentation */
541        /* Check kBooleanAttributeActive is true -- no point capturing
542         * on an interface that's disabled
543         *
544         * The symptom of the port being disabled is that libtrace
545         * will appear to hang. */
546        /* Check kBooleanAttributeFault is false */
547        /* Check kBooleanAttributeLocalFault is false */
548        /* Check kBooleanAttributeLock is true ? */
549        /* Check kBooleanAttributePeerLink ? */
550
551        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based
552           on libtrace promisc attribute?*/
553        /* Set kUint32AttributeSnapLength to the snaplength */
554
555        pthread_mutex_unlock(&open_dag_mutex);
556        return 0;
557}
558
559#ifdef HAVE_DAG_CONFIG_API_H
560static int dag_csapi_set_snaplen(libtrace_t *libtrace, int slen) {
561        dag_card_ref_t card_ref = NULL;
562        dag_component_t root = NULL;
563        attr_uuid_t uuid = 0;
564
565        if (slen < 0)
566                slen = 0; 
567
568        card_ref = dag_config_init(FORMAT_DATA->device->dev_name);
569        root = dag_config_get_root_component(card_ref);
570       
571        uuid = dag_component_get_config_attribute_uuid(root, kBooleanAttributeVarlen);
572        dag_config_set_boolean_attribute(card_ref, uuid, true);
573
574        uuid = dag_component_get_config_attribute_uuid(root, kUint32AttributeSnaplength);
575        dag_config_set_uint32_attribute(card_ref, uuid, (uint32_t)slen);
576
577        return 0;
578       
579
580} 
581#endif /* HAVE_DAG_CONFIG_API_H */
582
583/* Configures a DAG input trace */
584static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
585                            void *data)
586{
587        switch(option) {
588        case TRACE_OPTION_META_FREQ:
589                /* This option is used to specify the frequency of DUCK
590                 * updates */
591                DUCK.duck_freq = *(int *)data;
592                return 0;
593        case TRACE_OPTION_SNAPLEN:
594#ifdef HAVE_DAG_CONFIG_API_H
595                return dag_csapi_set_snaplen(libtrace, *(int *)data);
596#else
597                /* Tell the card our new snap length */
598        {
599                char conf_str[4096];
600                snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
601                if (dag_configure(FORMAT_DATA->device->fd,
602                                  conf_str) != 0) {
603                        trace_set_err(libtrace, errno, "Failed to configure "
604                                      "snaplen on DAG card: %s",
605                                      libtrace->uridata);
606                        return -1;
607                }
608        }
609#endif /* HAVE_DAG_CONFIG_API_H */
610
611                return 0;
612        case TRACE_OPTION_PROMISC:
613                /* DAG already operates in a promisc fashion */
614                return -1;
615        case TRACE_OPTION_FILTER:
616                /* We don't yet support pushing filters into DAG
617                 * cards */
618                return -1;
619        case TRACE_OPTION_EVENT_REALTIME:
620        case TRACE_OPTION_REPLAY_SPEEDUP:
621                /* Live capture is always going to be realtime */
622                return -1;
623        case TRACE_OPTION_HASHER:
624                /* Lets just say we did this, it's currently still up to
625                 * the user to configure this correctly. */
626                return 0;
627        }
628        return -1;
629}
630
631/* Starts a DAG output trace */
632static int dag_start_output(libtrace_out_t *libtrace)
633{
634        struct timeval zero, nopoll;
635
636        zero.tv_sec = 0;
637        zero.tv_usec = 0;
638        nopoll = zero;
639
640        /* Attach and start the DAG stream */
641        if (dag_attach_stream64(FORMAT_DATA_OUT->device->fd,
642                        FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) {
643                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
644                return -1;
645        }
646
647        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
648                        FORMAT_DATA_OUT->dagstream) < 0) {
649                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
650                return -1;
651        }
652        FORMAT_DATA_OUT->stream_attached = 1;
653
654        /* We don't want the dag card to do any sleeping */
655        dag_set_stream_poll64(FORMAT_DATA_OUT->device->fd,
656                        FORMAT_DATA_OUT->dagstream, 0, &zero,
657                        &nopoll);
658
659        return 0;
660}
661
662static int dag_start_input_stream(libtrace_t *libtrace,
663                                  struct dag_per_stream_t * stream) {
664        struct timeval zero, nopoll;
665        uint8_t *top, *bottom, *starttop;
666        top = bottom = NULL;
667
668        zero.tv_sec = 0;
669        zero.tv_usec = 10000;
670        nopoll = zero;
671
672        /* Attach and start the DAG stream */
673        if (dag_attach_stream64(FORMAT_DATA->device->fd,
674                              stream->dagstream, 0, 0) < 0) {
675                trace_set_err(libtrace, errno, "Cannot attach DAG stream #%u",
676                              stream->dagstream);
677                return -1;
678        }
679
680        if (dag_start_stream(FORMAT_DATA->device->fd,
681                             stream->dagstream) < 0) {
682                trace_set_err(libtrace, errno, "Cannot start DAG stream #%u",
683                              stream->dagstream);
684                return -1;
685        }
686        FORMAT_DATA->stream_attached = 1;
687
688        /* We don't want the dag card to do any sleeping */
689        if (dag_set_stream_poll64(FORMAT_DATA->device->fd,
690                            stream->dagstream, 0, &zero,
691                            &nopoll) < 0) {
692                trace_set_err(libtrace, errno,
693                              "dag_set_stream_poll failed!");
694                return -1;
695        }
696
697        starttop = dag_advance_stream(FORMAT_DATA->device->fd,
698                                      stream->dagstream,
699                                      &bottom);
700
701        /* Should probably flush the memory hole now */
702        top = starttop;
703        while (starttop - bottom > 0) {
704                bottom += (starttop - bottom);
705                top = dag_advance_stream(FORMAT_DATA->device->fd,
706                                         stream->dagstream,
707                                         &bottom);
708        }
709        stream->top = top;
710        stream->bottom = bottom;
711        stream->processed = 0;
712        stream->drops = 0;
713
714        return 0;
715
716}
717
718/* Starts a DAG input trace */
719static int dag_start_input(libtrace_t *libtrace)
720{
721        return dag_start_input_stream(libtrace, FORMAT_DATA_FIRST);
722}
723
724static int dag_pstart_input(libtrace_t *libtrace)
725{
726        char *scan, *tok;
727        uint16_t stream_count = 0, max_streams;
728        int iserror = 0;
729        struct dag_per_stream_t stream_data;
730
731        /* Check we aren't trying to create more threads than the DAG card can
732         * handle */
733        max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd);
734        if (libtrace->perpkt_thread_count > max_streams) {
735                fprintf(stderr,
736                              "WARNING: DAG has only %u streams available, "
737                              "capping total number of threads at this value.",
738                              max_streams);
739                libtrace->perpkt_thread_count = max_streams;
740        }
741
742        /* Get the stream names from the uri */
743        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
744                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
745                              "Format uri doesn't specify the DAG streams");
746                iserror = 1;
747                goto cleanup;
748        }
749
750        scan++;
751
752        tok = strtok(scan, ",");
753        while (tok != NULL) {
754                /* Ensure we haven't specified too many streams */
755                if (stream_count >= libtrace->perpkt_thread_count) {
756                        fprintf(stderr,
757                                      "WARNING: Format uri specifies too many "
758                                      "streams. Maximum is %u, so only using "
759                                      "the first %u from the uri.",
760                                      libtrace->perpkt_thread_count, 
761                                      libtrace->perpkt_thread_count);
762                        break;
763                }
764
765                /* Save the stream details */
766                if (stream_count == 0) {
767                        /* Special case where we update the existing stream
768                         * data structure */
769                        FORMAT_DATA_FIRST->dagstream = (uint16_t)atoi(tok);
770                } else {
771                        memset(&stream_data, 0, sizeof(stream_data));
772                        stream_data.dagstream = (uint16_t)atoi(tok);
773                        libtrace_list_push_back(FORMAT_DATA->per_stream,
774                                                &stream_data);
775                }
776
777                stream_count++;
778                tok = strtok(NULL, ",");
779        }
780
781        if (stream_count < libtrace->perpkt_thread_count) {
782                libtrace->perpkt_thread_count = stream_count;
783        }
784       
785        FORMAT_DATA->stream_attached = 1;
786
787 cleanup:
788        if (iserror) {
789                return -1;
790        } else {
791                return 0;
792        }
793}
794
795/* Pauses a DAG output trace */
796static int dag_pause_output(libtrace_out_t *libtrace)
797{
798        /* Stop and detach the stream */
799        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
800                            FORMAT_DATA_OUT->dagstream) < 0) {
801                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
802                return -1;
803        }
804        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
805                              FORMAT_DATA_OUT->dagstream) < 0) {
806                trace_set_err_out(libtrace, errno,
807                                  "Could not detach DAG stream");
808                return -1;
809        }
810        FORMAT_DATA_OUT->stream_attached = 0;
811        return 0;
812}
813
814/* Pauses a DAG input trace */
815static int dag_pause_input(libtrace_t *libtrace)
816{
817        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
818
819        /* Stop and detach each stream */
820        while (tmp != NULL) {
821                if (dag_stop_stream(FORMAT_DATA->device->fd,
822                                    STREAM_DATA(tmp)->dagstream) < 0) {
823                        trace_set_err(libtrace, errno,
824                                      "Could not stop DAG stream");
825                        return -1;
826                }
827                if (dag_detach_stream(FORMAT_DATA->device->fd,
828                                      STREAM_DATA(tmp)->dagstream) < 0) {
829                        trace_set_err(libtrace, errno,
830                                      "Could not detach DAG stream");
831                        return -1;
832                }
833
834                tmp = tmp->next;
835        }
836
837        FORMAT_DATA->stream_attached = 0;
838        return 0;
839}
840
841
842
843/* Closes a DAG input trace */
844static int dag_fin_input(libtrace_t *libtrace)
845{
846        /* Need the lock, since we're going to be handling the device list */
847        pthread_mutex_lock(&open_dag_mutex);
848
849        /* Detach the stream if we are not paused */
850        if (FORMAT_DATA->stream_attached)
851                dag_pause_input(libtrace);
852        FORMAT_DATA->device->ref_count--;
853
854        /* Close the DAG device if there are no more references to it */
855        if (FORMAT_DATA->device->ref_count == 0)
856                dag_close_device(FORMAT_DATA->device);
857
858        if (DUCK.dummy_duck)
859                trace_destroy_dead(DUCK.dummy_duck);
860
861        /* Clear the list */
862        libtrace_list_deinit(FORMAT_DATA->per_stream);
863        free(libtrace->format_data);
864        pthread_mutex_unlock(&open_dag_mutex);
865        return 0; /* success */
866}
867
868/* Closes a DAG output trace */
869static int dag_fin_output(libtrace_out_t *libtrace)
870{
871
872        /* Commit any outstanding traffic in the txbuffer */
873        if (FORMAT_DATA_OUT->waiting) {
874                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
875                                           FORMAT_DATA_OUT->dagstream,
876                                           FORMAT_DATA_OUT->waiting );
877        }
878
879        /* Wait until the buffer is nearly clear before exiting the program,
880         * as we will lose packets otherwise */
881        dag_tx_get_stream_space64
882                (FORMAT_DATA_OUT->device->fd,
883                 FORMAT_DATA_OUT->dagstream,
884                 dag_get_stream_buffer_size64(FORMAT_DATA_OUT->device->fd,
885                                            FORMAT_DATA_OUT->dagstream) - 8);
886
887        /* Need the lock, since we're going to be handling the device list */
888        pthread_mutex_lock(&open_dag_mutex);
889
890        /* Detach the stream if we are not paused */
891        if (FORMAT_DATA_OUT->stream_attached)
892                dag_pause_output(libtrace);
893        FORMAT_DATA_OUT->device->ref_count --;
894
895        /* Close the DAG device if there are no more references to it */
896        if (FORMAT_DATA_OUT->device->ref_count == 0)
897                dag_close_device(FORMAT_DATA_OUT->device);
898        free(libtrace->format_data);
899        pthread_mutex_unlock(&open_dag_mutex);
900        return 0; /* success */
901}
902
903#ifdef DAGIOC_CARD_DUCK
904#define LIBTRACE_DUCK_IOCTL DAGIOC_CARD_DUCK
905#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_5_0
906#else
907#ifdef DAGIOCDUCK
908#define LIBTRACE_DUCK_IOCTL DAGIOCDUCK
909#define LIBTRACE_DUCK_VERSION TRACE_RT_DUCK_2_5
910#else
911#warning "DAG appears to be missing DUCK support"
912#endif
913#endif
914
915/* Extracts DUCK information from the DAG card and produces a DUCK packet */
916static int dag_get_duckinfo(libtrace_t *libtrace,
917                                libtrace_packet_t *packet) {
918
919        if (DUCK.duck_freq == 0)
920                return 0;
921
922#ifndef LIBTRACE_DUCK_IOCTL
923        trace_set_err(libtrace, errno, 
924                "Requested DUCK information but unable to determine the correct ioctl for DUCK");
925        DUCK.duck_freq = 0;
926        return -1;
927#endif
928
929        if (DUCK.last_pkt - DUCK.last_duck < DUCK.duck_freq)
930                return 0;
931
932        /* Allocate memory for the DUCK data */
933        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
934            !packet->buffer) {
935                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
936                packet->buf_control = TRACE_CTRL_PACKET;
937                if (!packet->buffer) {
938                        trace_set_err(libtrace, errno,
939                                      "Cannot allocate packet buffer");
940                        return -1;
941                }
942        }
943
944        /* DUCK doesn't have a format header */
945        packet->header = 0;
946        packet->payload = packet->buffer;
947
948        /* No need to check if we can get DUCK or not - we're modern
949         * enough so just grab the DUCK info */
950        if ((ioctl(FORMAT_DATA->device->fd, LIBTRACE_DUCK_IOCTL,
951                   (duckinf_t *)packet->payload) < 0)) {
952                trace_set_err(libtrace, errno, "Error using DUCK ioctl");
953                DUCK.duck_freq = 0;
954                return -1;
955        }
956
957        packet->type = LIBTRACE_DUCK_VERSION;
958
959        /* Set the packet's trace to point at a DUCK trace, so that the
960         * DUCK format functions will be called on the packet rather than the
961         * DAG ones */
962        if (!DUCK.dummy_duck)
963                DUCK.dummy_duck = trace_create_dead("duck:dummy");
964        packet->trace = DUCK.dummy_duck;
965        DUCK.last_duck = DUCK.last_pkt;
966        packet->error = sizeof(duckinf_t);
967        return sizeof(duckinf_t);
968}
969
970/* Determines the amount of data available to read from the DAG card */
971static int dag_available(libtrace_t *libtrace,
972                         struct dag_per_stream_t *stream_data)
973{
974        uint32_t diff = stream_data->top - stream_data->bottom;
975
976        /* If we've processed more than 4MB of data since we last called
977         * dag_advance_stream, then we should call it again to allow the
978         * space occupied by that 4MB to be released */
979        if (diff >= dag_record_size && stream_data->processed < 4 * 1024 * 1024)
980                return diff;
981
982        /* Update the top and bottom pointers */
983        stream_data->top = dag_advance_stream(FORMAT_DATA->device->fd,
984                                              stream_data->dagstream,
985                                              &(stream_data->bottom));
986
987        if (stream_data->top == NULL) {
988                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
989                return -1;
990        }
991        stream_data->processed = 0;
992        diff = stream_data->top - stream_data->bottom;
993        return diff;
994}
995
996/* Returns a pointer to the start of the next complete ERF record */
997static dag_record_t *dag_get_record(struct dag_per_stream_t *stream_data)
998{
999        dag_record_t *erfptr = NULL;
1000        uint16_t size;
1001
1002        erfptr = (dag_record_t *)stream_data->bottom;
1003        if (!erfptr)
1004                return NULL;
1005
1006        size = ntohs(erfptr->rlen);
1007        /*assert( size >= dag_record_size );*/
1008        if (size < dag_record_size) {
1009                fprintf(stderr, "Incorrect dag record size in dag_get_record()\n");
1010                return NULL;
1011        }
1012
1013        /* Make certain we have the full packet available */
1014        if (size > (stream_data->top - stream_data->bottom))
1015                return NULL;
1016
1017        stream_data->bottom += size;
1018        stream_data->processed += size;
1019        return erfptr;
1020}
1021
1022/* Converts a buffer containing a recently read DAG packet record into a
1023 * libtrace packet */
1024static int dag_prepare_packet_stream(libtrace_t *libtrace,
1025                                     struct dag_per_stream_t *stream_data,
1026                                     libtrace_packet_t *packet,
1027                                     void *buffer, libtrace_rt_types_t rt_type,
1028                                     uint32_t flags)
1029{
1030        dag_record_t *erfptr;
1031
1032        /* If the packet previously owned a buffer that is not the buffer
1033         * that contains the new packet data, we're going to need to free the
1034         * old one to avoid memory leaks */
1035        if (packet->buffer != buffer &&
1036            packet->buf_control == TRACE_CTRL_PACKET) {
1037                free(packet->buffer);
1038        }
1039
1040        /* Set the buffer owner appropriately */
1041        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
1042                packet->buf_control = TRACE_CTRL_PACKET;
1043        } else
1044                packet->buf_control = TRACE_CTRL_EXTERNAL;
1045
1046        /* Update the packet pointers and type appropriately */
1047        erfptr = (dag_record_t *)buffer;
1048        packet->buffer = erfptr;
1049        packet->header = erfptr;
1050        packet->type = rt_type;
1051
1052        if (erfptr->flags.rxerror == 1) {
1053                /* rxerror means the payload is corrupt - drop the payload
1054                 * by tweaking rlen */
1055                packet->payload = NULL;
1056                erfptr->rlen = htons(erf_get_framing_length(packet));
1057        } else {
1058                packet->payload = (char*)packet->buffer
1059                        + erf_get_framing_length(packet);
1060        }
1061
1062        if (libtrace->format_data == NULL) {
1063                dag_init_format_data(libtrace);
1064        }
1065
1066        /* Update the dropped packets counter */
1067        /* No loss counter for DSM coloured records - have to use some
1068         * other API */
1069        if (erf_is_color_type(erfptr->type)) {
1070                /* TODO */
1071        } else {
1072                /* Use the ERF loss counter */
1073                if (stream_data->seeninterface[erfptr->flags.iface]
1074                    == 0) {
1075                        stream_data->seeninterface[erfptr->flags.iface]
1076                                = 1;
1077                } else {
1078                        stream_data->drops += ntohs(erfptr->lctr);
1079                }
1080        }
1081
1082        return 0;
1083}
1084
1085static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
1086                              void *buffer, libtrace_rt_types_t rt_type,
1087                              uint32_t flags)
1088{
1089        return dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
1090                                       buffer, rt_type, flags);
1091}
1092
1093/*
1094 * dag_write_packet() at this stage attempts to improve tx performance
1095 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
1096 * has been met. I observed approximately 270% performance increase
1097 * through this relatively naive tweak. No optimisation of buffer sizes
1098 * was attempted.
1099 */
1100
1101/* Pushes an ERF record onto the transmit stream */
1102static int dag_dump_packet(libtrace_out_t *libtrace,
1103                           dag_record_t *erfptr, unsigned int pad,
1104                           void *buffer)
1105{
1106        int size;
1107
1108        /*
1109         * If we've got 0 bytes waiting in the txqueue, assume that we
1110         * haven't requested any space yet, and request some, storing
1111         * the pointer at FORMAT_DATA_OUT->txbuffer.
1112         *
1113         * The amount to request is slightly magical at the moment - it's
1114         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
1115         * the buffer and handle overruns.
1116         */
1117        if (FORMAT_DATA_OUT->waiting == 0) {
1118                FORMAT_DATA_OUT->txbuffer =
1119                        dag_tx_get_stream_space64(FORMAT_DATA_OUT->device->fd,
1120                                                FORMAT_DATA_OUT->dagstream,
1121                                                16908288);
1122        }
1123
1124        /*
1125         * Copy the header separately to the body, as we can't guarantee they
1126         * are in contiguous memory
1127         */
1128        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, erfptr,
1129               (dag_record_size + pad));
1130        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
1131
1132        /*
1133         * Copy our incoming packet into the outgoing buffer, and increment
1134         * our waiting count
1135         */
1136        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
1137        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, buffer,
1138               size);
1139        FORMAT_DATA_OUT->waiting += size;
1140
1141        /*
1142         * If our output buffer has more than 16 Mebibytes in it, commit those
1143         * bytes and reset the waiting count to 0.
1144         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in
1145         * case there is still data in the buffer at program exit.
1146         */
1147        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
1148                FORMAT_DATA_OUT->txbuffer =
1149                        dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
1150                                                   FORMAT_DATA_OUT->dagstream,
1151                                                   FORMAT_DATA_OUT->waiting);
1152                FORMAT_DATA_OUT->waiting = 0;
1153        }
1154
1155        return size + pad + dag_record_size;
1156}
1157
1158/* Attempts to determine a suitable ERF type for a given packet. Returns true
1159 * if one is found, false otherwise */
1160static bool find_compatible_linktype(libtrace_out_t *libtrace,
1161                                     libtrace_packet_t *packet, char *type)
1162{
1163        /* Keep trying to simplify the packet until we can find
1164         * something we can do with it */
1165
1166        do {
1167                *type = libtrace_to_erf_type(trace_get_link_type(packet));
1168
1169                /* Success */
1170                if (*type != (char)-1)
1171                        return true;
1172
1173                if (!demote_packet(packet)) {
1174                        trace_set_err_out(libtrace,
1175                                          TRACE_ERR_NO_CONVERSION,
1176                                          "No erf type for packet (%i)",
1177                                          trace_get_link_type(packet));
1178                        return false;
1179                }
1180
1181        } while(1);
1182
1183        return true;
1184}
1185
1186/* Writes a packet to the provided DAG output trace */
1187static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet)
1188{
1189        /* This is heavily borrowed from erf_write_packet(). Yes, CnP
1190         * coding sucks, sorry about that.
1191         */
1192        unsigned int pad = 0;
1193        int numbytes;
1194        void *payload = packet->payload;
1195        dag_record_t *header = (dag_record_t *)packet->header;
1196        char erf_type = 0;
1197
1198        if(!packet->header) {
1199                /* No header, probably an RT packet. Lifted from
1200                 * erf_write_packet(). */
1201                return -1;
1202        }
1203
1204        if (trace_get_link_type(packet) == TRACE_TYPE_NONDATA)
1205                return 0;
1206
1207        pad = dag_get_padding(packet);
1208
1209        /*
1210         * If the payload is null, adjust the rlen. Discussion of this is
1211         * attached to erf_write_packet()
1212         */
1213        if (payload == NULL) {
1214                header->rlen = htons(dag_record_size + pad);
1215        }
1216
1217        if (packet->type == TRACE_RT_DATA_ERF) {
1218                numbytes = dag_dump_packet(libtrace, header, pad, payload);
1219        } else {
1220                /* Build up a new packet header from the existing header */
1221
1222                /* Simplify the packet first - if we can't do this, break
1223                 * early */
1224                if (!find_compatible_linktype(libtrace,packet,&erf_type))
1225                        return -1;
1226
1227                dag_record_t erfhdr;
1228
1229                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
1230                payload=packet->payload;
1231                pad = dag_get_padding(packet);
1232
1233                /* Flags. Can't do this */
1234                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
1235                if (trace_get_direction(packet)!=(int)~0U)
1236                        erfhdr.flags.iface = trace_get_direction(packet);
1237
1238                erfhdr.type = erf_type;
1239
1240                /* Packet length (rlen includes format overhead) */
1241                /*assert(trace_get_capture_length(packet) > 0
1242                       && trace_get_capture_length(packet) <= 65536);*/
1243                if (!(trace_get_capture_length(packet) > 0
1244                       && trace_get_capture_length(packet) <= 65536)) {
1245                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET,
1246                                "Capture length is out of range in dag_write_packet()");
1247                        return -1;
1248                }
1249                /*assert(erf_get_framing_length(packet) > 0
1250                       && trace_get_framing_length(packet) <= 65536);*/
1251                if (!(erf_get_framing_length(packet) > 0
1252                       && trace_get_framing_length(packet) <= 65536)) {
1253                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET,
1254                                "Framing length is out of range in dag_write_packet()");
1255                        return -1;
1256                }
1257                /*assert(trace_get_capture_length(packet) +
1258                       erf_get_framing_length(packet) > 0
1259                       && trace_get_capture_length(packet) +
1260                       erf_get_framing_length(packet) <= 65536);*/
1261                if (!(trace_get_capture_length(packet) +
1262                       erf_get_framing_length(packet) > 0
1263                       && trace_get_capture_length(packet) +
1264                       erf_get_framing_length(packet) <= 65536)) {
1265                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET,
1266                                "Capture + framing length is out of range in dag_write_packet()");
1267                        return -1;
1268                }
1269
1270                erfhdr.rlen = htons(trace_get_capture_length(packet)
1271                                    + erf_get_framing_length(packet));
1272
1273
1274                /* Loss counter. Can't do this */
1275                erfhdr.lctr = 0;
1276                /* Wire length, does not include padding! */
1277                erfhdr.wlen = htons(trace_get_wire_length(packet));
1278
1279                /* Write it out */
1280                numbytes = dag_dump_packet(libtrace, &erfhdr, pad, payload);
1281        }
1282
1283        return numbytes;
1284}
1285
1286/* Reads the next available packet from a DAG card, in a BLOCKING fashion
1287 *
1288 * If DUCK reporting is enabled, the packet returned may be a DUCK update
1289 */
1290static int dag_read_packet_stream(libtrace_t *libtrace,
1291                                struct dag_per_stream_t *stream_data,
1292                                libtrace_thread_t *t, /* Optional */
1293                                libtrace_packet_t *packet)
1294{
1295        int size = 0;
1296        dag_record_t *erfptr = NULL;
1297        struct timeval tv;
1298        int numbytes = 0;
1299        uint32_t flags = 0;
1300        struct timeval maxwait, pollwait;
1301
1302        pollwait.tv_sec = 0;
1303        pollwait.tv_usec = 10000;
1304        maxwait.tv_sec = 0;
1305        maxwait.tv_usec = 250000;
1306
1307        /* Check if we're due for a DUCK report - only report on the first thread */
1308        if (stream_data == FORMAT_DATA_FIRST) {
1309                size = dag_get_duckinfo(libtrace, packet);
1310                if (size != 0)
1311                        return size;
1312        }
1313
1314
1315        /* Don't let anyone try to free our DAG memory hole! */
1316        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1317
1318        /* If the packet buffer is currently owned by libtrace, free it so
1319         * that we can set the packet to point into the DAG memory hole */
1320        if (packet->buf_control == TRACE_CTRL_PACKET) {
1321                free(packet->buffer);
1322                packet->buffer = 0;
1323        }
1324
1325        if (dag_set_stream_poll64(FORMAT_DATA->device->fd, stream_data->dagstream,
1326                                sizeof(dag_record_t), &maxwait,
1327                                &pollwait) == -1) {
1328                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1329                return -1;
1330        }
1331
1332        /* Grab a full ERF record */
1333        do {
1334                numbytes = dag_available(libtrace, stream_data);
1335                if (numbytes < 0)
1336                        return numbytes;
1337                if (numbytes < dag_record_size) {
1338                        /* Check the message queue if we have one to check */
1339                        if (t != NULL &&
1340                            libtrace_message_queue_count(&t->messages) > 0)
1341                                return -2;
1342
1343                        if ((numbytes=is_halted(libtrace)) != -1)
1344                                return numbytes;
1345                        /* Block until we see a packet */
1346                        continue;
1347                }
1348                erfptr = dag_get_record(stream_data);
1349        } while (erfptr == NULL);
1350
1351        packet->trace = libtrace;
1352
1353        /* Prepare the libtrace packet */
1354        if (dag_prepare_packet_stream(libtrace, stream_data, packet, erfptr,
1355                                    TRACE_RT_DATA_ERF, flags))
1356                return -1;
1357
1358        /* Update the DUCK timer - don't re-order this check (false-sharing) */
1359        if (stream_data == FORMAT_DATA_FIRST && DUCK.duck_freq != 0) {
1360                tv = trace_get_timeval(packet);
1361                DUCK.last_pkt = tv.tv_sec;
1362        }
1363
1364        packet->order = erf_get_erf_timestamp(packet);
1365        packet->error = packet->payload ? htons(erfptr->rlen) :
1366                                          erf_get_framing_length(packet);
1367
1368        return packet->error;
1369}
1370
1371static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
1372{
1373        return dag_read_packet_stream(libtrace, FORMAT_DATA_FIRST, NULL, packet);
1374}
1375
1376static int dag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1377                             libtrace_packet_t **packets, size_t nb_packets)
1378{
1379        int ret;
1380        size_t read_packets = 0;
1381        int numbytes = 0;
1382
1383        struct dag_per_stream_t *stream_data =
1384                (struct dag_per_stream_t *)t->format_data;
1385
1386        /* Read as many packets as we can, but read atleast one packet */
1387        do {
1388                ret = dag_read_packet_stream(libtrace, stream_data, t,
1389                                           packets[read_packets]);
1390                if (ret < 0)
1391                        return ret;
1392
1393                read_packets++;
1394
1395                /* Make sure we don't read too many packets..! */
1396                if (read_packets >= nb_packets)
1397                        break;
1398
1399                numbytes = dag_available(libtrace, stream_data);
1400        } while (numbytes >= dag_record_size);
1401
1402        return read_packets;
1403}
1404
1405/* Attempts to read a packet from a DAG card in a NON-BLOCKING fashion. If a
1406 * packet is available, we will return a packet event. Otherwise we will
1407 * return a SLEEP event (as we cannot select on the DAG file descriptor).
1408 */
1409static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
1410                                           libtrace_packet_t *packet)
1411{
1412        libtrace_eventobj_t event = {0,0,0.0,0};
1413        dag_record_t *erfptr = NULL;
1414        int numbytes;
1415        uint32_t flags = 0;
1416        struct timeval minwait, tv;
1417       
1418        minwait.tv_sec = 0;
1419        minwait.tv_usec = 10000;
1420
1421        /* Check if we're meant to provide a DUCK update */
1422        numbytes = dag_get_duckinfo(libtrace, packet);
1423        if (numbytes < 0) {
1424                event.type = TRACE_EVENT_TERMINATE;
1425                return event;
1426        } else if (numbytes > 0) {
1427                event.type = TRACE_EVENT_PACKET;
1428                return event;
1429        }
1430       
1431        if (dag_set_stream_poll64(FORMAT_DATA->device->fd,
1432                                FORMAT_DATA_FIRST->dagstream, 0, &minwait,
1433                                &minwait) == -1) {
1434                trace_set_err(libtrace, errno, "dag_set_stream_poll");
1435                event.type = TRACE_EVENT_TERMINATE;
1436                return event;
1437        }
1438
1439        do {
1440                erfptr = NULL;
1441                numbytes = 0;
1442
1443                /* Need to call dag_available so that the top pointer will get
1444                 * updated, otherwise we'll never see any data! */
1445                numbytes = dag_available(libtrace, FORMAT_DATA_FIRST);
1446
1447                /* May as well not bother calling dag_get_record if
1448                 * dag_available suggests that there's no data */
1449                if (numbytes != 0)
1450                        erfptr = dag_get_record(FORMAT_DATA_FIRST);
1451                if (erfptr == NULL) {
1452                        /* No packet available - sleep for a very short time */
1453                        if (libtrace_halt) {
1454                                event.type = TRACE_EVENT_TERMINATE;
1455                        } else {
1456                                event.type = TRACE_EVENT_SLEEP;
1457                                event.seconds = 0.0001;
1458                        }
1459                        break;
1460                }
1461                if (dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
1462                                            erfptr, TRACE_RT_DATA_ERF, flags)) {
1463                        event.type = TRACE_EVENT_TERMINATE;
1464                        break;
1465                }
1466
1467
1468                event.size = trace_get_capture_length(packet) +
1469                        trace_get_framing_length(packet);
1470
1471                /* XXX trace_read_packet() normally applies the following
1472                 * config options for us, but this function is called via
1473                 * trace_event() so we have to do it ourselves */
1474
1475                if (libtrace->filter) {
1476                        int filtret = trace_apply_filter(libtrace->filter,
1477                                                         packet);
1478                        if (filtret == -1) {
1479                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
1480                                              "Bad BPF Filter");
1481                                event.type = TRACE_EVENT_TERMINATE;
1482                                break;
1483                        }
1484
1485                        if (filtret == 0) {
1486                                /* This packet isn't useful so we want to
1487                                 * immediately see if there is another suitable
1488                                 * one - we definitely DO NOT want to return
1489                                 * a sleep event in this case, like we used to
1490                                 * do! */
1491                                libtrace->filtered_packets ++;
1492                                trace_clear_cache(packet);
1493                                continue;
1494                        }
1495
1496                        event.type = TRACE_EVENT_PACKET;
1497                } else {
1498                        event.type = TRACE_EVENT_PACKET;
1499                }
1500
1501                /* Update the DUCK timer */
1502                tv = trace_get_timeval(packet);
1503                DUCK.last_pkt = tv.tv_sec;
1504               
1505                if (libtrace->snaplen > 0) {
1506                        trace_set_capture_length(packet, libtrace->snaplen);
1507                }
1508                libtrace->accepted_packets ++;
1509                break;
1510        } while(1);
1511
1512        return event;
1513}
1514
1515static void dag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat)
1516{
1517        libtrace_list_node_t *tmp;
1518        /*assert(stat && libtrace);*/
1519        if (!libtrace) {
1520                fprintf(stderr, "NULL trace passed into dag_get_statistics()\n");
1521                return;
1522        }
1523        if (!stat) {
1524                trace_set_err, TRACE_ERR_STAT, "NULL stat passed into dag_get_statistics()");
1525                return;
1526        }
1527        tmp = FORMAT_DATA_HEAD;
1528
1529        /* Dropped packets */
1530        stat->dropped_valid = 1;
1531        stat->dropped = 0;
1532        while (tmp != NULL) {
1533                stat->dropped += STREAM_DATA(tmp)->drops;
1534                tmp = tmp->next;
1535        }
1536
1537}
1538
1539static void dag_get_thread_statistics(libtrace_t *libtrace, libtrace_thread_t *t,
1540                                       libtrace_stat_t *stat) {
1541        struct dag_per_stream_t *stream_data = t->format_data;
1542        /*assert(stat && libtrace);*/
1543        if (!libtrace) {
1544                fprintf(stderr, "NULL trace passed into dag_get_thread_statistics()\n");
1545                return;
1546        }
1547        if (!stat) {
1548                trace_set_err, TRACE_ERR_STAT, "NULL stat passed into dag_get_thread_statistics()");
1549                return;
1550        }
1551        stat->dropped_valid = 1;
1552        stat->dropped = stream_data->drops;
1553
1554        stat->filtered_valid = 1;
1555        stat->filtered = 0;
1556}
1557
1558/* Prints some semi-useful help text about the DAG format module */
1559static void dag_help(void) {
1560        printf("dag format module: $Revision: 1755 $\n");
1561        printf("Supported input URIs:\n");
1562        printf("\tdag:/dev/dagn\n");
1563        printf("\n");
1564        printf("\te.g.: dag:/dev/dag0\n");
1565        printf("\n");
1566        printf("Supported output URIs:\n");
1567        printf("\tnone\n");
1568        printf("\n");
1569}
1570
1571static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1572                                bool reader)
1573{
1574        struct dag_per_stream_t *stream_data;
1575        libtrace_list_node_t *node;
1576
1577        if (reader && t->type == THREAD_PERPKT) {
1578                node = libtrace_list_get_index(FORMAT_DATA->per_stream,
1579                                                t->perpkt_num);
1580                if (node == NULL) {
1581                        return -1;
1582                }
1583                stream_data = node->data;
1584
1585                /* Pass the per thread data to the thread */
1586                t->format_data = stream_data;
1587
1588                /* Attach and start the DAG stream */
1589                if (dag_start_input_stream(libtrace, stream_data) < 0)
1590                        return -1;
1591        }
1592
1593        return 0;
1594}
1595
1596static struct libtrace_format_t dag = {
1597        "dag",
1598        "$Id$",
1599        TRACE_FORMAT_ERF,
1600        dag_probe_filename,             /* probe filename */
1601        NULL,                           /* probe magic */
1602        dag_init_input,                 /* init_input */
1603        dag_config_input,               /* config_input */
1604        dag_start_input,                /* start_input */
1605        dag_pause_input,                /* pause_input */
1606        dag_init_output,                /* init_output */
1607        NULL,                           /* config_output */
1608        dag_start_output,               /* start_output */
1609        dag_fin_input,                  /* fin_input */
1610        dag_fin_output,                 /* fin_output */
1611        dag_read_packet,                /* read_packet */
1612        dag_prepare_packet,             /* prepare_packet */
1613        NULL,                           /* fin_packet */
1614        dag_write_packet,               /* write_packet */
1615        NULL,                           /* flush_output */
1616        erf_get_link_type,              /* get_link_type */
1617        erf_get_direction,              /* get_direction */
1618        erf_set_direction,              /* set_direction */
1619        erf_get_erf_timestamp,          /* get_erf_timestamp */
1620        NULL,                           /* get_timeval */
1621        NULL,                           /* get_seconds */
1622        NULL,                           /* get_timespec */
1623        NULL,                           /* seek_erf */
1624        NULL,                           /* seek_timeval */
1625        NULL,                           /* seek_seconds */
1626        erf_get_capture_length,         /* get_capture_length */
1627        erf_get_wire_length,            /* get_wire_length */
1628        erf_get_framing_length,         /* get_framing_length */
1629        erf_set_capture_length,         /* set_capture_length */
1630        NULL,                           /* get_received_packets */
1631        NULL,                           /* get_filtered_packets */
1632        NULL,                           /* get_dropped_packets */
1633        dag_get_statistics,             /* get_statistics */
1634        NULL,                           /* get_fd */
1635        trace_event_dag,                /* trace_event */
1636        dag_help,                       /* help */
1637        NULL,                            /* next pointer */
1638        {true, 0}, /* live packet capture, thread limit TBD */
1639        dag_pstart_input,
1640        dag_pread_packets,
1641        dag_pause_input,
1642        NULL,
1643        dag_pregister_thread,
1644        NULL,
1645        dag_get_thread_statistics       /* get thread stats */
1646};
1647
1648void dag_constructor(void)
1649{
1650        register_format(&dag);
1651}
Note: See TracBrowser for help on using the repository browser.