source: lib/format_dag25.c @ cce868c

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since cce868c was cce868c, checked in by Shane Alcock <salcock@…>, 11 years ago
  • Fixed warning about implicit function declaration - the function does not appear to be present in all the driver releases we are trying to cover
  • Property mode set to 100644
File size: 27.8 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *          Shane Alcock
8 *
9 * All rights reserved.
10 *
11 * This code has been developed by the University of Waikato WAND
12 * research group. For further information please see http://www.wand.net.nz/
13 *
14 * libtrace is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * libtrace is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with libtrace; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 *
28 * $Id$
29 *
30 */
31#define _GNU_SOURCE
32
33#include "config.h"
34#include "common.h"
35#include "libtrace.h"
36#include "libtrace_int.h"
37#include "format_helper.h"
38#include "format_erf.h"
39
40#include <assert.h>
41#include <errno.h>
42#include <fcntl.h>
43#include <stdio.h>
44#include <string.h>
45#include <stdlib.h>
46
47#include <sys/mman.h>
48/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
49 * Windows anyway so we'll worry about this more later :] */
50#include <pthread.h>
51
52
53#ifdef WIN32
54#  include <io.h>
55#  include <share.h>
56#  define PATH_MAX _MAX_PATH
57#  define snprintf sprintf_s
58#else
59#  include <netdb.h>
60#  ifndef PATH_MAX
61#       define PATH_MAX 4096
62#  endif
63#  include <sys/ioctl.h>
64#endif
65
66
67#define DATA(x) ((struct dag_format_data_t *)x->format_data)
68#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
69
70#define FORMAT_DATA DATA(libtrace)
71#define FORMAT_DATA_OUT DATA_OUT(libtrace)
72
73#define DUCK FORMAT_DATA->duck
74static struct libtrace_format_t dag;
75
76struct dag_dev_t {
77        //pthread_mutex_t dag_mutex;
78        char * dev_name;
79        int fd;
80        uint16_t ref_count;
81        struct dag_dev_t *prev;
82        struct dag_dev_t *next;
83};
84
85struct dag_format_data_out_t {
86        struct dag_dev_t *device;
87        unsigned int dagstream;
88        int stream_attached;
89        uint8_t *bottom;
90        uint8_t *top;
91        uint32_t processed;
92        uint64_t drops;
93        uint64_t waiting;
94        uint8_t *txbuffer;
95};
96
97struct dag_format_data_t {
98        struct {
99                uint32_t last_duck;
100                uint32_t duck_freq;
101                uint32_t last_pkt;
102                libtrace_t *dummy_duck;
103        } duck;
104
105        struct dag_dev_t *device;
106        unsigned int dagstream;
107        int stream_attached;
108        uint8_t *bottom;
109        uint8_t *top;
110        uint32_t processed;
111        uint64_t drops;
112};
113
114pthread_mutex_t open_dag_mutex;
115struct dag_dev_t *open_dags = NULL;
116
117/* Dag erf ether packets have a 2 byte padding before the packet
118 * so that the ip header is aligned on a 32 bit boundary.
119 */
120static int dag_get_padding(const libtrace_packet_t *packet)
121{
122        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
123                dag_record_t *erfptr = (dag_record_t *)packet->header;
124                switch(erfptr->type) {
125                        case TYPE_ETH:
126                        case TYPE_DSM_COLOR_ETH:
127                                return 2;
128                        default:                return 0;
129                }
130        }
131        else {
132                switch(trace_get_link_type(packet)) {
133                        case TRACE_TYPE_ETH:    return 2;
134                        default:                return 0;
135                }
136        }
137}
138
139static int dag_probe_filename(const char *filename)
140{
141        struct stat statbuf;
142        /* Can we stat the file? */
143        if (stat(filename, &statbuf) != 0) {
144                return 0;
145        }
146        /* Is it a character device? */
147        if (!S_ISCHR(statbuf.st_mode)) {
148                return 0;
149        }
150        /* Yeah, it's probably us. */
151        return 1;
152}
153
154static void dag_init_format_out_data(libtrace_out_t *libtrace) {
155        libtrace->format_data = (struct dag_format_data_out_t *) malloc(sizeof(struct dag_format_data_out_t));
156        // no DUCK on output
157        FORMAT_DATA_OUT->stream_attached = 0;
158        FORMAT_DATA_OUT->drops = 0;
159        FORMAT_DATA_OUT->device = NULL;
160        FORMAT_DATA_OUT->dagstream = 0;
161        FORMAT_DATA_OUT->processed = 0;
162        FORMAT_DATA_OUT->bottom = NULL;
163        FORMAT_DATA_OUT->top = NULL;
164        FORMAT_DATA_OUT->waiting = 0;
165
166}
167
168static void dag_init_format_data(libtrace_t *libtrace) {
169        libtrace->format_data = (struct dag_format_data_t *)
170                malloc(sizeof(struct dag_format_data_t));
171        DUCK.last_duck = 0;
172        DUCK.duck_freq = 0;
173        DUCK.last_pkt = 0;
174        DUCK.dummy_duck = NULL;
175        FORMAT_DATA->stream_attached = 0;
176        FORMAT_DATA->drops = 0;
177        FORMAT_DATA->device = NULL;
178        FORMAT_DATA->dagstream = 0;
179        FORMAT_DATA->processed = 0;
180        FORMAT_DATA->bottom = NULL;
181        FORMAT_DATA->top = NULL;
182}
183
184/* NOTE: This function assumes the open_dag_mutex is held by the caller */
185static struct dag_dev_t *dag_find_open_device(char *dev_name) {
186        struct dag_dev_t *dag_dev;
187
188        dag_dev = open_dags;
189
190        /* XXX: Not exactly zippy, but how often are we going to be dealing
191         * with multiple dag cards? */
192        while (dag_dev != NULL) {
193                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
194                        dag_dev->ref_count ++;
195                        return dag_dev;
196
197                }
198                dag_dev = dag_dev->next;
199        }
200        return NULL;
201
202
203}
204
205/* NOTE: This function assumes the open_dag_mutex is held by the caller */
206static void dag_close_device(struct dag_dev_t *dev) {
207        /* Need to remove from the device list */
208
209        assert(dev->ref_count == 0);
210
211        if (dev->prev == NULL) {
212                open_dags = dev->next;
213                if (dev->next)
214                        dev->next->prev = NULL;
215        } else {
216                dev->prev->next = dev->next;
217                if (dev->next)
218                        dev->next->prev = dev->prev;
219        }
220
221        dag_close(dev->fd);
222        if (dev->dev_name)
223        free(dev->dev_name);
224        free(dev);
225}
226
227static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace, char *dev_name) {
228        struct stat buf;
229        int fd;
230        struct dag_dev_t *new_dev;
231
232        if (stat(dev_name, &buf) == -1) {
233                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
234                return NULL;
235}
236
237        if (S_ISCHR(buf.st_mode)) {
238                if((fd = dag_open(dev_name)) < 0) {
239                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
240                                        dev_name);
241                        return NULL;
242                }
243        } else {
244                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
245                                dev_name);
246                return NULL;
247        }
248
249        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
250        new_dev->fd = fd;
251        new_dev->dev_name = dev_name;
252        new_dev->ref_count = 1;
253
254        new_dev->prev = NULL;
255        new_dev->next = open_dags;
256        if (open_dags)
257                open_dags->prev = new_dev;
258
259        open_dags = new_dev;
260
261        return new_dev;
262}
263
264/* NOTE: This function assumes the open_dag_mutex is held by the caller */
265static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
266        struct stat buf;
267        int fd;
268        struct dag_dev_t *new_dev;
269
270        if (stat(dev_name, &buf) == -1) {
271                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
272                return NULL;
273        }
274
275        if (S_ISCHR(buf.st_mode)) {
276                if((fd = dag_open(dev_name)) < 0) {
277                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
278                                        dev_name);
279                        return NULL;
280                }
281        } else {
282                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
283                                dev_name);
284                return NULL;
285        }
286
287        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
288        new_dev->fd = fd;
289        new_dev->dev_name = dev_name;
290        new_dev->ref_count = 1;
291
292        new_dev->prev = NULL;
293        new_dev->next = open_dags;
294        if (open_dags)
295                open_dags->prev = new_dev;
296
297        open_dags = new_dev;
298
299        return new_dev;
300}
301
302static int dag_init_output(libtrace_out_t *libtrace) {
303        char *dag_dev_name = NULL;
304        char *scan = NULL;
305        struct dag_dev_t *dag_device = NULL;
306        int stream = 1;
307       
308        /* XXX I don't know if this is important or not, but this function
309         * isn't present in all of the driver releases that this code is
310         * supposed to support! */
311        /*
312        unsigned long wake_time;
313        dagutil_sleep_get_wake_time(&wake_time,0);
314        */
315
316        dag_init_format_out_data(libtrace);
317        pthread_mutex_lock(&open_dag_mutex);
318        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
319                dag_dev_name = strdup(libtrace->uridata);
320        } else {
321                dag_dev_name = (char *)strndup(libtrace->uridata,
322                                (size_t)(scan - libtrace->uridata));
323                stream = atoi(++scan);
324        }
325        FORMAT_DATA_OUT->dagstream = stream;
326
327        dag_device = dag_find_open_device(dag_dev_name);
328
329        if (dag_device == NULL) {
330                /* Device not yet opened - open it ourselves */
331                dag_device = dag_open_output_device(libtrace, dag_dev_name);
332        } else {
333                free(dag_dev_name);
334                dag_dev_name = NULL;
335        }
336
337        if (dag_device == NULL) {
338                if (dag_dev_name) {
339                        free(dag_dev_name);
340                }
341                return -1;
342        }
343
344        FORMAT_DATA_OUT->device = dag_device;
345        pthread_mutex_unlock(&open_dag_mutex);
346        return 0;
347}
348
349static int dag_init_input(libtrace_t *libtrace) {
350        char *dag_dev_name = NULL;
351        char *scan = NULL;
352        int stream = 0;
353        struct dag_dev_t *dag_device = NULL;
354
355        dag_init_format_data(libtrace);
356        pthread_mutex_lock(&open_dag_mutex);
357        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
358                dag_dev_name = strdup(libtrace->uridata);
359        } else {
360                dag_dev_name = (char *)strndup(libtrace->uridata,
361                                (size_t)(scan - libtrace->uridata));
362                stream = atoi(++scan);
363        }
364
365        FORMAT_DATA->dagstream = stream;
366
367        dag_device = dag_find_open_device(dag_dev_name);
368
369        if (dag_device == NULL) {
370                /* Device not yet opened - open it ourselves */
371                dag_device = dag_open_device(libtrace, dag_dev_name);
372        } else {
373                free(dag_dev_name);
374                dag_dev_name = NULL;
375        }
376
377        if (dag_device == NULL) {
378                if (dag_dev_name)
379                        free(dag_dev_name);
380                dag_dev_name = NULL;
381                return -1;
382        }
383
384        FORMAT_DATA->device = dag_device;
385
386        pthread_mutex_unlock(&open_dag_mutex);
387        return 0;
388}
389
390static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
391                                void *data) {
392        char conf_str[4096];
393        switch(option) {
394                case TRACE_OPTION_META_FREQ:
395                        DUCK.duck_freq = *(int *)data;
396                        return 0;
397                case TRACE_OPTION_SNAPLEN:
398                        snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
399                        if (dag_configure(FORMAT_DATA->device->fd,
400                                                conf_str) != 0) {
401                                trace_set_err(libtrace, errno, "Failed to configure snaplen on DAG card: %s", libtrace->uridata);
402                                return -1;
403                        }
404                        return 0;
405                case TRACE_OPTION_PROMISC:
406                        /* DAG already operates in a promisc fashion */
407                        return -1;
408                case TRACE_OPTION_FILTER:
409                        return -1;
410                case TRACE_OPTION_EVENT_REALTIME:
411                        return -1;
412        }
413        return -1;
414}
415static int dag_start_output(libtrace_out_t *libtrace) {
416        struct timeval zero, nopoll;
417        uint8_t *top, *bottom;
418        top = bottom = NULL;
419
420        zero.tv_sec = 0;
421        zero.tv_usec = 0;
422        nopoll = zero;
423
424        if (dag_attach_stream(FORMAT_DATA_OUT->device->fd,
425                        FORMAT_DATA_OUT->dagstream, 0, 1048576) < 0) {
426                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
427                return -1;
428        }
429
430        if (dag_start_stream(FORMAT_DATA_OUT->device->fd,
431                        FORMAT_DATA_OUT->dagstream) < 0) {
432                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
433                return -1;
434        }
435        FORMAT_DATA_OUT->stream_attached = 1;
436
437        /* We don't want the dag card to do any sleeping */
438
439        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
440                        FORMAT_DATA_OUT->dagstream, 0, &zero,
441                        &nopoll);
442
443        return 0;
444}
445
446static int dag_start_input(libtrace_t *libtrace) {
447        struct timeval zero, nopoll;
448        uint8_t *top, *bottom;
449        uint8_t diff = 0;
450        top = bottom = NULL;
451
452        zero.tv_sec = 0;
453        zero.tv_usec = 0;
454        nopoll = zero;
455
456
457
458        if (dag_attach_stream(FORMAT_DATA->device->fd,
459                                FORMAT_DATA->dagstream, 0, 0) < 0) {
460                trace_set_err(libtrace, errno, "Cannot attach DAG stream");
461                return -1;
462        }
463
464        if (dag_start_stream(FORMAT_DATA->device->fd,
465                                FORMAT_DATA->dagstream) < 0) {
466                trace_set_err(libtrace, errno, "Cannot start DAG stream");
467                return -1;
468        }
469        FORMAT_DATA->stream_attached = 1;
470        /* We don't want the dag card to do any sleeping */
471        dag_set_stream_poll(FORMAT_DATA->device->fd,
472                                FORMAT_DATA->dagstream, 0, &zero,
473                                &nopoll);
474
475        /* Should probably flush the memory hole now */
476
477        do {
478                top = dag_advance_stream(FORMAT_DATA->device->fd,
479                                        FORMAT_DATA->dagstream,
480                                        &bottom);
481                assert(top && bottom);
482                diff = top - bottom;
483                bottom -= diff;
484        } while (diff != 0);
485        FORMAT_DATA->top = NULL;
486        FORMAT_DATA->bottom = NULL;
487        FORMAT_DATA->processed = 0;
488        FORMAT_DATA->drops = 0;
489
490        return 0;
491}
492
493static int dag_pause_output(libtrace_out_t *libtrace) {
494        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
495                        FORMAT_DATA_OUT->dagstream) < 0) {
496                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
497                return -1;
498        }
499        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
500                        FORMAT_DATA_OUT->dagstream) < 0) {
501                trace_set_err_out(libtrace, errno, "Could not detach DAG stream");
502                return -1;
503        }
504        FORMAT_DATA_OUT->stream_attached = 0;
505        return 0;
506}
507
508static int dag_pause_input(libtrace_t *libtrace) {
509        if (dag_stop_stream(FORMAT_DATA->device->fd,
510                                FORMAT_DATA->dagstream) < 0) {
511                trace_set_err(libtrace, errno, "Could not stop DAG stream");
512                return -1;
513        }
514        if (dag_detach_stream(FORMAT_DATA->device->fd,
515                                FORMAT_DATA->dagstream) < 0) {
516                trace_set_err(libtrace, errno, "Could not detach DAG stream");
517                return -1;
518        }
519        FORMAT_DATA->stream_attached = 0;
520        return 0;
521}
522
523static int dag_fin_input(libtrace_t *libtrace) {
524        pthread_mutex_lock(&open_dag_mutex);
525        if (FORMAT_DATA->stream_attached)
526                dag_pause_input(libtrace);
527        FORMAT_DATA->device->ref_count --;
528
529        if (FORMAT_DATA->device->ref_count == 0)
530                dag_close_device(FORMAT_DATA->device);
531        if (DUCK.dummy_duck)
532                trace_destroy_dead(DUCK.dummy_duck);
533        free(libtrace->format_data);
534        pthread_mutex_unlock(&open_dag_mutex);
535        return 0; /* success */
536}
537
538static int dag_fin_output(libtrace_out_t *libtrace) {
539        // commit any outstanding traffic in the txbuffer
540        if (FORMAT_DATA_OUT->waiting) {
541                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
542                                FORMAT_DATA_OUT->waiting );
543        }
544
545        // wait until the buffer is nearly clear before exiting the program, as we
546        // will lose packets otherwise
547        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
548                        FORMAT_DATA_OUT->dagstream,
549                        dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
550                                        FORMAT_DATA_OUT->dagstream) - 8
551                        );
552
553        pthread_mutex_lock(&open_dag_mutex);
554        if (FORMAT_DATA_OUT->stream_attached)
555                dag_pause_output(libtrace);
556        FORMAT_DATA_OUT->device->ref_count --;
557
558        if (FORMAT_DATA_OUT->device->ref_count == 0)
559                dag_close_device(FORMAT_DATA_OUT->device);
560        free(libtrace->format_data);
561        pthread_mutex_unlock(&open_dag_mutex);
562        return 0; /* success */
563}
564
565static int dag_get_duckinfo(libtrace_t *libtrace,
566                                libtrace_packet_t *packet) {
567        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
568                        !packet->buffer) {
569                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
570                packet->buf_control = TRACE_CTRL_PACKET;
571                if (!packet->buffer) {
572                        trace_set_err(libtrace, errno,
573                                        "Cannot allocate packet buffer");
574                        return -1;
575                }
576        }
577
578        packet->header = 0;
579        packet->payload = packet->buffer;
580
581        /* No need to check if we can get DUCK or not - we're modern
582         * enough */
583        if ((ioctl(FORMAT_DATA->device->fd, DAGIOCDUCK,
584                                        (duckinf_t *)packet->payload) < 0)) {
585                trace_set_err(libtrace, errno, "Error using DAGIOCDUCK");
586                return -1;
587        }
588
589        packet->type = TRACE_RT_DUCK_2_5;
590        if (!DUCK.dummy_duck)
591                DUCK.dummy_duck = trace_create_dead("rt:localhost:3434");
592        packet->trace = DUCK.dummy_duck;
593        return sizeof(duckinf_t);
594}
595
596static int dag_available(libtrace_t *libtrace) {
597        uint32_t diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
598        /* If we've processed more than 4MB of data since we last called
599         * dag_advance_stream, then we should call it again to allow the
600         * space occupied by that 4MB to be released */
601        if (diff >= dag_record_size && FORMAT_DATA->processed < 4 * 1024 * 1024)
602                return diff;
603        FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd,
604                        FORMAT_DATA->dagstream,
605                        &(FORMAT_DATA->bottom));
606        if (FORMAT_DATA->top == NULL) {
607                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
608                return -1;
609        }
610        FORMAT_DATA->processed = 0;
611        diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
612        return diff;
613}
614
615static dag_record_t *dag_get_record(libtrace_t *libtrace) {
616        dag_record_t *erfptr = NULL;
617        uint16_t size;
618        erfptr = (dag_record_t *)FORMAT_DATA->bottom;
619        if (!erfptr)
620                return NULL;
621        size = ntohs(erfptr->rlen);
622        assert( size >= dag_record_size );
623        /* Make certain we have the full packet available */
624        if (size > (FORMAT_DATA->top - FORMAT_DATA->bottom))
625                return NULL;
626        FORMAT_DATA->bottom += size;
627        FORMAT_DATA->processed += size;
628        return erfptr;
629}
630
631static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
632                void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
633
634        dag_record_t *erfptr;
635
636        if (packet->buffer != buffer &&
637                        packet->buf_control == TRACE_CTRL_PACKET) {
638                free(packet->buffer);
639        }
640
641        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
642                packet->buf_control = TRACE_CTRL_PACKET;
643        } else
644                packet->buf_control = TRACE_CTRL_EXTERNAL;
645
646        erfptr = (dag_record_t *)buffer;
647        packet->buffer = erfptr;
648        packet->header = erfptr;
649        packet->type = rt_type;
650
651        if (erfptr->flags.rxerror == 1) {
652                /* rxerror means the payload is corrupt - drop it
653                 * by tweaking rlen */
654                packet->payload = NULL;
655                erfptr->rlen = htons(erf_get_framing_length(packet));
656        } else {
657                packet->payload = (char*)packet->buffer
658                        + erf_get_framing_length(packet);
659        }
660
661        if (libtrace->format_data == NULL) {
662                dag_init_format_data(libtrace);
663        }
664
665        /* No loss counter for DSM coloured records - have to use
666         * some other API */
667        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
668
669        } else {
670                DATA(libtrace)->drops += ntohs(erfptr->lctr);
671        }
672
673        return 0;
674}
675
676/*
677 * dag_write_packet() at this stage attempts to improve tx performance
678 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
679 * has been met. I observed approximately 270% performance increase
680 * through this relatively naive tweak. No optimisation of buffer sizes
681 * was attempted.
682 */
683
684static int dag_dump_packet(libtrace_out_t *libtrace,
685                dag_record_t *erfptr, unsigned int pad, void *buffer) {
686        //int numbytes = 0;
687        int size;
688
689        /*
690         * If we've got 0 bytes waiting in the txqueue, assume that we haven't
691         * requested any space yet, and request some, storing the pointer at
692         * FORMAT_DATA_OUT->txbuffer.
693         *
694         * The amount to request is slightly magical at the moment - it's
695         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
696         * the buffer and handle overruns.
697         */
698        if (FORMAT_DATA_OUT->waiting == 0) {
699                FORMAT_DATA_OUT->txbuffer = dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
700                                FORMAT_DATA_OUT->dagstream, 16908288);
701        }
702
703        /*
704         * Copy the header separately to the body, as we can't guarantee they are
705         * in contiguous memory
706         */
707        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,erfptr,(dag_record_size + pad));
708        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
709
710
711
712        /*
713         * Copy our incoming packet into the outgoing buffer, and increment our waiting count
714         */
715        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
716        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,buffer,size);
717        FORMAT_DATA_OUT->waiting += size;
718
719        /*
720         * if our output buffer has more than 16 Mebibytes in it, commit those bytes and
721         * reset the waiting count to 0.
722         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in case
723         * there is still data in the buffer at program exit.
724         */
725
726        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
727                FORMAT_DATA_OUT->txbuffer = dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
728                        FORMAT_DATA_OUT->waiting );
729                FORMAT_DATA_OUT->waiting = 0;
730        }
731
732        return size + pad + dag_record_size;
733
734}
735
736static bool find_compatible_linktype(libtrace_out_t *libtrace,
737                                libtrace_packet_t *packet)
738{
739         // Keep trying to simplify the packet until we can find
740         //something we can do with it
741
742        do {
743                char type=libtrace_to_erf_type(trace_get_link_type(packet));
744
745                // Success
746                if (type != (char)-1)
747                        return true;
748
749                if (!demote_packet(packet)) {
750                        trace_set_err_out(libtrace,
751                                        TRACE_ERR_NO_CONVERSION,
752                                        "No erf type for packet (%i)",
753                                        trace_get_link_type(packet));
754                        return false;
755                }
756
757        } while(1);
758
759        return true;
760}
761
762static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet) {
763        /*
764         * This is heavily borrowed from erf_write_packet(). Yes, CnP coding sucks,
765         * sorry about that.
766         */
767        unsigned int pad = 0;
768        int numbytes;
769        void *payload = packet->payload;
770        dag_record_t *header = (dag_record_t *)packet->header;
771
772        FORMAT_DATA_OUT->processed ++;
773        if(!packet->header) {
774                // No header, probably an RT packet. Lifted from erf_write_packet().
775                return -1;
776        }
777
778        pad = dag_get_padding(packet);
779
780        /*
781         * if the payload is null, adjust the rlen. Discussion of this is
782         * attached to erf_write_packet()
783         */
784        if (payload == NULL) {
785                header->rlen = htons(dag_record_size + pad);
786        }
787
788        if (packet->type == TRACE_RT_DATA_ERF) {
789                numbytes = dag_dump_packet(libtrace,
790                                header,
791                                pad,
792                                payload
793                                );
794
795        } else {
796                /* Build up a new packet header from the existing header */
797
798                // Simplify the packet first - if we can't do this, break early
799                if (!find_compatible_linktype(libtrace,packet))
800                        return -1;
801
802                dag_record_t erfhdr;
803
804                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
805                payload=packet->payload;
806                pad = dag_get_padding(packet);
807
808                /* Flags. Can't do this */
809                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
810                if (trace_get_direction(packet)!=~0U)
811                        erfhdr.flags.iface = trace_get_direction(packet);
812
813                erfhdr.type = libtrace_to_erf_type(trace_get_link_type(packet));
814
815                /* Packet length (rlen includes format overhead) */
816                assert(trace_get_capture_length(packet)>0
817                                && trace_get_capture_length(packet)<=65536);
818                assert(erf_get_framing_length(packet)>0
819                                && trace_get_framing_length(packet)<=65536);
820                assert(trace_get_capture_length(packet)+erf_get_framing_length(packet)>0
821                      &&trace_get_capture_length(packet)+erf_get_framing_length(packet)<=65536);
822
823                erfhdr.rlen = htons(trace_get_capture_length(packet)
824                        + erf_get_framing_length(packet));
825
826
827                /* loss counter. Can't do this */
828                erfhdr.lctr = 0;
829                /* Wire length, does not include padding! */
830                erfhdr.wlen = htons(trace_get_wire_length(packet));
831
832                /* Write it out */
833                numbytes = dag_dump_packet(libtrace,
834                                &erfhdr,
835                                pad,
836                                payload);
837
838        }
839
840        return numbytes;
841}
842
843static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
844        int size = 0;
845        struct timeval tv;
846        dag_record_t *erfptr = NULL;
847        int numbytes = 0;
848        uint32_t flags = 0;
849
850        if (DUCK.last_pkt - DUCK.last_duck > DUCK.duck_freq &&
851                        DUCK.duck_freq != 0) {
852                size = dag_get_duckinfo(libtrace, packet);
853                DUCK.last_duck = DUCK.last_pkt;
854                if (size != 0) {
855                        return size;
856                }
857                /* No DUCK support, so don't waste our time anymore */
858                DUCK.duck_freq = 0;
859        }
860
861        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
862
863        if (packet->buf_control == TRACE_CTRL_PACKET) {
864                free(packet->buffer);
865                packet->buffer = 0;
866        }
867
868        do {
869                numbytes = dag_available(libtrace);
870                if (numbytes < 0)
871                        return numbytes;
872                if (numbytes < dag_record_size)
873                        /* Block until we see a packet */
874                        continue;
875                erfptr = dag_get_record(libtrace);
876        } while (erfptr == NULL);
877
878        //dag_form_packet(erfptr, packet);
879        if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
880                                flags))
881                return -1;
882        tv = trace_get_timeval(packet);
883        DUCK.last_pkt = tv.tv_sec;
884
885        return packet->payload ? htons(erfptr->rlen) :
886                                erf_get_framing_length(packet);
887}
888
889static libtrace_eventobj_t trace_event_dag(libtrace_t *trace,
890                                        libtrace_packet_t *packet) {
891        libtrace_eventobj_t event = {0,0,0.0,0};
892        dag_record_t *erfptr = NULL;
893        int numbytes;
894        uint32_t flags = 0;
895
896        do {
897                erfptr = NULL;
898                numbytes = 0;
899       
900                /* Need to call dag_available so that the top pointer will get
901                 * updated, otherwise we'll never see any data! */
902                numbytes = dag_available(trace);
903
904                /* May as well not bother calling dag_get_record if
905                 * dag_available suggests that there's no data */
906                if (numbytes != 0)
907                        erfptr = dag_get_record(trace);
908                if (erfptr == NULL) {
909                        /* No packet available */
910                        event.type = TRACE_EVENT_SLEEP;
911                        event.seconds = 0.0001;
912                        break;
913                }
914                //dag_form_packet(erfptr, packet);
915                if (dag_prepare_packet(trace, packet, erfptr, 
916                                        TRACE_RT_DATA_ERF, flags)) {
917                        event.type = TRACE_EVENT_TERMINATE;
918                        break;
919                }
920
921
922                event.size = trace_get_capture_length(packet) + 
923                                trace_get_framing_length(packet);
924                if (trace->filter) {
925                        if (trace_apply_filter(trace->filter, packet)) {
926                                event.type = TRACE_EVENT_PACKET;
927                        } else {
928                                /* This packet isn't useful so we want to
929                                 * immediately see if there is another suitable
930                                 * one - we definitely DO NOT want to return
931                                 * a sleep event in this case, like we used to
932                                 * do! */
933                                continue;
934                        }
935                } else {
936                        event.type = TRACE_EVENT_PACKET;
937                }
938
939                if (trace->snaplen > 0) {
940                        trace_set_capture_length(packet, trace->snaplen);
941                }
942                break;
943        } while (1);
944
945        return event;
946}
947
948static uint64_t dag_get_dropped_packets(libtrace_t *trace) {
949        if (trace->format_data == NULL)
950                return (uint64_t)-1;
951        return DATA(trace)->drops;
952}
953
954static void dag_help(void) {
955        printf("dag format module: $Revision$\n");
956        printf("Supported input URIs:\n");
957        printf("\tdag:/dev/dagn\n");
958        printf("\n");
959        printf("\te.g.: dag:/dev/dag0\n");
960        printf("\n");
961        printf("Supported output URIs:\n");
962        printf("\tnone\n");
963        printf("\n");
964}
965
966static struct libtrace_format_t dag = {
967        "dag",
968        "$Id$",
969        TRACE_FORMAT_ERF,
970        dag_probe_filename,             /* probe filename */
971        NULL,                           /* probe magic */
972        dag_init_input,                 /* init_input */
973        dag_config_input,               /* config_input */
974        dag_start_input,                /* start_input */
975        dag_pause_input,                /* pause_input */
976        dag_init_output,                /* init_output */
977        NULL,                           /* config_output */
978        dag_start_output,               /* start_output */
979        dag_fin_input,                  /* fin_input */
980        dag_fin_output,                 /* fin_output */
981        dag_read_packet,                /* read_packet */
982        dag_prepare_packet,             /* prepare_packet */
983        NULL,                           /* fin_packet */
984        dag_write_packet,               /* write_packet */
985        erf_get_link_type,              /* get_link_type */
986        erf_get_direction,              /* get_direction */
987        erf_set_direction,              /* set_direction */
988        erf_get_erf_timestamp,          /* get_erf_timestamp */
989        NULL,                           /* get_timeval */
990        NULL,                           /* get_seconds */
991        NULL,                           /* get_timespec */
992        NULL,                           /* seek_erf */
993        NULL,                           /* seek_timeval */
994        NULL,                           /* seek_seconds */
995        erf_get_capture_length,         /* get_capture_length */
996        erf_get_wire_length,            /* get_wire_length */
997        erf_get_framing_length,         /* get_framing_length */
998        erf_set_capture_length,         /* set_capture_length */
999        NULL,                           /* get_received_packets */
1000        NULL,                           /* get_filtered_packets */
1001        dag_get_dropped_packets,        /* get_dropped_packets */
1002        NULL,                           /* get_captured_packets */
1003        NULL,                           /* get_fd */
1004        trace_event_dag,                /* trace_event */
1005        dag_help,                       /* help */
1006        NULL                            /* next pointer */
1007};
1008
1009void dag_constructor(void) {
1010        register_format(&dag);
1011}
Note: See TracBrowser for help on using the repository browser.