source: lib/format_dag25.c @ f051c1b

Branches/tags: 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Last change on this file since f051c1b was b13b939, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Adds a configuration option for the tick messages.
Adds the trace_information structure which contains information about traces.
Updates trace_rt_stats to use both of these.

Replaced libtrace_t->joined internally with a state

  • Property mode set to 100644
File size: 37.0 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id$
31 *
32 */
33
34#define _GNU_SOURCE
35
36#include "config.h"
37#include "common.h"
38#include "libtrace.h"
39#include "libtrace_int.h"
40#include "format_helper.h"
41#include "format_erf.h"
42
43#include <assert.h>
44#include <errno.h>
45#include <fcntl.h>
46#include <stdio.h>
47#include <string.h>
48#include <stdlib.h>
49#include <sys/stat.h>
50
51#include <sys/mman.h>
52/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
53 * Windows anyway so we'll worry about this more later :] */
54#include <pthread.h>
55
56
57#ifdef WIN32
58#  include <io.h>
59#  include <share.h>
60#  define PATH_MAX _MAX_PATH
61#  define snprintf sprintf_s
62#else
63#  include <netdb.h>
64#  ifndef PATH_MAX
65#       define PATH_MAX 4096
66#  endif
67#  include <sys/ioctl.h>
68#endif
69
70/* This format deals with DAG cards that are using drivers from the 2.5 version
71 * onwards, including 3.X.
72 *
73 * DAG is a LIVE capture format.
74 *
75 * This format does support writing, provided the DAG card that you are using
76 * has transmit (Tx) support. Additionally, packets read using this format
77 * are in the ERF format, so can easily be written as ERF traces without
78 * losing any data.
79 */
80
81
/* Casts the opaque format_data pointer of a trace to the DAG input / output
 * state. The argument is parenthesised so that any pointer-valued expression
 * (e.g. "traces + 1" or "&trace") can be passed safely. */
#define DATA(x) ((struct dag_format_data_t *)(x)->format_data)
#define DATA_OUT(x) ((struct dag_format_data_out_t *)(x)->format_data)

/* Shorthand for use inside functions whose trace parameter is named
 * 'libtrace' */
#define FORMAT_DATA DATA(libtrace)
#define FORMAT_DATA_OUT DATA_OUT(libtrace)

/* The DUCK reporting state of the input trace named 'libtrace' */
#define DUCK (FORMAT_DATA->duck)
89static struct libtrace_format_t dag;
90
91/* A DAG device - a DAG device can support multiple streams (and therefore
92 * multiple input traces) so each trace needs to refer to a device */
struct dag_dev_t {
        /* Path of the device node; used as the lookup key in the device
         * list */
        char * dev_name;
        /* Descriptor obtained when the device was opened */
        int fd;
        /* How many input / output traces currently share this device */
        uint16_t ref_count;
        /* Neighbouring entries in the doubly linked list of open devices;
         * NULL at either end of the list */
        struct dag_dev_t *prev;
        struct dag_dev_t *next;
};
103
104/* "Global" data that is stored for each DAG output trace */
struct dag_format_data_out_t {
        /* Device that packets are written to */
        struct dag_dev_t *device;
        /* Stream on that device used for transmission */
        unsigned int dagstream;
        /* Non-zero while the stream is attached (i.e. started and not
         * paused) */
        int stream_attached;
        /* Number of bytes queued but not yet committed for transmission */
        uint64_t waiting;
        /* Buffer holding the data that is to be transmitted */
        uint8_t *txbuffer;
};
117
118/* "Global" data that is stored for each DAG input trace */
119struct dag_format_data_t {
120
121        /* Data required for regular DUCK reporting */
122        struct {
123                /* Timestamp of the last DUCK report */
124                uint32_t last_duck;
125                /* The number of seconds between each DUCK report */
126                uint32_t duck_freq;
127                /* Timestamp of the last packet read from the DAG card */
128                uint32_t last_pkt;
129                /* Dummy trace to ensure DUCK packets are dealt with using the
130                 * DUCK format functions */
131                libtrace_t *dummy_duck;
132        } duck;
133
134        /* The DAG device that we are reading from */
135        struct dag_dev_t *device;
136        /* The DAG stream that we are reading from */
137        unsigned int dagstream;
138        /* Boolean flag indicating whether the stream is currently attached */
139        int stream_attached;
140        /* Pointer to the first unread byte in the DAG memory hole */
141        uint8_t *bottom;
142        /* Pointer to the last unread byte in the DAG memory hole */
143        uint8_t *top;
144        /* The amount of data processed thus far from the bottom pointer */
145        uint32_t processed;
146        /* The number of packets that have been dropped */
147        uint64_t drops;
148};
149
/* Serialises every access to the open_dags list below, so that traces
 * created or destroyed from different threads do not corrupt it.
 *
 * NOTE(review): no explicit initialisation of this mutex is visible in this
 * part of the file - confirm that it is initialised elsewhere, or consider
 * PTHREAD_MUTEX_INITIALIZER. */
pthread_mutex_t open_dag_mutex;

/* Head of the list of DAG devices currently opened by libtrace.
 *
 * A device may only be opened once, yet several traces may read from or
 * write to different streams on it. Each list entry therefore carries a
 * reference count, which stops us from opening a device twice or closing
 * one that is still in use. */
