source: lib/format_dag25.c @ d9b0550

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since d9b0550 was d9b0550, checked in by Perry Lorier <perry@…>, 11 years ago

Document things we should be checking

  • Property mode set to 100644
File size: 36.7 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id$
31 *
32 */
33
34#define _GNU_SOURCE
35
36#include "config.h"
37#include "common.h"
38#include "libtrace.h"
39#include "libtrace_int.h"
40#include "format_helper.h"
41#include "format_erf.h"
42
43#include <assert.h>
44#include <errno.h>
45#include <fcntl.h>
46#include <stdio.h>
47#include <string.h>
48#include <stdlib.h>
49
50#include <sys/mman.h>
51/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
52 * Windows anyway so we'll worry about this more later :] */
53#include <pthread.h>
54
55
56#ifdef WIN32
57#  include <io.h>
58#  include <share.h>
59#  define PATH_MAX _MAX_PATH
60#  define snprintf sprintf_s
61#else
62#  include <netdb.h>
63#  ifndef PATH_MAX
64#       define PATH_MAX 4096
65#  endif
66#  include <sys/ioctl.h>
67#endif
68
69/* This format deals with DAG cards that are using drivers from the 2.5 version
70 * onwards, including 3.X.
71 *
72 * DAG is a LIVE capture format.
73 *
74 * This format does support writing, provided the DAG card that you are using
75 * has transmit (Tx) support. Additionally, packets read using this format
76 * are in the ERF format, so can easily be written as ERF traces without
77 * losing any data.
78 */
79
80
/* Accessors that cast a trace's opaque format_data pointer to the DAG
 * per-trace state. The macro argument is fully parenthesised so that any
 * pointer-valued expression (not only a plain identifier) expands
 * correctly. */
#define DATA(x) ((struct dag_format_data_t *)((x)->format_data))
#define DATA_OUT(x) ((struct dag_format_data_out_t *)((x)->format_data))

/* Shorthands that require a variable named "libtrace" to be in scope at
 * the point of use */
#define FORMAT_DATA DATA(libtrace)
#define FORMAT_DATA_OUT DATA_OUT(libtrace)

/* The DUCK reporting state of the input trace named "libtrace" */
#define DUCK (FORMAT_DATA->duck)
88static struct libtrace_format_t dag;
89
/* One entry in the list of opened DAG devices.
 *
 * A single DAG device can carry several streams, and therefore serve several
 * input or output traces at once, so every trace refers to a shared device
 * entry rather than owning the device outright. */
struct dag_dev_t {
        /* Device name */
        char *dev_name;
        /* File descriptor */
        int fd;
        /* Number of input / output traces that are using this device */
        uint16_t ref_count;
        /* Pointer to the previous device in the device list */
        struct dag_dev_t *prev;
        /* Pointer to the next device in the device list */
        struct dag_dev_t *next;
};
102
/* Per-trace state kept for every DAG output trace */
struct dag_format_data_out_t {
        struct dag_dev_t *device;       /* DAG device used for writing */
        unsigned int dagstream;         /* DAG stream being written on */
        int stream_attached;            /* Non-zero while the stream is
                                           attached */
        uint64_t waiting;               /* Bytes queued but not yet
                                           transmitted */
        uint8_t *txbuffer;              /* Buffer holding the data that is
                                           to be transmitted */
};
116
117/* "Global" data that is stored for each DAG input trace */
118struct dag_format_data_t {
119
120        /* Data required for regular DUCK reporting */
121        struct {
122                /* Timestamp of the last DUCK report */
123                uint32_t last_duck;
124                /* The number of seconds between each DUCK report */
125                uint32_t duck_freq;
126                /* Timestamp of the last packet read from the DAG card */
127                uint32_t last_pkt;
128                /* Dummy trace to ensure DUCK packets are dealt with using the
129                 * DUCK format functions */
130                libtrace_t *dummy_duck;
131        } duck;
132
133        /* The DAG device that we are reading from */
134        struct dag_dev_t *device;
135        /* The DAG stream that we are reading from */
136        unsigned int dagstream;
137        /* Boolean flag indicating whether the stream is currently attached */
138        int stream_attached;
139        /* Pointer to the first unread byte in the DAG memory hole */
140        uint8_t *bottom;
141        /* Pointer to the last unread byte in the DAG memory hole */
142        uint8_t *top;
143        /* The amount of data processed thus far from the bottom pointer */
144        uint32_t processed;
145        /* The number of packets that have been dropped */
146        uint64_t drops;
147};
148
/* Mutex guarding every operation on the list of DAG devices below.
 *
 * It is statically initialised so that it is a valid mutex from program
 * start; POSIX does not promise that a zero-filled pthread_mutex_t is usable.
 * NOTE(review): if code outside this section also calls pthread_mutex_init()
 * on this mutex, keep only one of the two initialisations. */
pthread_mutex_t open_dag_mutex = PTHREAD_MUTEX_INITIALIZER;

/* The list of DAG devices that have been opened by libtrace.
 *
 * Each DAG device can only be opened once, but several streams may be read
 * from (or written to) the same device. The list, together with the per-entry
 * reference counts, ensures a device is neither opened twice nor closed while
 * a trace is still using it. */
