source: lib/format_dag25.c @ 1aa4bf7

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 1aa4bf7 was 1aa4bf7, checked in by Perry Lorier <perry@…>, 13 years ago

Support using timespec's for dealing with traces

  • Property mode set to 100644
File size: 27.4 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *          Shane Alcock
8 *
9 * All rights reserved.
10 *
11 * This code has been developed by the University of Waikato WAND
12 * research group. For further information please see http://www.wand.net.nz/
13 *
14 * libtrace is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * libtrace is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with libtrace; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 *
28 * $Id$
29 *
30 */
31#define _GNU_SOURCE
32
33#include "config.h"
34#include "common.h"
35#include "libtrace.h"
36#include "libtrace_int.h"
37#include "format_helper.h"
38#include "format_erf.h"
39
40#include <assert.h>
41#include <errno.h>
42#include <fcntl.h>
43#include <stdio.h>
44#include <string.h>
45#include <stdlib.h>
46
47#include <sys/mman.h>
48/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
49 * Windows anyway so we'll worry about this more later :] */
50#include <pthread.h>
51
52
53#ifdef WIN32
54#  include <io.h>
55#  include <share.h>
56#  define PATH_MAX _MAX_PATH
57#  define snprintf sprintf_s
58#else
59#  include <netdb.h>
60#  ifndef PATH_MAX
61#       define PATH_MAX 4096
62#  endif
63#  include <sys/ioctl.h>
64#endif
65
66
67#define DATA(x) ((struct dag_format_data_t *)x->format_data)
68#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
69
70#define FORMAT_DATA DATA(libtrace)
71#define FORMAT_DATA_OUT DATA_OUT(libtrace)
72
73#define DUCK FORMAT_DATA->duck
74static struct libtrace_format_t dag;
75
76struct dag_dev_t {
77        //pthread_mutex_t dag_mutex;
78        char * dev_name;
79        int fd;
80        uint16_t ref_count;
81        struct dag_dev_t *prev;
82        struct dag_dev_t *next;
83};
84
85struct dag_format_data_out_t {
86        struct dag_dev_t *device;
87        unsigned int dagstream;
88        int stream_attached;
89        uint8_t *bottom;
90        uint8_t *top;
91        uint32_t processed;
92        uint64_t drops;
93        uint64_t waiting;
94        uint8_t *txbuffer;
95};
96
97struct dag_format_data_t {
98        struct {
99                uint32_t last_duck;
100                uint32_t duck_freq;
101                uint32_t last_pkt;
102                libtrace_t *dummy_duck;
103        } duck;
104
105        struct dag_dev_t *device;
106        unsigned int dagstream;
107        int stream_attached;
108        uint8_t *bottom;
109        uint8_t *top;
110        uint32_t processed;
111        uint64_t drops;
112};
113
114pthread_mutex_t open_dag_mutex;
115struct dag_dev_t *open_dags = NULL;
116
117/* Dag erf ether packets have a 2 byte padding before the packet
118 * so that the ip header is aligned on a 32 bit boundary.
119 */
120static int dag_get_padding(const libtrace_packet_t *packet)
121{
122        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
123                dag_record_t *erfptr = (dag_record_t *)packet->header;
124                switch(erfptr->type) {
125                        case TYPE_ETH:
126                        case TYPE_DSM_COLOR_ETH:
127                                return 2;
128                        default:                return 0;
129                }
130        }
131        else {
132                switch(trace_get_link_type(packet)) {
133                        case TRACE_TYPE_ETH:    return 2;
134                        default:                return 0;
135                }
136        }
137}
138
139static int dag_probe_filename(const char *filename)
140{
141        struct stat statbuf;
142        /* Can we stat the file? */
143        if (stat(filename, &statbuf) != 0) {
144                return 0;
145        }
146        /* Is it a character device? */
147        if (!S_ISCHR(statbuf.st_mode)) {
148                return 0;
149        }
150        /* Yeah, it's probably us. */
151        return 1;
152}
153
154static void dag_init_format_out_data(libtrace_out_t *libtrace) {
155        libtrace->format_data = (struct dag_format_data_out_t *) malloc(sizeof(struct dag_format_data_out_t));
156        // no DUCK on output
157        FORMAT_DATA_OUT->stream_attached = 0;
158        FORMAT_DATA_OUT->drops = 0;
159        FORMAT_DATA_OUT->device = NULL;
160        FORMAT_DATA_OUT->dagstream = 0;
161        FORMAT_DATA_OUT->processed = 0;
162        FORMAT_DATA_OUT->bottom = NULL;
163        FORMAT_DATA_OUT->top = NULL;
164        FORMAT_DATA_OUT->waiting = 0;
165
166}
167
168static void dag_init_format_data(libtrace_t *libtrace) {
169        libtrace->format_data = (struct dag_format_data_t *)
170                malloc(sizeof(struct dag_format_data_t));
171        DUCK.last_duck = 0;
172        DUCK.duck_freq = 0;
173        DUCK.last_pkt = 0;
174        DUCK.dummy_duck = NULL;
175        FORMAT_DATA->stream_attached = 0;
176        FORMAT_DATA->drops = 0;
177        FORMAT_DATA->device = NULL;
178        FORMAT_DATA->dagstream = 0;
179        FORMAT_DATA->processed = 0;
180        FORMAT_DATA->bottom = NULL;
181        FORMAT_DATA->top = NULL;
182}
183
184/* NOTE: This function assumes the open_dag_mutex is held by the caller */
185static struct dag_dev_t *dag_find_open_device(char *dev_name) {
186        struct dag_dev_t *dag_dev;
187
188        dag_dev = open_dags;
189
190        /* XXX: Not exactly zippy, but how often are we going to be dealing
191         * with multiple dag cards? */
192        while (dag_dev != NULL) {
193                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
194                        dag_dev->ref_count ++;
195                        return dag_dev;
196
197                }
198                dag_dev = dag_dev->next;
199        }
200        return NULL;
201
202
203}
204
205/* NOTE: This function assumes the open_dag_mutex is held by the caller */
206static void dag_close_device(struct dag_dev_t *dev) {
207        /* Need to remove from the device list */
208
209        assert(dev->ref_count == 0);
210
211        if (dev->prev == NULL) {
212                open_dags = dev->next;
213                if (dev->next)
214                        dev->next->prev = NULL;
215        } else {
216                dev->prev->next = dev->next;
217                if (dev->next)
218                        dev->next->prev = dev->prev;
219        }
220
221        dag_close(dev->fd);
222        if (dev->dev_name)
223        free(dev->dev_name);
224        free(dev);
225}
226
227static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace, char *dev_name) {
228        struct stat buf;
229        int fd;
230        struct dag_dev_t *new_dev;
231
232        if (stat(dev_name, &buf) == -1) {
233                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
234                return NULL;
235}
236
237        if (S_ISCHR(buf.st_mode)) {
238                if((fd = dag_open(dev_name)) < 0) {
239                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
240                                        dev_name);
241                        return NULL;
242                }
243        } else {
244                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
245                                dev_name);
246                return NULL;
247        }
248
249        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
250        new_dev->fd = fd;
251        new_dev->dev_name = dev_name;
252        new_dev->ref_count = 1;
253
254        new_dev->prev = NULL;
255        new_dev->next = open_dags;
256        if (open_dags)
257                open_dags->prev = new_dev;
258
259        open_dags = new_dev;
260
261        return new_dev;
262}
263
264/* NOTE: This function assumes the open_dag_mutex is held by the caller */
265static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
266        struct stat buf;
267        int fd;
268        struct dag_dev_t *new_dev;
269
270        if (stat(dev_name, &buf) == -1) {
271                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
272                return NULL;
273        }
274
275        if (S_ISCHR(buf.st_mode)) {
276                if((fd = dag_open(dev_name)) < 0) {
277                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
278                                        dev_name);
279                        return NULL;
280                }
281        } else {
282                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
283                                dev_name);
284                return NULL;
285        }
286
287        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
288        new_dev->fd = fd;
289        new_dev->dev_name = dev_name;
290        new_dev->ref_count = 1;
291
292        new_dev->prev = NULL;
293        new_dev->next = open_dags;
294        if (open_dags)
295                open_dags->prev = new_dev;
296
297        open_dags = new_dev;
298
299        return new_dev;
300}
301
302static int dag_init_output(libtrace_out_t *libtrace) {
303        char *dag_dev_name = NULL;
304        char *scan = NULL;
305        struct dag_dev_t *dag_device = NULL;
306        int stream = 1;
307        unsigned long wake_time;
308        dagutil_sleep_get_wake_time(&wake_time,0);
309
310        dag_init_format_out_data(libtrace);
311        pthread_mutex_lock(&open_dag_mutex);
312        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
313                dag_dev_name = strdup(libtrace->uridata);
314        } else {
315                dag_dev_name = (char *)strndup(libtrace->uridata,
316                                (size_t)(scan - libtrace->uridata));
317                stream = atoi(++scan);
318        }
319        FORMAT_DATA_OUT->dagstream = stream;
320
321        dag_device = dag_find_open_device(dag_dev_name);
322
323        if (dag_device == NULL) {
324                /* Device not yet opened - open it ourselves */
325                dag_device = dag_open_output_device(libtrace, dag_dev_name);
326        } else {
327                free(dag_dev_name);
328                dag_dev_name = NULL;
329        }
330
331        if (dag_device == NULL) {
332                if (dag_dev_name) {
333                        free(dag_dev_name);
334                }
335                return -1;
336        }
337
338        FORMAT_DATA_OUT->device = dag_device;
339        pthread_mutex_unlock(&open_dag_mutex);
340        return 0;
341}
342
343static int dag_init_input(libtrace_t *libtrace) {
344        char *dag_dev_name = NULL;
345        char *scan = NULL;
346        int stream = 0;
347        struct dag_dev_t *dag_device = NULL;
348
349        dag_init_format_data(libtrace);
350        pthread_mutex_lock(&open_dag_mutex);
351        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
352                dag_dev_name = strdup(libtrace->uridata);
353        } else {
354                dag_dev_name = (char *)strndup(libtrace->uridata,
355                                (size_t)(scan - libtrace->uridata));
356                stream = atoi(++scan);
357        }
358
359        FORMAT_DATA->dagstream = stream;
360
361        dag_device = dag_find_open_device(dag_dev_name);
362
363        if (dag_device == NULL) {
364                /* Device not yet opened - open it ourselves */
365                dag_device = dag_open_device(libtrace, dag_dev_name);
366        } else {
367                free(dag_dev_name);
368                dag_dev_name = NULL;
369        }
370
371        if (dag_device == NULL) {
372                if (dag_dev_name)
373                        free(dag_dev_name);
374                dag_dev_name = NULL;
375                return -1;
376        }
377
378        FORMAT_DATA->device = dag_device;
379
380        pthread_mutex_unlock(&open_dag_mutex);
381        return 0;
382}
383
384static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
385                                void *data) {
386        char conf_str[4096];
387        switch(option) {
388                case TRACE_OPTION_META_FREQ:
389                        DUCK.duck_freq = *(int *)data;
390                        return 0;
391                case TRACE_OPTION_SNAPLEN:
392                        snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
393                        if (dag_configure(FORMAT_DATA->device->fd,
394                                                conf_str) != 0) {
395                                trace_set_err(libtrace, errno, "Failed to configure snaplen on DAG card: %s", libtrace->uridata);
396                                return -1;
397                        }
398                        return 0;
399                case TRACE_OPTION_PROMISC:
400                        /* DAG already operates in a promisc fashion */
401                        return -1;
402                case TRACE_OPTION_FILTER:
403                        return -1;
404                case TRACE_OPTION_EVENT_REALTIME:
405                        return -1;
406        }
407        return -1;
408}
409static int dag_start_output(libtrace_out_t *libtrace) {
410        struct timeval zero, nopoll;
411        uint8_t *top, *bottom;
412        top = bottom = NULL;
413
414        zero.tv_sec = 0;
415        zero.tv_usec = 0;
416        nopoll = zero;
417
418        if (dag_attach_stream(FORMAT_DATA_OUT->device->fd,
419                        FORMAT_DATA_OUT->dagstream, 0, 1048576) < 0) {
420                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
421                return -1;
422        }
423
424        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
425                        FORMAT_DATA_OUT->dagstream) < 0) {
426                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
427                return -1;
428        }
429        FORMAT_DATA_OUT->stream_attached = 1;
430
431        /* We don't want the dag card to do any sleeping */
432
433        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
434                        FORMAT_DATA_OUT->dagstream, 0, &zero,
435                        &nopoll);
436
437        return 0;
438}
439
440static int dag_start_input(libtrace_t *libtrace) {
441        struct timeval zero, nopoll;
442        uint8_t *top, *bottom;
443        uint8_t diff = 0;
444        top = bottom = NULL;
445
446        zero.tv_sec = 0;
447        zero.tv_usec = 0;
448        nopoll = zero;
449
450
451
452        if (dag_attach_stream(FORMAT_DATA->device->fd,
453                                FORMAT_DATA->dagstream, 0, 0) < 0) {
454                trace_set_err(libtrace, errno, "Cannot attach DAG stream");
455                return -1;
456        }
457
458        if (dag_start_stream(FORMAT_DATA->device->fd,
459                                FORMAT_DATA->dagstream) < 0) {
460                trace_set_err(libtrace, errno, "Cannot start DAG stream");
461                return -1;
462        }
463        FORMAT_DATA->stream_attached = 1;
464        /* We don't want the dag card to do any sleeping */
465        dag_set_stream_poll(FORMAT_DATA->device->fd,
466                                FORMAT_DATA->dagstream, 0, &zero,
467                                &nopoll);
468
469        /* Should probably flush the memory hole now */
470
471        do {
472                top = dag_advance_stream(FORMAT_DATA->device->fd,
473                                        FORMAT_DATA->dagstream,
474                                        &bottom);
475                assert(top && bottom);
476                diff = top - bottom;
477                bottom -= diff;
478        } while (diff != 0);
479        FORMAT_DATA->top = NULL;
480        FORMAT_DATA->bottom = NULL;
481        FORMAT_DATA->processed = 0;
482        FORMAT_DATA->drops = 0;
483
484        return 0;
485}
486
487static int dag_pause_output(libtrace_out_t *libtrace) {
488        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
489                        FORMAT_DATA_OUT->dagstream) < 0) {
490                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
491                return -1;
492        }
493        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
494                        FORMAT_DATA_OUT->dagstream) < 0) {
495                trace_set_err_out(libtrace, errno, "Could not detach DAG stream");
496                return -1;
497        }
498        FORMAT_DATA_OUT->stream_attached = 0;
499        return 0;
500}
501
502static int dag_pause_input(libtrace_t *libtrace) {
503        if (dag_stop_stream(FORMAT_DATA->device->fd,
504                                FORMAT_DATA->dagstream) < 0) {
505                trace_set_err(libtrace, errno, "Could not stop DAG stream");
506                return -1;
507        }
508        if (dag_detach_stream(FORMAT_DATA->device->fd,
509                                FORMAT_DATA->dagstream) < 0) {
510                trace_set_err(libtrace, errno, "Could not detach DAG stream");
511                return -1;
512        }
513        FORMAT_DATA->stream_attached = 0;
514        return 0;
515}
516
517static int dag_fin_input(libtrace_t *libtrace) {
518        pthread_mutex_lock(&open_dag_mutex);
519        if (FORMAT_DATA->stream_attached)
520                dag_pause_input(libtrace);
521        FORMAT_DATA->device->ref_count --;
522
523        if (FORMAT_DATA->device->ref_count == 0)
524                dag_close_device(FORMAT_DATA->device);
525        if (DUCK.dummy_duck)
526                trace_destroy_dead(DUCK.dummy_duck);
527        free(libtrace->format_data);
528        pthread_mutex_unlock(&open_dag_mutex);
529        return 0; /* success */
530}
531
532static int dag_fin_output(libtrace_out_t *libtrace) {
533        // commit any outstanding traffic in the txbuffer
534        if (FORMAT_DATA_OUT->waiting) {
535                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
536                                FORMAT_DATA_OUT->waiting );
537        }
538
539        // wait until the buffer is nearly clear before exiting the program, as we
540        // will lose packets otherwise
541        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
542                        FORMAT_DATA_OUT->dagstream,
543                        dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
544                                        FORMAT_DATA_OUT->dagstream) - 8
545                        );
546
547        pthread_mutex_lock(&open_dag_mutex);
548        if (FORMAT_DATA_OUT->stream_attached)
549                dag_pause_output(libtrace);
550        FORMAT_DATA_OUT->device->ref_count --;
551
552        if (FORMAT_DATA_OUT->device->ref_count == 0)
553                dag_close_device(FORMAT_DATA_OUT->device);
554        free(libtrace->format_data);
555        pthread_mutex_unlock(&open_dag_mutex);
556        return 0; /* success */
557}
558
559static int dag_get_duckinfo(libtrace_t *libtrace,
560                                libtrace_packet_t *packet) {
561        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
562                        !packet->buffer) {
563                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
564                packet->buf_control = TRACE_CTRL_PACKET;
565                if (!packet->buffer) {
566                        trace_set_err(libtrace, errno,
567                                        "Cannot allocate packet buffer");
568                        return -1;
569                }
570        }
571
572        packet->header = 0;
573        packet->payload = packet->buffer;
574
575        /* No need to check if we can get DUCK or not - we're modern
576         * enough */
577        if ((ioctl(FORMAT_DATA->device->fd, DAGIOCDUCK,
578                                        (duckinf_t *)packet->payload) < 0)) {
579                trace_set_err(libtrace, errno, "Error using DAGIOCDUCK");
580                return -1;
581        }
582
583        packet->type = TRACE_RT_DUCK_2_5;
584        if (!DUCK.dummy_duck)
585                DUCK.dummy_duck = trace_create_dead("rt:localhost:3434");
586        packet->trace = DUCK.dummy_duck;
587        return sizeof(duckinf_t);
588}
589
590static int dag_available(libtrace_t *libtrace) {
591        uint32_t diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
592        /* If we've processed more than 4MB of data since we last called
593         * dag_advance_stream, then we should call it again to allow the
594         * space occupied by that 4MB to be released */
595        if (diff >= dag_record_size && FORMAT_DATA->processed < 4 * 1024 * 1024)
596                return diff;
597        FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd,
598                        FORMAT_DATA->dagstream,
599                        &(FORMAT_DATA->bottom));
600        if (FORMAT_DATA->top == NULL) {
601                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
602                return -1;
603        }
604        FORMAT_DATA->processed = 0;
605        diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
606        return diff;
607}
608
609static dag_record_t *dag_get_record(libtrace_t *libtrace) {
610        dag_record_t *erfptr = NULL;
611        uint16_t size;
612        erfptr = (dag_record_t *)FORMAT_DATA->bottom;
613        if (!erfptr)
614                return NULL;
615        size = ntohs(erfptr->rlen);
616        assert( size >= dag_record_size );
617        /* Make certain we have the full packet available */
618        if (size > (FORMAT_DATA->top - FORMAT_DATA->bottom))
619                return NULL;
620        FORMAT_DATA->bottom += size;
621        FORMAT_DATA->processed += size;
622        return erfptr;
623}
624
625static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
626                void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
627
628        dag_record_t *erfptr;
629
630        if (packet->buffer != buffer &&
631                        packet->buf_control == TRACE_CTRL_PACKET) {
632                free(packet->buffer);
633        }
634
635        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
636                packet->buf_control = TRACE_CTRL_PACKET;
637        } else
638                packet->buf_control = TRACE_CTRL_EXTERNAL;
639
640        erfptr = (dag_record_t *)buffer;
641        packet->buffer = erfptr;
642        packet->header = erfptr;
643        packet->type = rt_type;
644
645        if (erfptr->flags.rxerror == 1) {
646                /* rxerror means the payload is corrupt - drop it
647                 * by tweaking rlen */
648                packet->payload = NULL;
649                erfptr->rlen = htons(erf_get_framing_length(packet));
650        } else {
651                packet->payload = (char*)packet->buffer
652                        + erf_get_framing_length(packet);
653        }
654
655        if (libtrace->format_data == NULL) {
656                dag_init_format_data(libtrace);
657        }
658
659        /* No loss counter for DSM coloured records - have to use
660         * some other API */
661        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
662
663        } else {
664                DATA(libtrace)->drops += ntohs(erfptr->lctr);
665        }
666
667        return 0;
668}
669
670/*
671 * dag_write_packet() at this stage attempts to improve tx performance
672 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
673 * has been met. I observed approximately 270% performance increase
674 * through this relatively naive tweak. No optimisation of buffer sizes
675 * was attempted.
676 */
677
678static int dag_dump_packet(libtrace_out_t *libtrace,
679                dag_record_t *erfptr, unsigned int pad, void *buffer) {
680        //int numbytes = 0;
681        int size;
682
683        /*
684         * If we've got 0 bytes waiting in the txqueue, assume that we haven't
685         * requested any space yet, and request some, storing the pointer at
686         * FORMAT_DATA_OUT->txbuffer.
687         *
688         * The amount to request is slightly magical at the moment - it's
689         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
690         * the buffer and handle overruns.
691         */
692        if (FORMAT_DATA_OUT->waiting == 0) {
693                FORMAT_DATA_OUT->txbuffer = dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
694                                FORMAT_DATA_OUT->dagstream, 16908288);
695        }
696
697        /*
698         * Copy the header separately to the body, as we can't guarantee they are
699         * in contiguous memory
700         */
701        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,erfptr,(dag_record_size + pad));
702        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
703
704
705
706        /*
707         * Copy our incoming packet into the outgoing buffer, and increment our waiting count
708         */
709        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
710        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,buffer,size);
711        FORMAT_DATA_OUT->waiting += size;
712
713        /*
714         * if our output buffer has more than 16 Mebibytes in it, commit those bytes and
715         * reset the waiting count to 0.
716         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in case
717         * there is still data in the buffer at program exit.
718         */
719
720        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
721                FORMAT_DATA_OUT->txbuffer = dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
722                        FORMAT_DATA_OUT->waiting );
723                FORMAT_DATA_OUT->waiting = 0;
724        }
725
726        return size + pad + dag_record_size;
727
728}
729
730static bool find_compatible_linktype(libtrace_out_t *libtrace,
731                                libtrace_packet_t *packet)
732{
733         // Keep trying to simplify the packet until we can find
734         //something we can do with it
735
736        do {
737                char type=libtrace_to_erf_type(trace_get_link_type(packet));
738
739                // Success
740                if (type != (char)-1)
741                        return true;
742
743                if (!demote_packet(packet)) {
744                        trace_set_err_out(libtrace,
745                                        TRACE_ERR_NO_CONVERSION,
746                                        "No erf type for packet (%i)",
747                                        trace_get_link_type(packet));
748                        return false;
749                }
750
751        } while(1);
752
753        return true;
754}
755
756static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet) {
757        /*
758         * This is heavily borrowed from erf_write_packet(). Yes, CnP coding sucks,
759         * sorry about that.
760         */
761        unsigned int pad = 0;
762        int numbytes;
763        void *payload = packet->payload;
764        dag_record_t *header = (dag_record_t *)packet->header;
765
766        FORMAT_DATA_OUT->processed ++;
767        if(!packet->header) {
768                // No header, probably an RT packet. Lifted from erf_write_packet().
769                return -1;
770        }
771
772        pad = dag_get_padding(packet);
773
774        /*
775         * if the payload is null, adjust the rlen. Discussion of this is
776         * attached to erf_write_packet()
777         */
778        if (payload == NULL) {
779                header->rlen = htons(dag_record_size + pad);
780        }
781
782        if (packet->type == TRACE_RT_DATA_ERF) {
783                numbytes = dag_dump_packet(libtrace,
784                                header,
785                                pad,
786                                payload
787                                );
788
789        } else {
790                /* Build up a new packet header from the existing header */
791
792                // Simplify the packet first - if we can't do this, break early
793                if (!find_compatible_linktype(libtrace,packet))
794                        return -1;
795
796                dag_record_t erfhdr;
797
798                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
799                payload=packet->payload;
800                pad = dag_get_padding(packet);
801
802                /* Flags. Can't do this */
803                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
804                if (trace_get_direction(packet)!=~0U)
805                        erfhdr.flags.iface = trace_get_direction(packet);
806
807                erfhdr.type = libtrace_to_erf_type(trace_get_link_type(packet));
808
809                /* Packet length (rlen includes format overhead) */
810                assert(trace_get_capture_length(packet)>0
811                                && trace_get_capture_length(packet)<=65536);
812                assert(erf_get_framing_length(packet)>0
813                                && trace_get_framing_length(packet)<=65536);
814                assert(trace_get_capture_length(packet)+erf_get_framing_length(packet)>0
815                      &&trace_get_capture_length(packet)+erf_get_framing_length(packet)<=65536);
816
817                erfhdr.rlen = htons(trace_get_capture_length(packet)
818                        + erf_get_framing_length(packet));
819
820
821                /* loss counter. Can't do this */
822                erfhdr.lctr = 0;
823                /* Wire length, does not include padding! */
824                erfhdr.wlen = htons(trace_get_wire_length(packet));
825
826                /* Write it out */
827                numbytes = dag_dump_packet(libtrace,
828                                &erfhdr,
829                                pad,
830                                payload);
831
832        }
833
834        return numbytes;
835}
836
837static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
838        int size = 0;
839        struct timeval tv;
840        dag_record_t *erfptr = NULL;
841        int numbytes = 0;
842        uint32_t flags = 0;
843
844        if (DUCK.last_pkt - DUCK.last_duck > DUCK.duck_freq &&
845                        DUCK.duck_freq != 0) {
846                size = dag_get_duckinfo(libtrace, packet);
847                DUCK.last_duck = DUCK.last_pkt;
848                if (size != 0) {
849                        return size;
850                }
851                /* No DUCK support, so don't waste our time anymore */
852                DUCK.duck_freq = 0;
853        }
854
855        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
856
857        if (packet->buf_control == TRACE_CTRL_PACKET) {
858                free(packet->buffer);
859                packet->buffer = 0;
860        }
861
862        do {
863                numbytes = dag_available(libtrace);
864                if (numbytes < 0)
865                        return numbytes;
866                if (numbytes < dag_record_size)
867                        /* Block until we see a packet */
868                        continue;
869                erfptr = dag_get_record(libtrace);
870        } while (erfptr == NULL);
871
872        //dag_form_packet(erfptr, packet);
873        if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
874                                flags))
875                return -1;
876        tv = trace_get_timeval(packet);
877        DUCK.last_pkt = tv.tv_sec;
878
879        return packet->payload ? htons(erfptr->rlen) :
880                                erf_get_framing_length(packet);
881}
882
883static libtrace_eventobj_t trace_event_dag(libtrace_t *trace,
884                                        libtrace_packet_t *packet) {
885        libtrace_eventobj_t event = {0,0,0.0,0};
886        dag_record_t *erfptr = NULL;
887        int numbytes;
888        uint32_t flags = 0;
889
890        /* Need to call dag_available so that the top pointer will get
891         * updated, otherwise we'll never see any data! */
892        numbytes = dag_available(trace);
893
894        /* May as well not bother calling dag_get_record if dag_available
895         * suggests that there's no data */
896        if (numbytes != 0)
897                erfptr = dag_get_record(trace);
898        if (erfptr == NULL) {
899                /* No packet available */
900                event.type = TRACE_EVENT_SLEEP;
901                event.seconds = 0.0001;
902                return event;
903        }
904        //dag_form_packet(erfptr, packet);
905        if (dag_prepare_packet(trace, packet, erfptr, TRACE_RT_DATA_ERF,
906                                flags)) {
907                event.type = TRACE_EVENT_TERMINATE;
908                return event;
909        }
910
911
912        event.size = trace_get_capture_length(packet) + trace_get_framing_length(packet);
913        if (trace->filter) {
914                if (trace_apply_filter(trace->filter, packet)) {
915                        event.type = TRACE_EVENT_PACKET;
916                } else {
917                        event.type = TRACE_EVENT_SLEEP;
918                        event.seconds = 0.000001;
919                        return event;
920                }
921        } else {
922                event.type = TRACE_EVENT_PACKET;
923        }
924
925        if (trace->snaplen > 0) {
926                trace_set_capture_length(packet, trace->snaplen);
927        }
928
929        return event;
930}
931
932static uint64_t dag_get_dropped_packets(libtrace_t *trace) {
933        if (trace->format_data == NULL)
934                return (uint64_t)-1;
935        return DATA(trace)->drops;
936}
937
938static void dag_help(void) {
939        printf("dag format module: $Revision$\n");
940        printf("Supported input URIs:\n");
941        printf("\tdag:/dev/dagn\n");
942        printf("\n");
943        printf("\te.g.: dag:/dev/dag0\n");
944        printf("\n");
945        printf("Supported output URIs:\n");
946        printf("\tnone\n");
947        printf("\n");
948}
949
950static struct libtrace_format_t dag = {
951        "dag",
952        "$Id$",
953        TRACE_FORMAT_ERF,
954        dag_probe_filename,             /* probe filename */
955        NULL,                           /* probe magic */
956        dag_init_input,                 /* init_input */
957        dag_config_input,               /* config_input */
958        dag_start_input,                /* start_input */
959        dag_pause_input,                /* pause_input */
960        dag_init_output,                /* init_output */
961        NULL,                           /* config_output */
962        dag_start_output,               /* start_output */
963        dag_fin_input,                  /* fin_input */
964        dag_fin_output,                 /* fin_output */
965        dag_read_packet,                /* read_packet */
966        dag_prepare_packet,             /* prepare_packet */
967        NULL,                           /* fin_packet */
968        dag_write_packet,               /* write_packet */
969        erf_get_link_type,              /* get_link_type */
970        erf_get_direction,              /* get_direction */
971        erf_set_direction,              /* set_direction */
972        erf_get_erf_timestamp,          /* get_erf_timestamp */
973        NULL,                           /* get_timeval */
974        NULL,                           /* get_seconds */
975        NULL,                           /* get_timespec */
976        NULL,                           /* seek_erf */
977        NULL,                           /* seek_timeval */
978        NULL,                           /* seek_seconds */
979        erf_get_capture_length,         /* get_capture_length */
980        erf_get_wire_length,            /* get_wire_length */
981        erf_get_framing_length,         /* get_framing_length */
982        erf_set_capture_length,         /* set_capture_length */
983        NULL,                           /* get_received_packets */
984        NULL,                           /* get_filtered_packets */
985        dag_get_dropped_packets,        /* get_dropped_packets */
986        NULL,                           /* get_captured_packets */
987        NULL,                           /* get_fd */
988        trace_event_dag,                /* trace_event */
989        dag_help,                       /* help */
990        NULL                            /* next pointer */
991};
992
993void dag_constructor(void) {
994        register_format(&dag);
995}
Note: See TracBrowser for help on using the repository browser.