source: lib/format_dag25.c @ f0fb38f

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since f0fb38f was f0fb38f, checked in by Shane Alcock <salcock@…>, 14 years ago
  • Added prepare_packet functions to all formats, primarily to support translating RT packets into the appropriate format. These functions are all used internally as well, as most formats still need to "prepare" packets that have been read by setting pointers, updating loss counters etc.
  • Also added a trace_prepare_packet function, but this is not made available externally at this stage
  • Added init_format_data functions to some formats to initialise format data structures in cases where the init_trace function does more than just that
  • Refactored rt packet reading code to use the new trace_prepare_packet functionality - also did a lot of tidying of the code
  • Added missing RT type for BPF format
  • Property mode set to 100644
File size: 17.4 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *          Shane Alcock
8 *
9 * All rights reserved.
10 *
11 * This code has been developed by the University of Waikato WAND
12 * research group. For further information please see http://www.wand.net.nz/
13 *
14 * libtrace is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * libtrace is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with libtrace; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 *
28 * $Id$
29 *
30 */
31#define _GNU_SOURCE
32
33#include "config.h"
34#include "common.h"
35#include "libtrace.h"
36#include "libtrace_int.h"
37#include "format_helper.h"
38#include "format_erf.h"
39
40#include <assert.h>
41#include <errno.h>
42#include <fcntl.h>
43#include <stdio.h>
44#include <string.h>
45#include <stdlib.h>
46
47#include <sys/mman.h>
48/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
49 * Windows anyway so we'll worry about this more later :] */
50#include <pthread.h>
51
52
53#ifdef WIN32
54#  include <io.h>
55#  include <share.h>
56#  define PATH_MAX _MAX_PATH
57#  define snprintf sprintf_s
58#else
59#  include <netdb.h>
60#  ifndef PATH_MAX
61#       define PATH_MAX 4096
62#  endif
63#  include <sys/ioctl.h>
64#endif
65
66
67#define DATA(x) ((struct dag_format_data_t *)x->format_data)
68#define FORMAT_DATA DATA(libtrace)
69#define DUCK FORMAT_DATA->duck
70static struct libtrace_format_t dag;
71
72struct dag_dev_t {
73        //pthread_mutex_t dag_mutex;
74        char * dev_name;
75        int fd;
76        uint16_t ref_count;
77        struct dag_dev_t *prev;
78        struct dag_dev_t *next;
79};     
80
81struct dag_format_data_t {
82        struct {
83                uint32_t last_duck;
84                uint32_t duck_freq;
85                uint32_t last_pkt;
86                libtrace_t *dummy_duck;
87        } duck;
88
89        struct dag_dev_t *device;
90        unsigned int dagstream;
91        int stream_attached;
92        uint8_t *bottom;
93        uint8_t *top;
94        uint32_t processed;
95        uint64_t drops;
96};
97
98pthread_mutex_t open_dag_mutex;
99struct dag_dev_t *open_dags = NULL;
100
101static void dag_init_format_data(libtrace_t *libtrace) {
102        libtrace->format_data = (struct dag_format_data_t *)
103                malloc(sizeof(struct dag_format_data_t));
104        DUCK.last_duck = 0;
105        DUCK.duck_freq = 0;
106        DUCK.last_pkt = 0;
107        DUCK.dummy_duck = NULL;
108        FORMAT_DATA->stream_attached = 0;
109        FORMAT_DATA->drops = 0;
110        FORMAT_DATA->device = NULL;
111        FORMAT_DATA->dagstream = 0;
112        FORMAT_DATA->processed = 0;
113        FORMAT_DATA->bottom = NULL;
114        FORMAT_DATA->top = NULL;
115}
116
117/* NOTE: This function assumes the open_dag_mutex is held by the caller */
118static struct dag_dev_t *dag_find_open_device(char *dev_name) {
119        struct dag_dev_t *dag_dev;
120       
121        dag_dev = open_dags;
122
123        /* XXX: Not exactly zippy, but how often are we going to be dealing
124         * with multiple dag cards? */
125        while (dag_dev != NULL) {
126                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
127                        dag_dev->ref_count ++;
128                        return dag_dev;
129                       
130                }
131                dag_dev = dag_dev->next;
132        }
133        return NULL;
134               
135       
136}
137
138/* NOTE: This function assumes the open_dag_mutex is held by the caller */
139static void dag_close_device(struct dag_dev_t *dev) {
140        /* Need to remove from the device list */
141       
142        assert(dev->ref_count == 0);
143       
144        if (dev->prev == NULL) {
145                open_dags = dev->next;
146                if (dev->next)
147                        dev->next->prev = NULL;
148        } else {
149                dev->prev->next = dev->next;
150                if (dev->next)
151                        dev->next->prev = dev->prev;
152        }
153
154        dag_close(dev->fd);
155        free(dev->dev_name);
156        free(dev);
157               
158}
159
160/* NOTE: This function assumes the open_dag_mutex is held by the caller */
161static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
162        struct stat buf;
163        int fd;
164        struct dag_dev_t *new_dev;
165       
166        if (stat(dev_name, &buf) == -1) {
167                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
168                return NULL;
169        }
170       
171        if (S_ISCHR(buf.st_mode)) {
172                if((fd = dag_open(dev_name)) < 0) {
173                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
174                                        dev_name);
175                        return NULL;
176                }
177        } else {
178                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
179                                dev_name);
180                return NULL;
181        }
182       
183        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
184        new_dev->fd = fd;
185        new_dev->dev_name = dev_name;
186        new_dev->ref_count = 1;
187       
188        new_dev->prev = NULL;
189        new_dev->next = open_dags;
190        if (open_dags)
191                open_dags->prev = new_dev;
192       
193        open_dags = new_dev;
194       
195        return new_dev;
196}
197       
198
199static int dag_init_input(libtrace_t *libtrace) {
200        char *dag_dev_name = NULL;
201        char *scan = NULL;
202        int stream = 0;
203        struct dag_dev_t *dag_device = NULL;
204       
205        dag_init_format_data(libtrace);
206        pthread_mutex_lock(&open_dag_mutex);
207        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
208                dag_dev_name = strdup(libtrace->uridata);
209        } else {
210                dag_dev_name = (char *)strndup(libtrace->uridata,
211                                (size_t)(scan - libtrace->uridata));
212                stream = atoi(++scan);
213        }
214       
215       
216
217        /* For now, we don't offer the ability to select the stream */
218        FORMAT_DATA->dagstream = stream;
219
220        dag_device = dag_find_open_device(dag_dev_name);
221
222        if (dag_device == NULL) {
223                /* Device not yet opened - open it ourselves */
224                dag_device = dag_open_device(libtrace, dag_dev_name);
225        } else {
226                free(dag_dev_name);
227                dag_dev_name = NULL;
228        }
229
230        if (dag_device == NULL) {
231                if (dag_dev_name)
232                        free(dag_dev_name);
233                return -1;
234        }
235
236        FORMAT_DATA->device = dag_device;
237       
238        pthread_mutex_unlock(&open_dag_mutex);
239        return 0;
240}
241       
242static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
243                                void *data) {
244        char conf_str[4096];
245        switch(option) {
246                case TRACE_OPTION_META_FREQ:
247                        DUCK.duck_freq = *(int *)data;
248                        return 0;
249                case TRACE_OPTION_SNAPLEN:
250                        snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
251                        if (dag_configure(FORMAT_DATA->device->fd, 
252                                                conf_str) != 0) {
253                                trace_set_err(libtrace, errno, "Failed to configure snaplen on DAG card: %s", libtrace->uridata);
254                                return -1;
255                        }
256                        return 0;
257                case TRACE_OPTION_PROMISC:
258                        /* DAG already operates in a promisc fashion */
259                        return -1;
260                case TRACE_OPTION_FILTER:
261                        return -1;
262                case TRACE_OPTION_EVENT_REALTIME:
263                        return -1;
264        }
265        return -1;
266}
267
268static int dag_start_input(libtrace_t *libtrace) {
269        struct timeval zero, nopoll;
270        uint8_t *top, *bottom;
271        uint8_t diff = 0;
272        top = bottom = NULL;
273
274        zero.tv_sec = 0;
275        zero.tv_usec = 0;
276        nopoll = zero;
277
278
279       
280        if (dag_attach_stream(FORMAT_DATA->device->fd, 
281                                FORMAT_DATA->dagstream, 0, 0) < 0) {
282                trace_set_err(libtrace, errno, "Cannot attach DAG stream");
283                return -1;
284        }
285
286        if (dag_start_stream(FORMAT_DATA->device->fd, 
287                                FORMAT_DATA->dagstream) < 0) {
288                trace_set_err(libtrace, errno, "Cannot start DAG stream");
289                return -1;
290        }
291        FORMAT_DATA->stream_attached = 1;
292        /* We don't want the dag card to do any sleeping */
293        dag_set_stream_poll(FORMAT_DATA->device->fd, 
294                                FORMAT_DATA->dagstream, 0, &zero, 
295                                &nopoll);
296       
297        /* Should probably flush the memory hole now */
298       
299        do {
300                top = dag_advance_stream(FORMAT_DATA->device->fd,
301                                        FORMAT_DATA->dagstream,
302                                        &bottom);
303                assert(top && bottom);
304                diff = top - bottom;
305                bottom -= diff;
306        } while (diff != 0);
307        FORMAT_DATA->top = NULL;
308        FORMAT_DATA->bottom = NULL;
309        FORMAT_DATA->processed = 0;
310        FORMAT_DATA->drops = 0;
311       
312        return 0;
313}
314
315static int dag_pause_input(libtrace_t *libtrace) {
316        if (dag_stop_stream(FORMAT_DATA->device->fd, 
317                                FORMAT_DATA->dagstream) < 0) {
318                trace_set_err(libtrace, errno, "Could not stop DAG stream");
319                return -1;
320        }
321        if (dag_detach_stream(FORMAT_DATA->device->fd, 
322                                FORMAT_DATA->dagstream) < 0) {
323                trace_set_err(libtrace, errno, "Could not detach DAG stream");
324                return -1;
325        }
326        FORMAT_DATA->stream_attached = 0;
327        return 0;
328}
329
330static int dag_fin_input(libtrace_t *libtrace) {
331        pthread_mutex_lock(&open_dag_mutex);
332        if (FORMAT_DATA->stream_attached)
333                dag_pause_input(libtrace);
334        FORMAT_DATA->device->ref_count --;
335       
336        if (FORMAT_DATA->device->ref_count == 0)
337                dag_close_device(FORMAT_DATA->device);
338        if (DUCK.dummy_duck)
339                trace_destroy_dead(DUCK.dummy_duck);
340        free(libtrace->format_data);
341        pthread_mutex_unlock(&open_dag_mutex);
342        return 0; /* success */
343}
344
345static int dag_get_duckinfo(libtrace_t *libtrace,
346                                libtrace_packet_t *packet) {
347        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
348                        !packet->buffer) {
349                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
350                packet->buf_control = TRACE_CTRL_PACKET;
351                if (!packet->buffer) {
352                        trace_set_err(libtrace, errno,
353                                        "Cannot allocate packet buffer");
354                        return -1;
355                }
356        }
357
358        packet->header = 0;
359        packet->payload = packet->buffer;
360
361        /* No need to check if we can get DUCK or not - we're modern
362         * enough */
363        if ((ioctl(FORMAT_DATA->device->fd, DAGIOCDUCK, 
364                                        (duckinf_t *)packet->payload) < 0)) {
365                trace_set_err(libtrace, errno, "Error using DAGIOCDUCK");
366                return -1;
367        }
368
369        packet->type = TRACE_RT_DUCK_2_5;
370        if (!DUCK.dummy_duck)
371                DUCK.dummy_duck = trace_create_dead("rt:localhost:3434");
372        packet->trace = DUCK.dummy_duck;
373        return sizeof(duckinf_t);
374}
375
376static int dag_available(libtrace_t *libtrace) {
377        uint32_t diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
378        /* If we've processed more than 4MB of data since we last called
379         * dag_advance_stream, then we should call it again to allow the
380         * space occupied by that 4MB to be released */
381        if (diff >= dag_record_size && FORMAT_DATA->processed < 4 * 1024 * 1024)
382                return diff;
383        FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd, 
384                        FORMAT_DATA->dagstream, 
385                        &(FORMAT_DATA->bottom));
386        if (FORMAT_DATA->top == NULL) {
387                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
388                return -1;
389        }
390        FORMAT_DATA->processed = 0;
391        diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
392        return diff;
393}
394
395static dag_record_t *dag_get_record(libtrace_t *libtrace) {
396        dag_record_t *erfptr = NULL;
397        uint16_t size;
398        erfptr = (dag_record_t *)FORMAT_DATA->bottom;
399        if (!erfptr)
400                return NULL;
401        size = ntohs(erfptr->rlen);
402        assert( size >= dag_record_size );
403        /* Make certain we have the full packet available */
404        if (size > (FORMAT_DATA->top - FORMAT_DATA->bottom))
405                return NULL;
406        FORMAT_DATA->bottom += size;
407        FORMAT_DATA->processed += size;
408        return erfptr;
409}
410
411static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
412                void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
413       
414        dag_record_t *erfptr;
415       
416        if (packet->buffer != buffer && 
417                        packet->buf_control == TRACE_CTRL_PACKET) {
418                free(packet->buffer);
419        }
420       
421        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
422                packet->buf_control = TRACE_CTRL_PACKET;
423        } else
424                packet->buf_control = TRACE_CTRL_EXTERNAL;
425       
426        erfptr = (dag_record_t *)packet->buffer;
427        packet->buffer = erfptr;
428        packet->header = erfptr;
429        packet->type = rt_type;
430       
431        if (erfptr->flags.rxerror == 1) {
432                /* rxerror means the payload is corrupt - drop it
433                 * by tweaking rlen */
434                packet->payload = NULL;
435                erfptr->rlen = htons(erf_get_framing_length(packet));
436        } else {
437                packet->payload = (char*)packet->buffer
438                        + erf_get_framing_length(packet);
439        }
440       
441        if (libtrace->format_data == NULL) {
442                dag_init_format_data(libtrace); 
443        }
444       
445        /* No loss counter for DSM coloured records - have to use
446         * some other API */
447        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
448               
449        } else {
450                DATA(libtrace)->drops += ntohs(erfptr->lctr);
451        }
452
453        return 0;
454}
455
456
457static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
458        int size = 0;
459        struct timeval tv;
460        dag_record_t *erfptr = NULL;
461        int numbytes = 0;
462        uint32_t flags = 0;
463       
464        if (DUCK.last_pkt - DUCK.last_duck > DUCK.duck_freq &&
465                        DUCK.duck_freq != 0) {
466                size = dag_get_duckinfo(libtrace, packet);
467                DUCK.last_duck = DUCK.last_pkt;
468                if (size != 0) {
469                        return size;
470                }
471                /* No DUCK support, so don't waste our time anymore */
472                DUCK.duck_freq = 0;
473        }
474
475        if (packet->buf_control == TRACE_CTRL_PACKET) {
476                free(packet->buffer);
477                packet->buffer = 0;
478        }
479
480        do {
481                numbytes = dag_available(libtrace);
482                if (numbytes < 0)
483                        return numbytes;
484                if (numbytes < dag_record_size)
485                        /* Block until we see a packet */
486                        continue;
487                erfptr = dag_get_record(libtrace);
488        } while (erfptr == NULL);
489
490        //dag_form_packet(erfptr, packet);
491        if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
492                                flags))
493                return -1;
494        tv = trace_get_timeval(packet);
495        DUCK.last_pkt = tv.tv_sec;
496       
497        return packet->payload ? htons(erfptr->rlen) : 
498                                erf_get_framing_length(packet);
499}
500
501static libtrace_eventobj_t trace_event_dag(libtrace_t *trace,
502                                        libtrace_packet_t *packet) {
503        libtrace_eventobj_t event = {0,0,0.0,0};
504        dag_record_t *erfptr = NULL;
505        int numbytes;
506        uint32_t flags = 0;
507       
508        /* Need to call dag_available so that the top pointer will get
509         * updated, otherwise we'll never see any data! */
510        numbytes = dag_available(trace);
511
512        /* May as well not bother calling dag_get_record if dag_available
513         * suggests that there's no data */
514        if (numbytes != 0)
515                erfptr = dag_get_record(trace);
516        if (erfptr == NULL) {
517                /* No packet available */
518                event.type = TRACE_EVENT_SLEEP;
519                event.seconds = 0.0001;
520                return event;
521        }
522        //dag_form_packet(erfptr, packet);
523        if (dag_prepare_packet(trace, packet, erfptr, TRACE_RT_DATA_ERF, 
524                                flags)) {
525                event.type = TRACE_EVENT_TERMINATE;
526                return event;
527        }
528               
529               
530        event.size = trace_get_capture_length(packet) + trace_get_framing_length(packet);
531        if (trace->filter) {
532                if (trace_apply_filter(trace->filter, packet)) {
533                        event.type = TRACE_EVENT_PACKET;
534                } else {
535                        event.type = TRACE_EVENT_SLEEP;
536                        event.seconds = 0.000001;
537                        return event;
538                }
539        } else {
540                event.type = TRACE_EVENT_PACKET;
541        }
542
543        if (trace->snaplen > 0) {
544                trace_set_capture_length(packet, trace->snaplen);
545        }
546
547        return event;
548}
549
550static uint64_t dag_get_dropped_packets(libtrace_t *trace) {
551        if (trace->format_data == NULL)
552                return (uint64_t)-1;
553        return DATA(trace)->drops;
554}
555
556static void dag_help(void) {
557        printf("dag format module: $Revision$\n");
558        printf("Supported input URIs:\n");
559        printf("\tdag:/dev/dagn\n");
560        printf("\n");
561        printf("\te.g.: dag:/dev/dag0\n");
562        printf("\n");
563        printf("Supported output URIs:\n");
564        printf("\tnone\n");
565        printf("\n");
566}
567
568static struct libtrace_format_t dag = {
569        "dag",
570        "$Id$",
571        TRACE_FORMAT_ERF,
572        dag_init_input,                 /* init_input */
573        dag_config_input,               /* config_input */
574        dag_start_input,                /* start_input */
575        dag_pause_input,                /* pause_input */
576        NULL,                           /* init_output */
577        NULL,                           /* config_output */
578        NULL,                           /* start_output */
579        dag_fin_input,                  /* fin_input */
580        NULL,                           /* fin_output */
581        dag_read_packet,                /* read_packet */
582        dag_prepare_packet,             /* prepare_packet */
583        NULL,                           /* fin_packet */
584        NULL,                           /* write_packet */
585        erf_get_link_type,              /* get_link_type */
586        erf_get_direction,              /* get_direction */
587        erf_set_direction,              /* set_direction */
588        erf_get_erf_timestamp,          /* get_erf_timestamp */
589        NULL,                           /* get_timeval */
590        NULL,                           /* get_seconds */
591        NULL,                           /* seek_erf */
592        NULL,                           /* seek_timeval */
593        NULL,                           /* seek_seconds */
594        erf_get_capture_length,         /* get_capture_length */
595        erf_get_wire_length,            /* get_wire_length */
596        erf_get_framing_length,         /* get_framing_length */
597        erf_set_capture_length,         /* set_capture_length */
598        NULL,                           /* get_received_packets */
599        NULL,                           /* get_filtered_packets */
600        dag_get_dropped_packets,        /* get_dropped_packets */
601        NULL,                           /* get_captured_packets */
602        NULL,                           /* get_fd */
603        trace_event_dag,                /* trace_event */
604        dag_help,                       /* help */
605        NULL                            /* next pointer */
606};
607
608void dag_constructor(void) {
609        register_format(&dag);
610}
Note: See TracBrowser for help on using the repository browser.