source: lib/format_dag25.c @ 121b7e2

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 121b7e2 was 121b7e2, checked in by Shane Alcock <salcock@…>, 14 years ago
  • Updated format_dag25 to support multiple streams - the new uri format for dag is "dag:/dev/dagX,<stream number>"
  • As each dag card can only be opened by a single process, thread-safety is now incorporated to allow separate threads to read from each dag stream
  • format_dag24 also supports the new uri format, although the stream number is ignored because old dags only have the one stream
  • Updated dagformat.h to include a whole bunch of new erf types
  • Added explicit support for the DSM Coloured Ethernet record type which will be required to deal with streamed packets
  • Fixed erf_get_padding() code that was comparing the return value of trace_get_link_type against an erf type rather than a libtrace link type
  • Property mode set to 100644
File size: 16.3 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *          Shane Alcock
8 *
9 * All rights reserved.
10 *
11 * This code has been developed by the University of Waikato WAND
12 * research group. For further information please see http://www.wand.net.nz/
13 *
14 * libtrace is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * libtrace is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with libtrace; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 *
28 * $Id$
29 *
30 */
31#define _GNU_SOURCE
32
33#include "config.h"
34#include "common.h"
35#include "libtrace.h"
36#include "libtrace_int.h"
37#include "format_helper.h"
38#include "format_erf.h"
39
40#include <assert.h>
41#include <errno.h>
42#include <fcntl.h>
43#include <stdio.h>
44#include <string.h>
45#include <stdlib.h>
46
47#include <sys/mman.h>
48
49
50#ifdef WIN32
51#  include <io.h>
52#  include <share.h>
53#  define PATH_MAX _MAX_PATH
54#  define snprintf sprintf_s
55#else
56#  include <netdb.h>
57#  ifndef PATH_MAX
58#       define PATH_MAX 4096
59#  endif
60#  include <sys/ioctl.h>
61#  include <pthread.h>
62#endif
63
64
65#define DATA(x) ((struct dag_format_data_t *)x->format_data)
66#define FORMAT_DATA DATA(libtrace)
67#define DUCK FORMAT_DATA->duck
68static struct libtrace_format_t dag;
69
70struct dag_dev_t {
71        //pthread_mutex_t dag_mutex;
72        char * dev_name;
73        int fd;
74        uint16_t ref_count;
75        struct dag_dev_t *next;
76};     
77
78struct dag_format_data_t {
79        struct {
80                uint32_t last_duck;
81                uint32_t duck_freq;
82                uint32_t last_pkt;
83                libtrace_t *dummy_duck;
84        } duck;
85
86        struct dag_dev_t *device;
87        unsigned int dagstream;
88        int stream_attached;
89        uint8_t *bottom;
90        uint8_t *top;
91        uint32_t processed;
92        uint64_t drops;
93};
94
95pthread_mutex_t open_dag_mutex;
96struct dag_dev_t *open_dags = NULL;
97
98static struct dag_dev_t *dag_find_open_device(char *dev_name) {
99        struct dag_dev_t *dag_dev;
100       
101        pthread_mutex_lock(&open_dag_mutex);
102        dag_dev = open_dags;
103
104        /* XXX: Not exactly zippy, but how often are we going to be dealing
105         * with multiple dag cards? */
106        while (dag_dev != NULL) {
107                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
108                        pthread_mutex_unlock(&open_dag_mutex);
109                        return dag_dev;
110                       
111                }
112                dag_dev = dag_dev->next;
113        }
114        pthread_mutex_unlock(&open_dag_mutex);
115        return NULL;
116               
117       
118}
119
120static void dag_close_device(struct dag_dev_t *dev) {
121        /* Need to remove from the device list */
122        struct dag_dev_t *prev, *d;
123
124        prev = NULL;
125        pthread_mutex_lock(&open_dag_mutex);
126
127        d = open_dags;
128
129        while (d != NULL) {
130                if (strcmp(dev->dev_name, d->dev_name) == 0) {
131                        /* Found it! */
132                        if (prev == NULL) {
133                                open_dags = d->next;
134                        } else {
135                                prev->next = d->next;
136                        }
137                        assert(d->ref_count == 0);
138                        dag_close(d->fd);
139                        free(d->dev_name);
140                        free(d);
141                        return;
142                }
143                prev = d;
144                d = d->next;
145        }
146
147        /* Not sure what we do here - we've been asked to close a
148         * device that isn't in our linked list - probably safest to
149         * just return. Is there anything else we can really do? */
150        return;
151}
152
153static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
154        struct stat buf;
155        int fd;
156        struct dag_dev_t *new_dev;
157       
158        if (stat(dev_name, &buf) == -1) {
159                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
160                return NULL;
161        }
162       
163        if (S_ISCHR(buf.st_mode)) {
164                if((fd = dag_open(dev_name)) < 0) {
165                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
166                                        dev_name);
167                        return NULL;
168                }
169        } else {
170                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
171                                dev_name);
172                return NULL;
173        }
174       
175        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
176        new_dev->fd = fd;
177        new_dev->dev_name = dev_name;
178        new_dev->ref_count = 0;
179       
180        pthread_mutex_lock(&open_dag_mutex);
181        new_dev->next = open_dags;
182        open_dags = new_dev;
183        pthread_mutex_unlock(&open_dag_mutex);
184       
185        return new_dev;
186}
187       
188
189static int dag_init_input(libtrace_t *libtrace) {
190        char *dag_dev_name = NULL;
191        char *scan = NULL;
192        int stream = 0;
193        struct dag_dev_t *dag_device = NULL;
194       
195        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
196                dag_dev_name = strdup(libtrace->uridata);
197        } else {
198                dag_dev_name = (char *)strndup(libtrace->uridata,
199                                (size_t)(scan - libtrace->uridata));
200                stream = atoi(++scan);
201        }
202       
203       
204        libtrace->format_data = (struct dag_format_data_t *)
205                malloc(sizeof(struct dag_format_data_t));
206
207        /* For now, we don't offer the ability to select the stream */
208        FORMAT_DATA->dagstream = stream;
209
210        dag_device = dag_find_open_device(dag_dev_name);
211
212        if (dag_device == NULL) {
213                /* Device not yet opened - open it ourselves */
214                dag_device = dag_open_device(libtrace, dag_dev_name);
215        } else {
216                free(dag_dev_name);
217                dag_dev_name = NULL;
218        }
219
220        if (dag_device == NULL) {
221                if (dag_dev_name)
222                        free(dag_dev_name);
223                return -1;
224        }
225
226
227        dag_device->ref_count ++;
228
229        DUCK.last_duck = 0;
230        DUCK.duck_freq = 0;
231        DUCK.last_pkt = 0;
232        DUCK.dummy_duck = NULL;
233        FORMAT_DATA->device = dag_device;
234        FORMAT_DATA->stream_attached = 0;
235        FORMAT_DATA->drops = 0;
236       
237        return 0;
238}
239       
240static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
241                                void *data) {
242        char conf_str[4096];
243        switch(option) {
244                case TRACE_OPTION_META_FREQ:
245                        DUCK.duck_freq = *(int *)data;
246                        return 0;
247                case TRACE_OPTION_SNAPLEN:
248                        snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
249                        if (dag_configure(FORMAT_DATA->device->fd, 
250                                                conf_str) != 0) {
251                                trace_set_err(libtrace, errno, "Failed to configure snaplen on DAG card: %s", libtrace->uridata);
252                                return -1;
253                        }
254                        return 0;
255                case TRACE_OPTION_PROMISC:
256                        /* DAG already operates in a promisc fashion */
257                        return -1;
258                case TRACE_OPTION_FILTER:
259                        return -1;
260                case TRACE_OPTION_EVENT_REALTIME:
261                        return -1;
262        }
263        return -1;
264}
265
266static int dag_start_input(libtrace_t *libtrace) {
267        struct timeval zero, nopoll;
268        uint8_t *top, *bottom;
269        uint8_t diff = 0;
270        top = bottom = NULL;
271
272        zero.tv_sec = 0;
273        zero.tv_usec = 0;
274        nopoll = zero;
275
276
277       
278        if (dag_attach_stream(FORMAT_DATA->device->fd, 
279                                FORMAT_DATA->dagstream, 0, 0) < 0) {
280                trace_set_err(libtrace, errno, "Cannot attach DAG stream");
281                return -1;
282        }
283
284        if (dag_start_stream(FORMAT_DATA->device->fd, 
285                                FORMAT_DATA->dagstream) < 0) {
286                trace_set_err(libtrace, errno, "Cannot start DAG stream");
287                return -1;
288        }
289        FORMAT_DATA->stream_attached = 1;
290        /* We don't want the dag card to do any sleeping */
291        dag_set_stream_poll(FORMAT_DATA->device->fd, 
292                                FORMAT_DATA->dagstream, 0, &zero, 
293                                &nopoll);
294       
295        /* Should probably flush the memory hole now */
296       
297        do {
298                top = dag_advance_stream(FORMAT_DATA->device->fd,
299                                        FORMAT_DATA->dagstream,
300                                        &bottom);
301                assert(top && bottom);
302                diff = top - bottom;
303                bottom -= diff;
304        } while (diff != 0);
305        FORMAT_DATA->top = NULL;
306        FORMAT_DATA->bottom = NULL;
307        FORMAT_DATA->processed = 0;
308        FORMAT_DATA->drops = 0;
309       
310        return 0;
311}
312
313static int dag_pause_input(libtrace_t *libtrace) {
314        if (dag_stop_stream(FORMAT_DATA->device->fd, 
315                                FORMAT_DATA->dagstream) < 0) {
316                trace_set_err(libtrace, errno, "Could not stop DAG stream");
317                return -1;
318        }
319        if (dag_detach_stream(FORMAT_DATA->device->fd, 
320                                FORMAT_DATA->dagstream) < 0) {
321                trace_set_err(libtrace, errno, "Could not detach DAG stream");
322                return -1;
323        }
324        FORMAT_DATA->stream_attached = 0;
325        return 0;
326}
327
328static int dag_fin_input(libtrace_t *libtrace) {
329        if (FORMAT_DATA->stream_attached)
330                dag_pause_input(libtrace);
331        FORMAT_DATA->device->ref_count --;
332       
333        if (FORMAT_DATA->device->ref_count == 0)
334                dag_close_device(FORMAT_DATA->device);
335        if (DUCK.dummy_duck)
336                trace_destroy_dead(DUCK.dummy_duck);
337        free(libtrace->format_data);
338        return 0; /* success */
339}
340
341static int dag_get_duckinfo(libtrace_t *libtrace,
342                                libtrace_packet_t *packet) {
343        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
344                        !packet->buffer) {
345                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
346                packet->buf_control = TRACE_CTRL_PACKET;
347                if (!packet->buffer) {
348                        trace_set_err(libtrace, errno,
349                                        "Cannot allocate packet buffer");
350                        return -1;
351                }
352        }
353
354        packet->header = 0;
355        packet->payload = packet->buffer;
356
357        /* No need to check if we can get DUCK or not - we're modern
358         * enough */
359        if ((ioctl(FORMAT_DATA->device->fd, DAGIOCDUCK, 
360                                        (duckinf_t *)packet->payload) < 0)) {
361                trace_set_err(libtrace, errno, "Error using DAGIOCDUCK");
362                return -1;
363        }
364
365        packet->type = TRACE_RT_DUCK_2_5;
366        if (!DUCK.dummy_duck)
367                DUCK.dummy_duck = trace_create_dead("rt:localhost:3434");
368        packet->trace = DUCK.dummy_duck;
369        return sizeof(duckinf_t);
370}
371
372static int dag_available(libtrace_t *libtrace) {
373        uint32_t diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
374        /* If we've processed more than 4MB of data since we last called
375         * dag_advance_stream, then we should call it again to allow the
376         * space occupied by that 4MB to be released */
377        if (diff >= dag_record_size && FORMAT_DATA->processed < 4 * 1024 * 1024)
378                return diff;
379        FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd, 
380                        FORMAT_DATA->dagstream, 
381                        &(FORMAT_DATA->bottom));
382        if (FORMAT_DATA->top == NULL) {
383                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
384                return -1;
385        }
386        FORMAT_DATA->processed = 0;
387        diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
388        return diff;
389}
390
391static dag_record_t *dag_get_record(libtrace_t *libtrace) {
392        dag_record_t *erfptr = NULL;
393        uint16_t size;
394        erfptr = (dag_record_t *)FORMAT_DATA->bottom;
395        if (!erfptr)
396                return NULL;
397        size = ntohs(erfptr->rlen);
398        assert( size >= dag_record_size );
399        /* Make certain we have the full packet available */
400        if (size > (FORMAT_DATA->top - FORMAT_DATA->bottom))
401                return NULL;
402        FORMAT_DATA->bottom += size;
403        FORMAT_DATA->processed += size;
404        return erfptr;
405}
406
407static void dag_form_packet(dag_record_t *erfptr, libtrace_packet_t *packet) {
408        packet->buffer = erfptr;
409        packet->header = erfptr;
410        packet->type = TRACE_RT_DATA_ERF;
411        if (erfptr->flags.rxerror == 1) {
412                /* rxerror means the payload is corrupt - drop it
413                 * by tweaking rlen */
414                packet->payload = NULL;
415                erfptr->rlen = htons(erf_get_framing_length(packet));
416        } else {
417                packet->payload = (char*)packet->buffer
418                        + erf_get_framing_length(packet);
419        }
420
421}
422
423
424static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
425        int size = 0;
426        struct timeval tv;
427        dag_record_t *erfptr = NULL;
428        int numbytes = 0;
429       
430        if (DUCK.last_pkt - DUCK.last_duck > DUCK.duck_freq &&
431                        DUCK.duck_freq != 0) {
432                size = dag_get_duckinfo(libtrace, packet);
433                DUCK.last_duck = DUCK.last_pkt;
434                if (size != 0) {
435                        return size;
436                }
437                /* No DUCK support, so don't waste our time anymore */
438                DUCK.duck_freq = 0;
439        }
440
441        if (packet->buf_control == TRACE_CTRL_PACKET) {
442                packet->buf_control = TRACE_CTRL_EXTERNAL;
443                free(packet->buffer);
444                packet->buffer = 0;
445        }
446
447        do {
448                numbytes = dag_available(libtrace);
449                if (numbytes < 0)
450                        return numbytes;
451                if (numbytes < dag_record_size)
452                        /* Block until we see a packet */
453                        continue;
454                erfptr = dag_get_record(libtrace);
455        } while (erfptr == NULL);
456
457        dag_form_packet(erfptr, packet);
458        tv = trace_get_timeval(packet);
459        DUCK.last_pkt = tv.tv_sec;
460       
461        /* No loss counter for DSM coloured records - have to use
462         * some other API */
463        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
464               
465        } else {
466                DATA(libtrace)->drops += ntohs(erfptr->lctr);
467        }
468        return packet->payload ? htons(erfptr->rlen) : 
469                                erf_get_framing_length(packet);
470}
471
472static libtrace_eventobj_t trace_event_dag(libtrace_t *trace,
473                                        libtrace_packet_t *packet) {
474        libtrace_eventobj_t event = {0,0,0.0,0};
475        dag_record_t *erfptr = NULL;
476        int numbytes;
477       
478        /* Need to call dag_available so that the top pointer will get
479         * updated, otherwise we'll never see any data! */
480        numbytes = dag_available(trace);
481
482        /* May as well not bother calling dag_get_record if dag_available
483         * suggests that there's no data */
484        if (numbytes != 0)
485                erfptr = dag_get_record(trace);
486        if (erfptr == NULL) {
487                /* No packet available */
488                event.type = TRACE_EVENT_SLEEP;
489                event.seconds = 0.0001;
490                return event;
491        }
492        dag_form_packet(erfptr, packet);
493        event.size = trace_get_capture_length(packet) + trace_get_framing_length(packet);
494        if (trace->filter) {
495                if (trace_apply_filter(trace->filter, packet)) {
496                        event.type = TRACE_EVENT_PACKET;
497                } else {
498                        event.type = TRACE_EVENT_SLEEP;
499                        event.seconds = 0.000001;
500                        return event;
501                }
502        } else {
503                event.type = TRACE_EVENT_PACKET;
504        }
505
506        if (trace->snaplen > 0) {
507                trace_set_capture_length(packet, trace->snaplen);
508        }
509
510        return event;
511}
512
513static uint64_t dag_get_dropped_packets(libtrace_t *trace) {
514        return DATA(trace)->drops;
515}
516
517static void dag_help(void) {
518        printf("dag format module: $Revision$\n");
519        printf("Supported input URIs:\n");
520        printf("\tdag:/dev/dagn\n");
521        printf("\n");
522        printf("\te.g.: dag:/dev/dag0\n");
523        printf("\n");
524        printf("Supported output URIs:\n");
525        printf("\tnone\n");
526        printf("\n");
527}
528
529static struct libtrace_format_t dag = {
530        "dag",
531        "$Id$",
532        TRACE_FORMAT_ERF,
533        dag_init_input,                 /* init_input */
534        dag_config_input,               /* config_input */
535        dag_start_input,                /* start_input */
536        dag_pause_input,                /* pause_input */
537        NULL,                           /* init_output */
538        NULL,                           /* config_output */
539        NULL,                           /* start_output */
540        dag_fin_input,                  /* fin_input */
541        NULL,                           /* fin_output */
542        dag_read_packet,                /* read_packet */
543        NULL,                           /* fin_packet */
544        NULL,                           /* write_packet */
545        erf_get_link_type,              /* get_link_type */
546        erf_get_direction,              /* get_direction */
547        erf_set_direction,              /* set_direction */
548        erf_get_erf_timestamp,          /* get_erf_timestamp */
549        NULL,                           /* get_timeval */
550        NULL,                           /* get_seconds */
551        NULL,                           /* seek_erf */
552        NULL,                           /* seek_timeval */
553        NULL,                           /* seek_seconds */
554        erf_get_capture_length,         /* get_capture_length */
555        erf_get_wire_length,            /* get_wire_length */
556        erf_get_framing_length,         /* get_framing_length */
557        erf_set_capture_length,         /* set_capture_length */
558        NULL,                           /* get_received_packets */
559        NULL,                           /* get_filtered_packets */
560        dag_get_dropped_packets,        /* get_dropped_packets */
561        NULL,                           /* get_captured_packets */
562        NULL,                           /* get_fd */
563        trace_event_dag,                /* trace_event */
564        dag_help,                       /* help */
565        NULL                            /* next pointer */
566};
567
568void dag_constructor(void) {
569        register_format(&dag);
570}
Note: See TracBrowser for help on using the repository browser.