struct dag_dev_t *open_dags = NULL;
161
162/* Returns the amount of padding between the ERF header and the start of the
163 * captured packet data */
164static int dag_get_padding(const libtrace_packet_t *packet)
165{
166        /* ERF Ethernet records have a 2 byte padding before the packet itself
167         * so that the IP header is aligned on a 32 bit boundary.
168         */
169        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
170                dag_record_t *erfptr = (dag_record_t *)packet->header;
171                switch(erfptr->type) {
172                        case TYPE_ETH:
173                        case TYPE_DSM_COLOR_ETH:
174                                return 2;
175                        default:                return 0;
176                }
177        }
178        else {
179                switch(trace_get_link_type(packet)) {
180                        case TRACE_TYPE_ETH:    return 2;
181                        default:                return 0;
182                }
183        }
184}
185
/* Guesses whether the given filename refers to a DAG device.
 *
 * Returns 1 if the path can be stat()ed and is a character device,
 * 0 otherwise. */
static int dag_probe_filename(const char *filename)
{
        struct stat info;

        /* Anything we cannot stat, or that is not a character device,
         * cannot be one of ours */
        if (stat(filename, &info) == 0 && S_ISCHR(info.st_mode)) {
                return 1;
        }
        return 0;
}
201
202/* Initialises the DAG output data structure */
203static void dag_init_format_out_data(libtrace_out_t *libtrace) {
204        libtrace->format_data = (struct dag_format_data_out_t *) malloc(sizeof(struct dag_format_data_out_t));
205        // no DUCK on output
206        FORMAT_DATA_OUT->stream_attached = 0;
207        FORMAT_DATA_OUT->device = NULL;
208        FORMAT_DATA_OUT->dagstream = 0;
209        FORMAT_DATA_OUT->waiting = 0;
210
211}
212
213/* Initialises the DAG input data structure */
214static void dag_init_format_data(libtrace_t *libtrace) {
215        libtrace->format_data = (struct dag_format_data_t *)
216                malloc(sizeof(struct dag_format_data_t));
217        DUCK.last_duck = 0;
218        DUCK.duck_freq = 0;
219        DUCK.last_pkt = 0;
220        DUCK.dummy_duck = NULL;
221        FORMAT_DATA->stream_attached = 0;
222        FORMAT_DATA->drops = 0;
223        FORMAT_DATA->device = NULL;
224        FORMAT_DATA->dagstream = 0;
225        FORMAT_DATA->processed = 0;
226        FORMAT_DATA->bottom = NULL;
227        FORMAT_DATA->top = NULL;
228}
229
230/* Determines if there is already an entry for the given DAG device in the
231 * device list and increments the reference count for that device, if found.
232 *
233 * NOTE: This function assumes the open_dag_mutex is held by the caller */
234static struct dag_dev_t *dag_find_open_device(char *dev_name) {
235        struct dag_dev_t *dag_dev;
236
237        dag_dev = open_dags;
238
239        /* XXX: Not exactly zippy, but how often are we going to be dealing
240         * with multiple dag cards? */
241        while (dag_dev != NULL) {
242                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
243                        dag_dev->ref_count ++;
244                        return dag_dev;
245
246                }
247                dag_dev = dag_dev->next;
248        }
249        return NULL;
250
251
252}
253
254/* Closes a DAG device and removes it from the device list.
255 *
256 * Attempting to close a DAG device that has a non-zero reference count will
257 * cause an assertion failure!
258 *
259 * NOTE: This function assumes the open_dag_mutex is held by the caller */
260static void dag_close_device(struct dag_dev_t *dev) {
261        /* Need to remove from the device list */
262
263        assert(dev->ref_count == 0);
264
265        if (dev->prev == NULL) {
266                open_dags = dev->next;
267                if (dev->next)
268                        dev->next->prev = NULL;
269        } else {
270                dev->prev->next = dev->next;
271                if (dev->next)
272                        dev->next->prev = dev->prev;
273        }
274
275        dag_close(dev->fd);
276        if (dev->dev_name)
277        free(dev->dev_name);
278        free(dev);
279}
280
281
282/* Opens a new DAG device for writing and adds it to the DAG device list
283 *
284 * NOTE: this function should only be called when opening a DAG device for
285 * writing - there is little practical difference between this and the
286 * function below that covers the reading case, but we need the output trace
287 * object to report errors properly so the two functions take slightly
288 * different arguments. This is really lame and there should be a much better
289 * way of doing this.
290 *
291 * NOTE: This function assumes the open_dag_mutex is held by the caller
292 */
293static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace, char *dev_name) {
294        struct stat buf;
295        int fd;
296        struct dag_dev_t *new_dev;
297
298        /* Make sure the device exists */
299        if (stat(dev_name, &buf) == -1) {
300                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
301                return NULL;
302}
303
304        /* Make sure it is the appropriate type of device */
305        if (S_ISCHR(buf.st_mode)) {
306                /* Try opening the DAG device */
307                if((fd = dag_open(dev_name)) < 0) {
308                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
309                                        dev_name);
310                        return NULL;
311                }
312        } else {
313                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
314                                dev_name);
315                return NULL;
316        }
317
318        /* Add the device to our device list - it is just a doubly linked
319         * list with no inherent ordering; just tack the new one on the front
320         */
321        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
322        new_dev->fd = fd;
323        new_dev->dev_name = dev_name;
324        new_dev->ref_count = 1;
325
326        new_dev->prev = NULL;
327        new_dev->next = open_dags;
328        if (open_dags)
329                open_dags->prev = new_dev;
330
331        open_dags = new_dev;
332
333        return new_dev;
334}
335
336/* Opens a new DAG device for reading and adds it to the DAG device list
337 *
338 * NOTE: this function should only be called when opening a DAG device for
339 * reading - there is little practical difference between this and the
340 * function above that covers the writing case, but we need the input trace
341 * object to report errors properly so the two functions take slightly
342 * different arguments. This is really lame and there should be a much better
343 * way of doing this.
344 *
345 * NOTE: This function assumes the open_dag_mutex is held by the caller */
346static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
347        struct stat buf;
348        int fd;
349        struct dag_dev_t *new_dev;
350
351        /* Make sure the device exists */
352        if (stat(dev_name, &buf) == -1) {
353                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
354                return NULL;
355        }
356
357        /* Make sure it is the appropriate type of device */
358        if (S_ISCHR(buf.st_mode)) {
359                /* Try opening the DAG device */
360                if((fd = dag_open(dev_name)) < 0) {
361                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
362                                        dev_name);
363                        return NULL;
364                }
365        } else {
366                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
367                                dev_name);
368                return NULL;
369        }
370
371        /* Add the device to our device list - it is just a doubly linked
372         * list with no inherent ordering; just tack the new one on the front
373         */
374        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
375        new_dev->fd = fd;
376        new_dev->dev_name = dev_name;
377        new_dev->ref_count = 1;
378
379        new_dev->prev = NULL;
380        new_dev->next = open_dags;
381        if (open_dags)
382                open_dags->prev = new_dev;
383
384        open_dags = new_dev;
385
386        return new_dev;
387}
388
389/* Creates and initialises a DAG output trace */
390static int dag_init_output(libtrace_out_t *libtrace) {
391        char *dag_dev_name = NULL;
392        char *scan = NULL;
393        struct dag_dev_t *dag_device = NULL;
394        int stream = 1;
395       
396        /* XXX I don't know if this is important or not, but this function
397         * isn't present in all of the driver releases that this code is
398         * supposed to support! */
399        /*
400        unsigned long wake_time;
401        dagutil_sleep_get_wake_time(&wake_time,0);
402        */
403
404        dag_init_format_out_data(libtrace);
405        /* Grab the mutex while we're likely to be messing with the device
406         * list */
407        pthread_mutex_lock(&open_dag_mutex);
408       
409        /* Specific streams are signified using a comma in the libtrace URI,
410         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
411         *
412         * If no stream is specified, we will write using stream 1 */
413        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
414                dag_dev_name = strdup(libtrace->uridata);
415        } else {
416                dag_dev_name = (char *)strndup(libtrace->uridata,
417                                (size_t)(scan - libtrace->uridata));
418                stream = atoi(++scan);
419        }
420        FORMAT_DATA_OUT->dagstream = stream;
421
422        /* See if our DAG device is already open */
423        dag_device = dag_find_open_device(dag_dev_name);
424
425        if (dag_device == NULL) {
426                /* Device not yet opened - open it ourselves */
427                dag_device = dag_open_output_device(libtrace, dag_dev_name);
428        } else {
429                /* Otherwise, just use the existing one */
430                free(dag_dev_name);
431                dag_dev_name = NULL;
432        }
433
434        /* Make sure we have successfully opened a DAG device */
435        if (dag_device == NULL) {
436                if (dag_dev_name) {
437                        free(dag_dev_name);
438                }
439                pthread_mutex_unlock(&open_dag_mutex);
440                return -1;
441        }
442
443        FORMAT_DATA_OUT->device = dag_device;
444        pthread_mutex_unlock(&open_dag_mutex);
445        return 0;
446}
447
448/* Creates and initialises a DAG input trace */
449static int dag_init_input(libtrace_t *libtrace) {
450        char *dag_dev_name = NULL;
451        char *scan = NULL;
452        int stream = 0;
453        struct dag_dev_t *dag_device = NULL;
454
455        dag_init_format_data(libtrace);
456        /* Grab the mutex while we're likely to be messing with the device
457         * list */
458        pthread_mutex_lock(&open_dag_mutex);
459       
460       
461        /* Specific streams are signified using a comma in the libtrace URI,
462         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
463         *
464         * If no stream is specified, we will read from stream 0 */
465        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
466                dag_dev_name = strdup(libtrace->uridata);
467        } else {
468                dag_dev_name = (char *)strndup(libtrace->uridata,
469                                (size_t)(scan - libtrace->uridata));
470                stream = atoi(++scan);
471        }
472
473        FORMAT_DATA->dagstream = stream;
474
475        /* See if our DAG device is already open */
476        dag_device = dag_find_open_device(dag_dev_name);
477
478        if (dag_device == NULL) {
479                /* Device not yet opened - open it ourselves */
480                dag_device = dag_open_device(libtrace, dag_dev_name);
481        } else {
482                /* Otherwise, just use the existing one */
483                free(dag_dev_name);
484                dag_dev_name = NULL;
485        }
486
487        /* Make sure we have successfully opened a DAG device */
488        if (dag_device == NULL) {
489                if (dag_dev_name)
490                        free(dag_dev_name);
491                dag_dev_name = NULL;
492                pthread_mutex_unlock(&open_dag_mutex);
493                return -1;
494        }
495
496        FORMAT_DATA->device = dag_device;
497
498        /* See Config_Status_API_Programming_Guide.pdf from the Endace Dag Documentation */
499        /* Check kBooleanAttributeActive is true -- no point capturing on an interface that's disabled
500 
501        *  The symptom of the port being disabled is that libtrace will appear to hang.
502        */
503        /* Check kBooleanAttributeFault is false */
504        /* Check kBooleanAttributeLocalFault is false */
505        /* Check kBooleanAttributeLock is true ? */
506        /* Check kBooleanAttributePeerLink ? */
507
508        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based on libtrace promisc attribute?*/
509        /* Set kUint32AttributeSnapLength to the snaplength */
510
511        pthread_mutex_unlock(&open_dag_mutex);
512        return 0;
513}
514
515/* Configures a DAG input trace */
516static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
517                                void *data) {
518        char conf_str[4096];
519        switch(option) {
520                case TRACE_OPTION_META_FREQ:
521                        /* This option is used to specify the frequency of DUCK
522                         * updates */
523                        DUCK.duck_freq = *(int *)data;
524                        return 0;
525                case TRACE_OPTION_SNAPLEN:
526                        /* Tell the card our new snap length */
527                        snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
528                        if (dag_configure(FORMAT_DATA->device->fd,
529                                                conf_str) != 0) {
530                                trace_set_err(libtrace, errno, "Failed to configure snaplen on DAG card: %s", libtrace->uridata);
531                                return -1;
532                        }
533                        return 0;
534                case TRACE_OPTION_PROMISC:
535                        /* DAG already operates in a promisc fashion */
536                        return -1;
537                case TRACE_OPTION_FILTER:
538                        /* We don't yet support pushing filters into DAG
539                         * cards */
540                        return -1;
541                case TRACE_OPTION_EVENT_REALTIME:
542                        /* Live capture is always going to be realtime */
543                        return -1;
544        }
545        return -1;
546}
547
548/* Starts a DAG output trace */
549static int dag_start_output(libtrace_out_t *libtrace) {
550        struct timeval zero, nopoll;
551
552        zero.tv_sec = 0;
553        zero.tv_usec = 0;
554        nopoll = zero;
555
556        /* Attach and start the DAG stream */
557
558        if (dag_attach_stream(FORMAT_DATA_OUT->device->fd,
559                        FORMAT_DATA_OUT->dagstream, 0, 1048576) < 0) {
560                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
561                return -1;
562        }
563
564        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
565                        FORMAT_DATA_OUT->dagstream) < 0) {
566                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
567                return -1;
568        }
569        FORMAT_DATA_OUT->stream_attached = 1;
570
571        /* We don't want the dag card to do any sleeping */
572
573        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
574                        FORMAT_DATA_OUT->dagstream, 0, &zero,
575                        &nopoll);
576
577        return 0;
578}
579
580/* Starts a DAG input trace */
581static int dag_start_input(libtrace_t *libtrace) {
582        struct timeval zero, nopoll;
583        uint8_t *top, *bottom;
584        uint8_t diff = 0;
585        top = bottom = NULL;
586
587        zero.tv_sec = 0;
588        zero.tv_usec = 0;
589        nopoll = zero;
590
591        /* Attach and start the DAG stream */
592        if (dag_attach_stream(FORMAT_DATA->device->fd,
593                                FORMAT_DATA->dagstream, 0, 0) < 0) {
594                trace_set_err(libtrace, errno, "Cannot attach DAG stream");
595                return -1;
596        }
597
598        if (dag_start_stream(FORMAT_DATA->device->fd,
599                                FORMAT_DATA->dagstream) < 0) {
600                trace_set_err(libtrace, errno, "Cannot start DAG stream");
601                return -1;
602        }
603        FORMAT_DATA->stream_attached = 1;
604       
605        /* We don't want the dag card to do any sleeping */
606        dag_set_stream_poll(FORMAT_DATA->device->fd,
607                                FORMAT_DATA->dagstream, 0, &zero,
608                                &nopoll);
609
610        /* Should probably flush the memory hole now */
611        do {
612                top = dag_advance_stream(FORMAT_DATA->device->fd,
613                                        FORMAT_DATA->dagstream,
614                                        &bottom);
615                assert(top && bottom);
616                diff = top - bottom;
617                bottom -= diff;
618        } while (diff != 0);
619        FORMAT_DATA->top = NULL;
620        FORMAT_DATA->bottom = NULL;
621        FORMAT_DATA->processed = 0;
622        FORMAT_DATA->drops = 0;
623
624        return 0;
625}
626
627/* Pauses a DAG output trace */
628static int dag_pause_output(libtrace_out_t *libtrace) {
629
630        /* Stop and detach the stream */
631        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
632                        FORMAT_DATA_OUT->dagstream) < 0) {
633                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
634                return -1;
635        }
636        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
637                        FORMAT_DATA_OUT->dagstream) < 0) {
638                trace_set_err_out(libtrace, errno, "Could not detach DAG stream");
639                return -1;
640        }
641        FORMAT_DATA_OUT->stream_attached = 0;
642        return 0;
643}
644
645/* Pauses a DAG input trace */
646static int dag_pause_input(libtrace_t *libtrace) {
647
648        /* Stop and detach the stream */
649        if (dag_stop_stream(FORMAT_DATA->device->fd,
650                                FORMAT_DATA->dagstream) < 0) {
651                trace_set_err(libtrace, errno, "Could not stop DAG stream");
652                return -1;
653        }
654        if (dag_detach_stream(FORMAT_DATA->device->fd,
655                                FORMAT_DATA->dagstream) < 0) {
656                trace_set_err(libtrace, errno, "Could not detach DAG stream");
657                return -1;
658        }
659        FORMAT_DATA->stream_attached = 0;
660        return 0;
661}
662
663/* Closes a DAG input trace */
664static int dag_fin_input(libtrace_t *libtrace) {
665        /* Need the lock, since we're going to be handling the device list */
666        pthread_mutex_lock(&open_dag_mutex);
667       
668        /* Detach the stream if we are not paused */
669        if (FORMAT_DATA->stream_attached)
670                dag_pause_input(libtrace);
671        FORMAT_DATA->device->ref_count --;
672
673        /* Close the DAG device if there are no more references to it */
674        if (FORMAT_DATA->device->ref_count == 0)
675                dag_close_device(FORMAT_DATA->device);
676        if (DUCK.dummy_duck)
677                trace_destroy_dead(DUCK.dummy_duck);
678        free(libtrace->format_data);
679        pthread_mutex_unlock(&open_dag_mutex);
680        return 0; /* success */
681}
682
683/* Closes a DAG output trace */
684static int dag_fin_output(libtrace_out_t *libtrace) {
685       
686        /* Commit any outstanding traffic in the txbuffer */
687        if (FORMAT_DATA_OUT->waiting) {
688                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
689                                FORMAT_DATA_OUT->waiting );
690        }
691
692        /* Wait until the buffer is nearly clear before exiting the program,
693         * as we will lose packets otherwise */
694        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
695                        FORMAT_DATA_OUT->dagstream,
696                        dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
697                                        FORMAT_DATA_OUT->dagstream) - 8
698                        );
699
700        /* Need the lock, since we're going to be handling the device list */
701        pthread_mutex_lock(&open_dag_mutex);
702
703        /* Detach the stream if we are not paused */
704        if (FORMAT_DATA_OUT->stream_attached)
705                dag_pause_output(libtrace);
706        FORMAT_DATA_OUT->device->ref_count --;
707
708        /* Close the DAG device if there are no more references to it */
709        if (FORMAT_DATA_OUT->device->ref_count == 0)
710                dag_close_device(FORMAT_DATA_OUT->device);
711        free(libtrace->format_data);
712        pthread_mutex_unlock(&open_dag_mutex);
713        return 0; /* success */
714}
715
716/* Extracts DUCK information from the DAG card and produces a DUCK packet */
717static int dag_get_duckinfo(libtrace_t *libtrace,
718                                libtrace_packet_t *packet) {
719
720        /* Allocate memory for the DUCK data */
721        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
722                        !packet->buffer) {
723                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
724                packet->buf_control = TRACE_CTRL_PACKET;
725                if (!packet->buffer) {
726                        trace_set_err(libtrace, errno,
727                                        "Cannot allocate packet buffer");
728                        return -1;
729                }
730        }
731
732        /* DUCK doesn't have a format header */
733        packet->header = 0;
734        packet->payload = packet->buffer;
735
736        /* No need to check if we can get DUCK or not - we're modern
737         * enough so just grab the DUCK info */
738        if ((ioctl(FORMAT_DATA->device->fd, DAGIOCDUCK,
739                                        (duckinf_t *)packet->payload) < 0)) {
740                trace_set_err(libtrace, errno, "Error using DAGIOCDUCK");
741                return -1;
742        }
743
744        packet->type = TRACE_RT_DUCK_2_5;
745
746        /* Set the packet's tracce to point at a DUCK trace, so that the
747         * DUCK format functions will be called on the packet rather than the
748         * DAG ones */
749        if (!DUCK.dummy_duck)
750                DUCK.dummy_duck = trace_create_dead("rt:localhost:3434");
751        packet->trace = DUCK.dummy_duck;
752        return sizeof(duckinf_t);
753}
754
755/* Determines the amount of data available to read from the DAG card */
756static int dag_available(libtrace_t *libtrace) {
757        uint32_t diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
758
759        /* If we've processed more than 4MB of data since we last called
760         * dag_advance_stream, then we should call it again to allow the
761         * space occupied by that 4MB to be released */
762        if (diff >= dag_record_size && FORMAT_DATA->processed < 4 * 1024 * 1024)
763                return diff;
764       
765        /* Update the top and bottom pointers */
766        FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd,
767                        FORMAT_DATA->dagstream,
768                        &(FORMAT_DATA->bottom));
769       
770        if (FORMAT_DATA->top == NULL) {
771                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
772                return -1;
773        }
774        FORMAT_DATA->processed = 0;
775        diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
776        return diff;
777}
778
779/* Returns a pointer to the start of the next complete ERF record */
780static dag_record_t *dag_get_record(libtrace_t *libtrace) {
781        dag_record_t *erfptr = NULL;
782        uint16_t size;
783        erfptr = (dag_record_t *)FORMAT_DATA->bottom;
784        if (!erfptr)
785                return NULL;
786        size = ntohs(erfptr->rlen);
787        assert( size >= dag_record_size );
788        /* Make certain we have the full packet available */
789        if (size > (FORMAT_DATA->top - FORMAT_DATA->bottom))
790                return NULL;
791        FORMAT_DATA->bottom += size;
792        FORMAT_DATA->processed += size;
793        return erfptr;
794}
795
796/* Converts a buffer containing a recently read DAG packet record into a
797 * libtrace packet */
798static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
799                void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
800
801        dag_record_t *erfptr;
802       
803        /* If the packet previously owned a buffer that is not the buffer
804         * that contains the new packet data, we're going to need to free the
805         * old one to avoid memory leaks */
806        if (packet->buffer != buffer &&
807                        packet->buf_control == TRACE_CTRL_PACKET) {
808                free(packet->buffer);
809        }
810
811        /* Set the buffer owner appropriately */
812        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
813                packet->buf_control = TRACE_CTRL_PACKET;
814        } else
815                packet->buf_control = TRACE_CTRL_EXTERNAL;
816
817        /* Update the packet pointers and type appropriately */
818        erfptr = (dag_record_t *)buffer;
819        packet->buffer = erfptr;
820        packet->header = erfptr;
821        packet->type = rt_type;
822
823        if (erfptr->flags.rxerror == 1) {
824                /* rxerror means the payload is corrupt - drop the payload
825                 * by tweaking rlen */
826                packet->payload = NULL;
827                erfptr->rlen = htons(erf_get_framing_length(packet));
828        } else {
829                packet->payload = (char*)packet->buffer
830                        + erf_get_framing_length(packet);
831        }
832
833        if (libtrace->format_data == NULL) {
834                dag_init_format_data(libtrace);
835        }
836
837        /* Update the dropped packets counter */
838
839        /* No loss counter for DSM coloured records - have to use
840         * some other API */
841        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
842                /* TODO */
843        } else {
844                /* Use the ERF loss counter */
845                DATA(libtrace)->drops += ntohs(erfptr->lctr);
846        }
847
848        return 0;
849}
850
851/*
852 * dag_write_packet() at this stage attempts to improve tx performance
853 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
854 * has been met. I observed approximately 270% performance increase
855 * through this relatively naive tweak. No optimisation of buffer sizes
856 * was attempted.
857 */
858
859/* Pushes an ERF record onto the transmit stream */
860static int dag_dump_packet(libtrace_out_t *libtrace,
861                dag_record_t *erfptr, unsigned int pad, void *buffer) {
862        int size;
863
864        /*
865         * If we've got 0 bytes waiting in the txqueue, assume that we haven't
866         * requested any space yet, and request some, storing the pointer at
867         * FORMAT_DATA_OUT->txbuffer.
868         *
869         * The amount to request is slightly magical at the moment - it's
870         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
871         * the buffer and handle overruns.
872         */
873        if (FORMAT_DATA_OUT->waiting == 0) {
874                FORMAT_DATA_OUT->txbuffer = dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
875                                FORMAT_DATA_OUT->dagstream, 16908288);
876        }
877
878        /*
879         * Copy the header separately to the body, as we can't guarantee they
880         * are in contiguous memory
881         */
882        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,erfptr,(dag_record_size + pad));
883        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
884
885
886
887        /*
888         * Copy our incoming packet into the outgoing buffer, and increment
889         * our waiting count
890         */
891        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
892        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,buffer,size);
893        FORMAT_DATA_OUT->waiting += size;
894
895        /*
896         * If our output buffer has more than 16 Mebibytes in it, commit those
897         * bytes and reset the waiting count to 0.
898         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in
899         * case there is still data in the buffer at program exit.
900         */
901
902        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
903                FORMAT_DATA_OUT->txbuffer = dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
904                        FORMAT_DATA_OUT->waiting );
905                FORMAT_DATA_OUT->waiting = 0;
906        }
907
908        return size + pad + dag_record_size;
909
910}
911
912/* Attempts to determine a suitable ERF type for a given packet. Returns true
913 * if one is found, false otherwise */
914static bool find_compatible_linktype(libtrace_out_t *libtrace,
915                                libtrace_packet_t *packet, char *type)
916{
917         // Keep trying to simplify the packet until we can find
918         //something we can do with it
919
920        do {
921                *type=libtrace_to_erf_type(trace_get_link_type(packet));
922
923                // Success
924                if (*type != (char)-1)
925                        return true;
926
927                if (!demote_packet(packet)) {
928                        trace_set_err_out(libtrace,
929                                        TRACE_ERR_NO_CONVERSION,
930                                        "No erf type for packet (%i)",
931                                        trace_get_link_type(packet));
932                        return false;
933                }
934
935        } while(1);
936
937        return true;
938}
939
940/* Writes a packet to the provided DAG output trace */
941static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet) {
942        /*
943         * This is heavily borrowed from erf_write_packet(). Yes, CnP coding
944         * sucks, sorry about that.
945         */
946        unsigned int pad = 0;
947        int numbytes;
948        void *payload = packet->payload;
949        dag_record_t *header = (dag_record_t *)packet->header;
950        char erf_type = 0;
951
952        if(!packet->header) {
953                /* No header, probably an RT packet. Lifted from
954                 * erf_write_packet(). */
955                return -1;
956        }
957
958        if (trace_get_link_type(packet) == TRACE_TYPE_NONDATA)
959                return 0;
960
961        pad = dag_get_padding(packet);
962
963        /*
964         * If the payload is null, adjust the rlen. Discussion of this is
965         * attached to erf_write_packet()
966         */
967        if (payload == NULL) {
968                header->rlen = htons(dag_record_size + pad);
969        }
970
971        if (packet->type == TRACE_RT_DATA_ERF) {
972                numbytes = dag_dump_packet(libtrace,
973                                header,
974                                pad,
975                                payload
976                                );
977
978        } else {
979                /* Build up a new packet header from the existing header */
980
981                /* Simplify the packet first - if we can't do this, break
982                 * early */
983                if (!find_compatible_linktype(libtrace,packet,&erf_type))
984                        return -1;
985
986                dag_record_t erfhdr;
987
988                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
989                payload=packet->payload;
990                pad = dag_get_padding(packet);
991
992                /* Flags. Can't do this */
993                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
994                if (trace_get_direction(packet)!=~0U)
995                        erfhdr.flags.iface = trace_get_direction(packet);
996
997                erfhdr.type = erf_type;
998
999                /* Packet length (rlen includes format overhead) */
1000                assert(trace_get_capture_length(packet)>0
1001                                && trace_get_capture_length(packet)<=65536);
1002                assert(erf_get_framing_length(packet)>0
1003                                && trace_get_framing_length(packet)<=65536);
1004                assert(trace_get_capture_length(packet)+erf_get_framing_length(packet)>0
1005                      &&trace_get_capture_length(packet)+erf_get_framing_length(packet)<=65536);
1006
1007                erfhdr.rlen = htons(trace_get_capture_length(packet)
1008                        + erf_get_framing_length(packet));
1009
1010
1011                /* Loss counter. Can't do this */
1012                erfhdr.lctr = 0;
1013                /* Wire length, does not include padding! */
1014                erfhdr.wlen = htons(trace_get_wire_length(packet));
1015
1016                /* Write it out */
1017                numbytes = dag_dump_packet(libtrace,
1018                                &erfhdr,
1019                                pad,
1020                                payload);
1021
1022        }
1023
1024        return numbytes;
1025}
1026
1027/* Reads the next available packet from a DAG card, in a BLOCKING fashion
1028 *
1029 * If DUCK reporting is enabled, the packet returned may be a DUCK update
1030 */
1031static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1032        int size = 0;
1033        struct timeval tv;
1034        dag_record_t *erfptr = NULL;
1035        int numbytes = 0;
1036        uint32_t flags = 0;
1037
1038        /* Check if we're due for a DUCK report */
1039        if (DUCK.last_pkt - DUCK.last_duck > DUCK.duck_freq &&
1040                        DUCK.duck_freq != 0) {
1041                size = dag_get_duckinfo(libtrace, packet);
1042                DUCK.last_duck = DUCK.last_pkt;
1043                if (size != 0) {
1044                        return size;
1045                }
1046                /* No DUCK support, so don't waste our time anymore */
1047                DUCK.duck_freq = 0;
1048        }
1049
1050        /* Don't let anyone try to free our DAG memory hole! */
1051        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1052
1053        /* If the packet buffer is currently owned by libtrace, free it so
1054         * that we can set the packet to point into the DAG memory hole */
1055        if (packet->buf_control == TRACE_CTRL_PACKET) {
1056                free(packet->buffer);
1057                packet->buffer = 0;
1058        }
1059
1060        /* Grab a full ERF record */
1061        do {
1062                numbytes = dag_available(libtrace);
1063                if (numbytes < 0)
1064                        return numbytes;
1065                if (numbytes < dag_record_size)
1066                        /* Block until we see a packet */
1067                        continue;
1068                erfptr = dag_get_record(libtrace);
1069        } while (erfptr == NULL);
1070
1071        /* Prepare the libtrace packet */
1072        if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
1073                                flags))
1074                return -1;
1075
1076        /* Update the DUCK timer */
1077        tv = trace_get_timeval(packet);
1078        DUCK.last_pkt = tv.tv_sec;
1079
1080        return packet->payload ? htons(erfptr->rlen) :
1081                                erf_get_framing_length(packet);
1082}
1083
1084/* Attempts to read a packet from a DAG card in a NON-BLOCKING fashion. If a
1085 * packet is available, we will return a packet event. Otherwise we will
1086 * return a SLEEP event (as we cannot select on the DAG file descriptor).
1087 */
1088static libtrace_eventobj_t trace_event_dag(libtrace_t *trace,
1089                                        libtrace_packet_t *packet) {
1090        libtrace_eventobj_t event = {0,0,0.0,0};
1091        dag_record_t *erfptr = NULL;
1092        int numbytes;
1093        uint32_t flags = 0;
1094
1095        do {
1096                erfptr = NULL;
1097                numbytes = 0;
1098       
1099                /* Need to call dag_available so that the top pointer will get
1100                 * updated, otherwise we'll never see any data! */
1101                numbytes = dag_available(trace);
1102
1103                /* May as well not bother calling dag_get_record if
1104                 * dag_available suggests that there's no data */
1105                if (numbytes != 0)
1106                        erfptr = dag_get_record(trace);
1107                if (erfptr == NULL) {
1108                        /* No packet available - sleep for a very short time */
1109                        event.type = TRACE_EVENT_SLEEP;
1110                        event.seconds = 0.0001;
1111                        break;
1112                }
1113                if (dag_prepare_packet(trace, packet, erfptr, 
1114                                        TRACE_RT_DATA_ERF, flags)) {
1115                        event.type = TRACE_EVENT_TERMINATE;
1116                        break;
1117                }
1118
1119
1120                event.size = trace_get_capture_length(packet) + 
1121                                trace_get_framing_length(packet);
1122               
1123                /* XXX trace_read_packet() normally applies the following
1124                 * config options for us, but this function is called via
1125                 * trace_event() so we have to do it ourselves */
1126
1127                if (trace->filter) {
1128                        if (trace_apply_filter(trace->filter, packet)) {
1129                                event.type = TRACE_EVENT_PACKET;
1130                        } else {
1131                                /* This packet isn't useful so we want to
1132                                 * immediately see if there is another suitable
1133                                 * one - we definitely DO NOT want to return
1134                                 * a sleep event in this case, like we used to
1135                                 * do! */
1136                                trace_clear_cache(packet);
1137                                continue;
1138                        }
1139                } else {
1140                        event.type = TRACE_EVENT_PACKET;
1141                }
1142
1143                if (trace->snaplen > 0) {
1144                        trace_set_capture_length(packet, trace->snaplen);
1145                }
1146                break;
1147        } while (1);
1148
1149        return event;
1150}
1151
1152/* Gets the number of dropped packets */
1153static uint64_t dag_get_dropped_packets(libtrace_t *trace) {
1154        if (trace->format_data == NULL)
1155                return (uint64_t)-1;
1156        return DATA(trace)->drops;
1157}
1158
/* Writes a short description of the DAG format module and its URIs to
 * standard output */