struct dag_dev_t *open_dags = NULL;
160
161/* Returns the amount of padding between the ERF header and the start of the
162 * captured packet data */
163static int dag_get_padding(const libtrace_packet_t *packet)
164{
165        /* ERF Ethernet records have a 2 byte padding before the packet itself
166         * so that the IP header is aligned on a 32 bit boundary.
167         */
168        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
169                dag_record_t *erfptr = (dag_record_t *)packet->header;
170                switch(erfptr->type) {
171                        case TYPE_ETH:
172                        case TYPE_DSM_COLOR_ETH:
173                                return 2;
174                        default:                return 0;
175                }
176        }
177        else {
178                switch(trace_get_link_type(packet)) {
179                        case TRACE_TYPE_ETH:    return 2;
180                        default:                return 0;
181                }
182        }
183}
184
/* Guesses whether the given filename refers to a DAG device.
 *
 * Returns 1 if the file can be stat()ed and is a character device, which
 * makes it probably one of ours; returns 0 otherwise. */
static int dag_probe_filename(const char *filename)
{
        struct stat info;

        return (stat(filename, &info) == 0 && S_ISCHR(info.st_mode)) ? 1 : 0;
}
200
201/* Initialises the DAG output data structure */
202static void dag_init_format_out_data(libtrace_out_t *libtrace) {
203        libtrace->format_data = (struct dag_format_data_out_t *) malloc(sizeof(struct dag_format_data_out_t));
204        // no DUCK on output
205        FORMAT_DATA_OUT->stream_attached = 0;
206        FORMAT_DATA_OUT->device = NULL;
207        FORMAT_DATA_OUT->dagstream = 0;
208        FORMAT_DATA_OUT->waiting = 0;
209
210}
211
212/* Initialises the DAG input data structure */
213static void dag_init_format_data(libtrace_t *libtrace) {
214        libtrace->format_data = (struct dag_format_data_t *)
215                malloc(sizeof(struct dag_format_data_t));
216        DUCK.last_duck = 0;
217        DUCK.duck_freq = 0;
218        DUCK.last_pkt = 0;
219        DUCK.dummy_duck = NULL;
220        FORMAT_DATA->stream_attached = 0;
221        FORMAT_DATA->drops = 0;
222        FORMAT_DATA->device = NULL;
223        FORMAT_DATA->dagstream = 0;
224        FORMAT_DATA->processed = 0;
225        FORMAT_DATA->bottom = NULL;
226        FORMAT_DATA->top = NULL;
227}
228
229/* Determines if there is already an entry for the given DAG device in the
230 * device list and increments the reference count for that device, if found.
231 *
232 * NOTE: This function assumes the open_dag_mutex is held by the caller */
233static struct dag_dev_t *dag_find_open_device(char *dev_name) {
234        struct dag_dev_t *dag_dev;
235
236        dag_dev = open_dags;
237
238        /* XXX: Not exactly zippy, but how often are we going to be dealing
239         * with multiple dag cards? */
240        while (dag_dev != NULL) {
241                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
242                        dag_dev->ref_count ++;
243                        return dag_dev;
244
245                }
246                dag_dev = dag_dev->next;
247        }
248        return NULL;
249
250
251}
252
253/* Closes a DAG device and removes it from the device list.
254 *
255 * Attempting to close a DAG device that has a non-zero reference count will
256 * cause an assertion failure!
257 *
258 * NOTE: This function assumes the open_dag_mutex is held by the caller */
259static void dag_close_device(struct dag_dev_t *dev) {
260        /* Need to remove from the device list */
261
262        assert(dev->ref_count == 0);
263
264        if (dev->prev == NULL) {
265                open_dags = dev->next;
266                if (dev->next)
267                        dev->next->prev = NULL;
268        } else {
269                dev->prev->next = dev->next;
270                if (dev->next)
271                        dev->next->prev = dev->prev;
272        }
273
274        dag_close(dev->fd);
275        if (dev->dev_name)
276        free(dev->dev_name);
277        free(dev);
278}
279
280
281/* Opens a new DAG device for writing and adds it to the DAG device list
282 *
283 * NOTE: this function should only be called when opening a DAG device for
284 * writing - there is little practical difference between this and the
285 * function below that covers the reading case, but we need the output trace
286 * object to report errors properly so the two functions take slightly
287 * different arguments. This is really lame and there should be a much better
288 * way of doing this.
289 *
290 * NOTE: This function assumes the open_dag_mutex is held by the caller
291 */
292static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace, char *dev_name) {
293        struct stat buf;
294        int fd;
295        struct dag_dev_t *new_dev;
296
297        /* Make sure the device exists */
298        if (stat(dev_name, &buf) == -1) {
299                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
300                return NULL;
301}
302
303        /* Make sure it is the appropriate type of device */
304        if (S_ISCHR(buf.st_mode)) {
305                /* Try opening the DAG device */
306                if((fd = dag_open(dev_name)) < 0) {
307                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
308                                        dev_name);
309                        return NULL;
310                }
311        } else {
312                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
313                                dev_name);
314                return NULL;
315        }
316
317        /* Add the device to our device list - it is just a doubly linked
318         * list with no inherent ordering; just tack the new one on the front
319         */
320        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
321        new_dev->fd = fd;
322        new_dev->dev_name = dev_name;
323        new_dev->ref_count = 1;
324
325        new_dev->prev = NULL;
326        new_dev->next = open_dags;
327        if (open_dags)
328                open_dags->prev = new_dev;
329
330        open_dags = new_dev;
331
332        return new_dev;
333}
334
335/* Opens a new DAG device for reading and adds it to the DAG device list
336 *
337 * NOTE: this function should only be called when opening a DAG device for
338 * reading - there is little practical difference between this and the
339 * function above that covers the writing case, but we need the input trace
340 * object to report errors properly so the two functions take slightly
341 * different arguments. This is really lame and there should be a much better
342 * way of doing this.
343 *
344 * NOTE: This function assumes the open_dag_mutex is held by the caller */
345static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
346        struct stat buf;
347        int fd;
348        struct dag_dev_t *new_dev;
349
350        /* Make sure the device exists */
351        if (stat(dev_name, &buf) == -1) {
352                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
353                return NULL;
354        }
355
356        /* Make sure it is the appropriate type of device */
357        if (S_ISCHR(buf.st_mode)) {
358                /* Try opening the DAG device */
359                if((fd = dag_open(dev_name)) < 0) {
360                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
361                                        dev_name);
362                        return NULL;
363                }
364        } else {
365                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
366                                dev_name);
367                return NULL;
368        }
369
370        /* Add the device to our device list - it is just a doubly linked
371         * list with no inherent ordering; just tack the new one on the front
372         */
373        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
374        new_dev->fd = fd;
375        new_dev->dev_name = dev_name;
376        new_dev->ref_count = 1;
377
378        new_dev->prev = NULL;
379        new_dev->next = open_dags;
380        if (open_dags)
381                open_dags->prev = new_dev;
382
383        open_dags = new_dev;
384
385        return new_dev;
386}
387
388/* Creates and initialises a DAG output trace */
389static int dag_init_output(libtrace_out_t *libtrace) {
390        char *dag_dev_name = NULL;
391        char *scan = NULL;
392        struct dag_dev_t *dag_device = NULL;
393        int stream = 1;
394       
395        /* XXX I don't know if this is important or not, but this function
396         * isn't present in all of the driver releases that this code is
397         * supposed to support! */
398        /*
399        unsigned long wake_time;
400        dagutil_sleep_get_wake_time(&wake_time,0);
401        */
402
403        dag_init_format_out_data(libtrace);
404        /* Grab the mutex while we're likely to be messing with the device
405         * list */
406        pthread_mutex_lock(&open_dag_mutex);
407       
408        /* Specific streams are signified using a comma in the libtrace URI,
409         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
410         *
411         * If no stream is specified, we will write using stream 1 */
412        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
413                dag_dev_name = strdup(libtrace->uridata);
414        } else {
415                dag_dev_name = (char *)strndup(libtrace->uridata,
416                                (size_t)(scan - libtrace->uridata));
417                stream = atoi(++scan);
418        }
419        FORMAT_DATA_OUT->dagstream = stream;
420
421        /* See if our DAG device is already open */
422        dag_device = dag_find_open_device(dag_dev_name);
423
424        if (dag_device == NULL) {
425                /* Device not yet opened - open it ourselves */
426                dag_device = dag_open_output_device(libtrace, dag_dev_name);
427        } else {
428                /* Otherwise, just use the existing one */
429                free(dag_dev_name);
430                dag_dev_name = NULL;
431        }
432
433        /* Make sure we have successfully opened a DAG device */
434        if (dag_device == NULL) {
435                if (dag_dev_name) {
436                        free(dag_dev_name);
437                }
438                pthread_mutex_unlock(&open_dag_mutex);
439                return -1;
440        }
441
442        FORMAT_DATA_OUT->device = dag_device;
443        pthread_mutex_unlock(&open_dag_mutex);
444        return 0;
445}
446
447/* Creates and initialises a DAG input trace */
448static int dag_init_input(libtrace_t *libtrace) {
449        char *dag_dev_name = NULL;
450        char *scan = NULL;
451        int stream = 0;
452        struct dag_dev_t *dag_device = NULL;
453
454        dag_init_format_data(libtrace);
455        /* Grab the mutex while we're likely to be messing with the device
456         * list */
457        pthread_mutex_lock(&open_dag_mutex);
458       
459       
460        /* Specific streams are signified using a comma in the libtrace URI,
461         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
462         *
463         * If no stream is specified, we will read from stream 0 */
464        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
465                dag_dev_name = strdup(libtrace->uridata);
466        } else {
467                dag_dev_name = (char *)strndup(libtrace->uridata,
468                                (size_t)(scan - libtrace->uridata));
469                stream = atoi(++scan);
470        }
471
472        FORMAT_DATA->dagstream = stream;
473
474        /* See if our DAG device is already open */
475        dag_device = dag_find_open_device(dag_dev_name);
476
477        if (dag_device == NULL) {
478                /* Device not yet opened - open it ourselves */
479                dag_device = dag_open_device(libtrace, dag_dev_name);
480        } else {
481                /* Otherwise, just use the existing one */
482                free(dag_dev_name);
483                dag_dev_name = NULL;
484        }
485
486        /* Make sure we have successfully opened a DAG device */
487        if (dag_device == NULL) {
488                if (dag_dev_name)
489                        free(dag_dev_name);
490                dag_dev_name = NULL;
491                pthread_mutex_unlock(&open_dag_mutex);
492                return -1;
493        }
494
495        FORMAT_DATA->device = dag_device;
496
497        /* See Config_Status_API_Programming_Guide.pdf from the Endace Dag Documentation */
498        /* Check kBooleanAttributeActive is true -- no point capturing on an interface that's disabled
499 
500        *  The symptom of the port being disabled is that libtrace will appear to hang.
501        */
502        /* Check kBooleanAttributeFault is false */
503        /* Check kBooleanAttributeLocalFault is false */
504        /* Check kBooleanAttributeLock is true ? */
505        /* Check kBooleanAttributePeerLink ? */
506
507        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based on libtrace promisc attribute?*/
508        /* Set kUint32AttributeSnapLength to the snaplength */
509
510        pthread_mutex_unlock(&open_dag_mutex);
511        return 0;
512}
513
514/* Configures a DAG input trace */
515static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
516                                void *data) {
517        char conf_str[4096];
518        switch(option) {
519                case TRACE_OPTION_META_FREQ:
520                        /* This option is used to specify the frequency of DUCK
521                         * updates */
522                        DUCK.duck_freq = *(int *)data;
523                        return 0;
524                case TRACE_OPTION_SNAPLEN:
525                        /* Tell the card our new snap length */
526                        snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
527                        if (dag_configure(FORMAT_DATA->device->fd,
528                                                conf_str) != 0) {
529                                trace_set_err(libtrace, errno, "Failed to configure snaplen on DAG card: %s", libtrace->uridata);
530                                return -1;
531                        }
532                        return 0;
533                case TRACE_OPTION_PROMISC:
534                        /* DAG already operates in a promisc fashion */
535                        return -1;
536                case TRACE_OPTION_FILTER:
537                        /* We don't yet support pushing filters into DAG
538                         * cards */
539                        return -1;
540                case TRACE_OPTION_EVENT_REALTIME:
541                        /* Live capture is always going to be realtime */
542                        return -1;
543        }
544        return -1;
545}
546
547/* Starts a DAG output trace */
548static int dag_start_output(libtrace_out_t *libtrace) {
549        struct timeval zero, nopoll;
550        uint8_t *top, *bottom;
551        top = bottom = NULL;
552
553        zero.tv_sec = 0;
554        zero.tv_usec = 0;
555        nopoll = zero;
556
557        /* Attach and start the DAG stream */
558
559        if (dag_attach_stream(FORMAT_DATA_OUT->device->fd,
560                        FORMAT_DATA_OUT->dagstream, 0, 1048576) < 0) {
561                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
562                return -1;
563        }
564
565        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
566                        FORMAT_DATA_OUT->dagstream) < 0) {
567                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
568                return -1;
569        }
570        FORMAT_DATA_OUT->stream_attached = 1;
571
572        /* We don't want the dag card to do any sleeping */
573
574        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
575                        FORMAT_DATA_OUT->dagstream, 0, &zero,
576                        &nopoll);
577
578        return 0;
579}
580
581/* Starts a DAG input trace */
582static int dag_start_input(libtrace_t *libtrace) {
583        struct timeval zero, nopoll;
584        uint8_t *top, *bottom;
585        uint8_t diff = 0;
586        top = bottom = NULL;
587
588        zero.tv_sec = 0;
589        zero.tv_usec = 0;
590        nopoll = zero;
591
592        /* Attach and start the DAG stream */
593        if (dag_attach_stream(FORMAT_DATA->device->fd,
594                                FORMAT_DATA->dagstream, 0, 0) < 0) {
595                trace_set_err(libtrace, errno, "Cannot attach DAG stream");
596                return -1;
597        }
598
599        if (dag_start_stream(FORMAT_DATA->device->fd,
600                                FORMAT_DATA->dagstream) < 0) {
601                trace_set_err(libtrace, errno, "Cannot start DAG stream");
602                return -1;
603        }
604        FORMAT_DATA->stream_attached = 1;
605       
606        /* We don't want the dag card to do any sleeping */
607        dag_set_stream_poll(FORMAT_DATA->device->fd,
608                                FORMAT_DATA->dagstream, 0, &zero,
609                                &nopoll);
610
611        /* Should probably flush the memory hole now */
612        do {
613                top = dag_advance_stream(FORMAT_DATA->device->fd,
614                                        FORMAT_DATA->dagstream,
615                                        &bottom);
616                assert(top && bottom);
617                diff = top - bottom;
618                bottom -= diff;
619        } while (diff != 0);
620        FORMAT_DATA->top = NULL;
621        FORMAT_DATA->bottom = NULL;
622        FORMAT_DATA->processed = 0;
623        FORMAT_DATA->drops = 0;
624
625        return 0;
626}
627
628/* Pauses a DAG output trace */
629static int dag_pause_output(libtrace_out_t *libtrace) {
630
631        /* Stop and detach the stream */
632        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
633                        FORMAT_DATA_OUT->dagstream) < 0) {
634                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
635                return -1;
636        }
637        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
638                        FORMAT_DATA_OUT->dagstream) < 0) {
639                trace_set_err_out(libtrace, errno, "Could not detach DAG stream");
640                return -1;
641        }
642        FORMAT_DATA_OUT->stream_attached = 0;
643        return 0;
644}
645
646/* Pauses a DAG input trace */
647static int dag_pause_input(libtrace_t *libtrace) {
648
649        /* Stop and detach the stream */
650        if (dag_stop_stream(FORMAT_DATA->device->fd,
651                                FORMAT_DATA->dagstream) < 0) {
652                trace_set_err(libtrace, errno, "Could not stop DAG stream");
653                return -1;
654        }
655        if (dag_detach_stream(FORMAT_DATA->device->fd,
656                                FORMAT_DATA->dagstream) < 0) {
657                trace_set_err(libtrace, errno, "Could not detach DAG stream");
658                return -1;
659        }
660        FORMAT_DATA->stream_attached = 0;
661        return 0;
662}
663
664/* Closes a DAG input trace */
665static int dag_fin_input(libtrace_t *libtrace) {
666        /* Need the lock, since we're going to be handling the device list */
667        pthread_mutex_lock(&open_dag_mutex);
668       
669        /* Detach the stream if we are not paused */
670        if (FORMAT_DATA->stream_attached)
671                dag_pause_input(libtrace);
672        FORMAT_DATA->device->ref_count --;
673
674        /* Close the DAG device if there are no more references to it */
675        if (FORMAT_DATA->device->ref_count == 0)
676                dag_close_device(FORMAT_DATA->device);
677        if (DUCK.dummy_duck)
678                trace_destroy_dead(DUCK.dummy_duck);
679        free(libtrace->format_data);
680        pthread_mutex_unlock(&open_dag_mutex);
681        return 0; /* success */
682}
683
684/* Closes a DAG output trace */
685static int dag_fin_output(libtrace_out_t *libtrace) {
686       
687        /* Commit any outstanding traffic in the txbuffer */
688        if (FORMAT_DATA_OUT->waiting) {
689                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
690                                FORMAT_DATA_OUT->waiting );
691        }
692
693        /* Wait until the buffer is nearly clear before exiting the program,
694         * as we will lose packets otherwise */
695        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
696                        FORMAT_DATA_OUT->dagstream,
697                        dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
698                                        FORMAT_DATA_OUT->dagstream) - 8
699                        );
700
701        /* Need the lock, since we're going to be handling the device list */
702        pthread_mutex_lock(&open_dag_mutex);
703
704        /* Detach the stream if we are not paused */
705        if (FORMAT_DATA_OUT->stream_attached)
706                dag_pause_output(libtrace);
707        FORMAT_DATA_OUT->device->ref_count --;
708
709        /* Close the DAG device if there are no more references to it */
710        if (FORMAT_DATA_OUT->device->ref_count == 0)
711                dag_close_device(FORMAT_DATA_OUT->device);
712        free(libtrace->format_data);
713        pthread_mutex_unlock(&open_dag_mutex);
714        return 0; /* success */
715}
716
717/* Extracts DUCK information from the DAG card and produces a DUCK packet */
718static int dag_get_duckinfo(libtrace_t *libtrace,
719                                libtrace_packet_t *packet) {
720
721        /* Allocate memory for the DUCK data */
722        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
723                        !packet->buffer) {
724                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
725                packet->buf_control = TRACE_CTRL_PACKET;
726                if (!packet->buffer) {
727                        trace_set_err(libtrace, errno,
728                                        "Cannot allocate packet buffer");
729                        return -1;
730                }
731        }
732
733        /* DUCK doesn't have a format header */
734        packet->header = 0;
735        packet->payload = packet->buffer;
736
737        /* No need to check if we can get DUCK or not - we're modern
738         * enough so just grab the DUCK info */
739        if ((ioctl(FORMAT_DATA->device->fd, DAGIOCDUCK,
740                                        (duckinf_t *)packet->payload) < 0)) {
741                trace_set_err(libtrace, errno, "Error using DAGIOCDUCK");
742                return -1;
743        }
744
745        packet->type = TRACE_RT_DUCK_2_5;
746
747        /* Set the packet's tracce to point at a DUCK trace, so that the
748         * DUCK format functions will be called on the packet rather than the
749         * DAG ones */
750        if (!DUCK.dummy_duck)
751                DUCK.dummy_duck = trace_create_dead("rt:localhost:3434");
752        packet->trace = DUCK.dummy_duck;
753        return sizeof(duckinf_t);
754}
755
756/* Determines the amount of data available to read from the DAG card */
757static int dag_available(libtrace_t *libtrace) {
758        uint32_t diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
759
760        /* If we've processed more than 4MB of data since we last called
761         * dag_advance_stream, then we should call it again to allow the
762         * space occupied by that 4MB to be released */
763        if (diff >= dag_record_size && FORMAT_DATA->processed < 4 * 1024 * 1024)
764                return diff;
765       
766        /* Update the top and bottom pointers */
767        FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd,
768                        FORMAT_DATA->dagstream,
769                        &(FORMAT_DATA->bottom));
770       
771        if (FORMAT_DATA->top == NULL) {
772                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
773                return -1;
774        }
775        FORMAT_DATA->processed = 0;
776        diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
777        return diff;
778}
779
780/* Returns a pointer to the start of the next complete ERF record */
781static dag_record_t *dag_get_record(libtrace_t *libtrace) {
782        dag_record_t *erfptr = NULL;
783        uint16_t size;
784        erfptr = (dag_record_t *)FORMAT_DATA->bottom;
785        if (!erfptr)
786                return NULL;
787        size = ntohs(erfptr->rlen);
788        assert( size >= dag_record_size );
789        /* Make certain we have the full packet available */
790        if (size > (FORMAT_DATA->top - FORMAT_DATA->bottom))
791                return NULL;
792        FORMAT_DATA->bottom += size;
793        FORMAT_DATA->processed += size;
794        return erfptr;
795}
796
797/* Converts a buffer containing a recently read DAG packet record into a
798 * libtrace packet */
799static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
800                void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
801
802        dag_record_t *erfptr;
803       
804        /* If the packet previously owned a buffer that is not the buffer
805         * that contains the new packet data, we're going to need to free the
806         * old one to avoid memory leaks */
807        if (packet->buffer != buffer &&
808                        packet->buf_control == TRACE_CTRL_PACKET) {
809                free(packet->buffer);
810        }
811
812        /* Set the buffer owner appropriately */
813        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
814                packet->buf_control = TRACE_CTRL_PACKET;
815        } else
816                packet->buf_control = TRACE_CTRL_EXTERNAL;
817
818        /* Update the packet pointers and type appropriately */
819        erfptr = (dag_record_t *)buffer;
820        packet->buffer = erfptr;
821        packet->header = erfptr;
822        packet->type = rt_type;
823
824        if (erfptr->flags.rxerror == 1) {
825                /* rxerror means the payload is corrupt - drop the payload
826                 * by tweaking rlen */
827                packet->payload = NULL;
828                erfptr->rlen = htons(erf_get_framing_length(packet));
829        } else {
830                packet->payload = (char*)packet->buffer
831                        + erf_get_framing_length(packet);
832        }
833
834        if (libtrace->format_data == NULL) {
835                dag_init_format_data(libtrace);
836        }
837
838        /* Update the dropped packets counter */
839
840        /* No loss counter for DSM coloured records - have to use
841         * some other API */
842        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
843                /* TODO */
844        } else {
845                /* Use the ERF loss counter */
846                DATA(libtrace)->drops += ntohs(erfptr->lctr);
847        }
848
849        return 0;
850}
851
852/*
853 * dag_write_packet() at this stage attempts to improve tx performance
854 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
855 * has been met. I observed approximately 270% performance increase
856 * through this relatively naive tweak. No optimisation of buffer sizes
857 * was attempted.
858 */
859
860/* Pushes an ERF record onto the transmit stream */
861static int dag_dump_packet(libtrace_out_t *libtrace,
862                dag_record_t *erfptr, unsigned int pad, void *buffer) {
863        int size;
864
865        /*
866         * If we've got 0 bytes waiting in the txqueue, assume that we haven't
867         * requested any space yet, and request some, storing the pointer at
868         * FORMAT_DATA_OUT->txbuffer.
869         *
870         * The amount to request is slightly magical at the moment - it's
871         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
872         * the buffer and handle overruns.
873         */
874        if (FORMAT_DATA_OUT->waiting == 0) {
875                FORMAT_DATA_OUT->txbuffer = dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
876                                FORMAT_DATA_OUT->dagstream, 16908288);
877        }
878
879        /*
880         * Copy the header separately to the body, as we can't guarantee they
881         * are in contiguous memory
882         */
883        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,erfptr,(dag_record_size + pad));
884        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
885
886
887
888        /*
889         * Copy our incoming packet into the outgoing buffer, and increment
890         * our waiting count
891         */
892        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
893        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,buffer,size);
894        FORMAT_DATA_OUT->waiting += size;
895
896        /*
897         * If our output buffer has more than 16 Mebibytes in it, commit those
898         * bytes and reset the waiting count to 0.
899         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in
900         * case there is still data in the buffer at program exit.
901         */
902
903        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
904                FORMAT_DATA_OUT->txbuffer = dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
905                        FORMAT_DATA_OUT->waiting );
906                FORMAT_DATA_OUT->waiting = 0;
907        }
908
909        return size + pad + dag_record_size;
910
911}
912
913/* Attempts to determine a suitable ERF type for a given packet. Returns true
914 * if one is found, false otherwise */
915static bool find_compatible_linktype(libtrace_out_t *libtrace,
916                                libtrace_packet_t *packet, char *type)
917{
918         // Keep trying to simplify the packet until we can find
919         //something we can do with it
920
921        do {
922                *type=libtrace_to_erf_type(trace_get_link_type(packet));
923
924                // Success
925                if (*type != (char)-1)
926                        return true;
927
928                if (!demote_packet(packet)) {
929                        trace_set_err_out(libtrace,
930                                        TRACE_ERR_NO_CONVERSION,
931                                        "No erf type for packet (%i)",
932                                        trace_get_link_type(packet));
933                        return false;
934                }
935
936        } while(1);
937
938        return true;
939}
940
941/* Writes a packet to the provided DAG output trace */
942static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet) {
943        /*
944         * This is heavily borrowed from erf_write_packet(). Yes, CnP coding
945         * sucks, sorry about that.
946         */
947        unsigned int pad = 0;
948        int numbytes;
949        void *payload = packet->payload;
950        dag_record_t *header = (dag_record_t *)packet->header;
951        char erf_type = 0;
952
953        if(!packet->header) {
954                /* No header, probably an RT packet. Lifted from
955                 * erf_write_packet(). */
956                return -1;
957        }
958
959        pad = dag_get_padding(packet);
960
961        /*
962         * If the payload is null, adjust the rlen. Discussion of this is
963         * attached to erf_write_packet()
964         */
965        if (payload == NULL) {
966                header->rlen = htons(dag_record_size + pad);
967        }
968
969        if (packet->type == TRACE_RT_DATA_ERF) {
970                numbytes = dag_dump_packet(libtrace,
971                                header,
972                                pad,
973                                payload
974                                );
975
976        } else {
977                /* Build up a new packet header from the existing header */
978
979                /* Simplify the packet first - if we can't do this, break
980                 * early */
981                if (!find_compatible_linktype(libtrace,packet,&erf_type))
982                        return -1;
983
984                dag_record_t erfhdr;
985
986                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
987                payload=packet->payload;
988                pad = dag_get_padding(packet);
989
990                /* Flags. Can't do this */
991                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
992                if (trace_get_direction(packet)!=~0U)
993                        erfhdr.flags.iface = trace_get_direction(packet);
994
995                erfhdr.type = erf_type;
996
997                /* Packet length (rlen includes format overhead) */
998                assert(trace_get_capture_length(packet)>0
999                                && trace_get_capture_length(packet)<=65536);
1000                assert(erf_get_framing_length(packet)>0
1001                                && trace_get_framing_length(packet)<=65536);
1002                assert(trace_get_capture_length(packet)+erf_get_framing_length(packet)>0
1003                      &&trace_get_capture_length(packet)+erf_get_framing_length(packet)<=65536);
1004
1005                erfhdr.rlen = htons(trace_get_capture_length(packet)
1006                        + erf_get_framing_length(packet));
1007
1008
1009                /* Loss counter. Can't do this */
1010                erfhdr.lctr = 0;
1011                /* Wire length, does not include padding! */
1012                erfhdr.wlen = htons(trace_get_wire_length(packet));
1013
1014                /* Write it out */
1015                numbytes = dag_dump_packet(libtrace,
1016                                &erfhdr,
1017                                pad,
1018                                payload);
1019
1020        }
1021
1022        return numbytes;
1023}
1024
1025/* Reads the next available packet from a DAG card, in a BLOCKING fashion
1026 *
1027 * If DUCK reporting is enabled, the packet returned may be a DUCK update
1028 */
1029static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1030        int size = 0;
1031        struct timeval tv;
1032        dag_record_t *erfptr = NULL;
1033        int numbytes = 0;
1034        uint32_t flags = 0;
1035
1036        /* Check if we're due for a DUCK report */
1037        if (DUCK.last_pkt - DUCK.last_duck > DUCK.duck_freq &&
1038                        DUCK.duck_freq != 0) {
1039                size = dag_get_duckinfo(libtrace, packet);
1040                DUCK.last_duck = DUCK.last_pkt;
1041                if (size != 0) {
1042                        return size;
1043                }
1044                /* No DUCK support, so don't waste our time anymore */
1045                DUCK.duck_freq = 0;
1046        }
1047
1048        /* Don't let anyone try to free our DAG memory hole! */
1049        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
1050
1051        /* If the packet buffer is currently owned by libtrace, free it so
1052         * that we can set the packet to point into the DAG memory hole */
1053        if (packet->buf_control == TRACE_CTRL_PACKET) {
1054                free(packet->buffer);
1055                packet->buffer = 0;
1056        }
1057
1058        /* Grab a full ERF record */
1059        do {
1060                numbytes = dag_available(libtrace);
1061                if (numbytes < 0)
1062                        return numbytes;
1063                if (numbytes < dag_record_size)
1064                        /* Block until we see a packet */
1065                        continue;
1066                erfptr = dag_get_record(libtrace);
1067        } while (erfptr == NULL);
1068
1069        /* Prepare the libtrace packet */
1070        if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
1071                                flags))
1072                return -1;
1073
1074        /* Update the DUCK timer */
1075        tv = trace_get_timeval(packet);
1076        DUCK.last_pkt = tv.tv_sec;
1077
1078        return packet->payload ? htons(erfptr->rlen) :
1079                                erf_get_framing_length(packet);
1080}
1081
1082/* Attempts to read a packet from a DAG card in a NON-BLOCKING fashion. If a
1083 * packet is available, we will return a packet event. Otherwise we will
1084 * return a SLEEP event (as we cannot select on the DAG file descriptor).
1085 */
1086static libtrace_eventobj_t trace_event_dag(libtrace_t *trace,
1087                                        libtrace_packet_t *packet) {
1088        libtrace_eventobj_t event = {0,0,0.0,0};
1089        dag_record_t *erfptr = NULL;
1090        int numbytes;
1091        uint32_t flags = 0;
1092
1093        do {
1094                erfptr = NULL;
1095                numbytes = 0;
1096       
1097                /* Need to call dag_available so that the top pointer will get
1098                 * updated, otherwise we'll never see any data! */
1099                numbytes = dag_available(trace);
1100
1101                /* May as well not bother calling dag_get_record if
1102                 * dag_available suggests that there's no data */
1103                if (numbytes != 0)
1104                        erfptr = dag_get_record(trace);
1105                if (erfptr == NULL) {
1106                        /* No packet available - sleep for a very short time */
1107                        event.type = TRACE_EVENT_SLEEP;
1108                        event.seconds = 0.0001;
1109                        break;
1110                }
1111                if (dag_prepare_packet(trace, packet, erfptr, 
1112                                        TRACE_RT_DATA_ERF, flags)) {
1113                        event.type = TRACE_EVENT_TERMINATE;
1114                        break;
1115                }
1116
1117
1118                event.size = trace_get_capture_length(packet) + 
1119                                trace_get_framing_length(packet);
1120               
1121                /* XXX trace_read_packet() normally applies the following
1122                 * config options for us, but this function is called via
1123                 * trace_event() so we have to do it ourselves */
1124
1125                if (trace->filter) {
1126                        if (trace_apply_filter(trace->filter, packet)) {
1127                                event.type = TRACE_EVENT_PACKET;
1128                        } else {
1129                                /* This packet isn't useful so we want to
1130                                 * immediately see if there is another suitable
1131                                 * one - we definitely DO NOT want to return
1132                                 * a sleep event in this case, like we used to
1133                                 * do! */
1134                                continue;
1135                        }
1136                } else {
1137                        event.type = TRACE_EVENT_PACKET;
1138                }
1139
1140                if (trace->snaplen > 0) {
1141                        trace_set_capture_length(packet, trace->snaplen);
1142                }
1143                break;
1144        } while (1);
1145
1146        return event;
1147}
1148
1149/* Gets the number of dropped packets */
1150static uint64_t dag_get_dropped_packets(libtrace_t *trace) {
1151        if (trace->format_data == NULL)
1152                return (uint64_t)-1;
1153        return DATA(trace)->drops;
1154}
1155
/* Writes the help text for the DAG format module to standard output */
static void dag_help(void) {
	/* One entry per line of output, emitted in order */
	static const char *const help_lines[] = {
		"dag format module: $Revision$\n",
		"Supported input URIs:\n",
		"\tdag:/dev/dagn\n",
		"\n",
		"\te.g.: dag:/dev/dag0\n",
		"\n",
		"Supported output URIs:\n",
		"\tnone\n",
		"\n",
	};
	size_t i;

	for (i = 0; i < sizeof(help_lines) / sizeof(help_lines[0]); i++) {
		fputs(help_lines[i], stdout);
	}
}
1168
1169static struct libtrace_format_t dag = {
1170        "dag",
1171        "$Id$",
1172        TRACE_FORMAT_ERF,
1173        dag_probe_filename,             /* probe filename */
1174        NULL,                           /* probe magic */
1175        dag_init_input,                 /* init_input */
1176        dag_config_input,               /* config_input */
1177        dag_start_input,                /* start_input */
1178        dag_pause_input,                /* pause_input */
1179        dag_init_output,                /* init_output */
1180        NULL,                           /* config_output */
1181        dag_start_output,               /* start_output */
1182        dag_fin_input,                  /* fin_input */
1183        dag_fin_output,                 /* fin_output */
1184        dag_read_packet,                /* read_packet */
1185        dag_prepare_packet,             /* prepare_packet */
1186        NULL,                           /* fin_packet */
1187        dag_write_packet,               /* write_packet */
1188        erf_get_link_type,              /* get_link_type */
1189        erf_get_direction,              /* get_direction */
1190        erf_set_direction,              /* set_direction */
1191        erf_get_erf_timestamp,          /* get_erf_timestamp */
1192        NULL,                           /* get_timeval */
1193        NULL,                           /* get_seconds */
1194        NULL,                           /* get_timespec */
1195        NULL,                           /* seek_erf */
1196        NULL,                           /* seek_timeval */
1197        NULL,                           /* seek_seconds */
1198        erf_get_capture_length,         /* get_capture_length */
1199        erf_get_wire_length,            /* get_wire_length */
1200        erf_get_framing_length,         /* get_framing_length */
1201        erf_set_capture_length,         /* set_capture_length */
1202        NULL,                           /* get_received_packets */
1203        NULL,                           /* get_filtered_packets */
1204        dag_get_dropped_packets,        /* get_dropped_packets */
1205        NULL,                           /* get_captured_packets */
1206        NULL,                           /* get_fd */
1207        trace_event_dag,                /* trace_event */
1208        dag_help,                       /* help */
1209        NULL                            /* next pointer */
1210};
1211
1212void dag_constructor(void) {
1213        register_format(&dag);
1214}
Note: See TracBrowser for help on using the repository browser.