static void dag_help(void) {
        /* None of the text contains a conversion specifier, so it can be
         * emitted verbatim in a single write */
        fputs("dag format module: $Revision: 1755 $\n"
              "Supported input URIs:\n"
              "\tdag:/dev/dagn\n"
              "\n"
              "\te.g.: dag:/dev/dag0\n"
              "\n"
              "Supported output URIs:\n"
              "\tnone\n"
              "\n",
              stdout);
}
1171
1172static struct libtrace_format_t dag = {
1173        "dag",
1174        "$Id$",
1175        TRACE_FORMAT_ERF,
1176        dag_probe_filename,             /* probe filename */
1177        NULL,                           /* probe magic */
1178        dag_init_input,                 /* init_input */
1179        dag_config_input,               /* config_input */
1180        dag_start_input,                /* start_input */
1181        dag_pause_input,                /* pause_input */
1182        dag_init_output,                /* init_output */
1183        NULL,                           /* config_output */
1184        dag_start_output,               /* start_output */
1185        dag_fin_input,                  /* fin_input */
1186        dag_fin_output,                 /* fin_output */
1187        dag_read_packet,                /* read_packet */
1188        dag_prepare_packet,             /* prepare_packet */
1189        NULL,                           /* fin_packet */
1190        dag_write_packet,               /* write_packet */
1191        erf_get_link_type,              /* get_link_type */
1192        erf_get_direction,              /* get_direction */
1193        erf_set_direction,              /* set_direction */
1194        erf_get_erf_timestamp,          /* get_erf_timestamp */
1195        NULL,                           /* get_timeval */
1196        NULL,                           /* get_seconds */
1197        NULL,                           /* get_timespec */
1198        NULL,                           /* seek_erf */
1199        NULL,                           /* seek_timeval */
1200        NULL,                           /* seek_seconds */
1201        erf_get_capture_length,         /* get_capture_length */
1202        erf_get_wire_length,            /* get_wire_length */
1203        erf_get_framing_length,         /* get_framing_length */
1204        erf_set_capture_length,         /* set_capture_length */
1205        NULL,                           /* get_received_packets */
1206        NULL,                           /* get_filtered_packets */
1207        dag_get_dropped_packets,        /* get_dropped_packets */
1208        NULL,                           /* get_captured_packets */
1209        NULL,                           /* get_fd */
1210        trace_event_dag,                /* trace_event */
1211        dag_help,                       /* help */
1212        NULL, /* pstart_input */
1213        NULL, /* pread_packet */
1214        NULL, /* ppause_input */
1215        NULL, /* pfin_input */
1216        NULL, /* pconfig_input */
1217        NULL,                            /* next pointer */
1218        NON_PARALLEL(true)
1219};
1220
1221void dag_constructor(void) {
1222        register_format(&dag);
1223}
Note: See TracBrowser for help on using the repository browser.