source: lib/format_dag25.c @ 91b72d3

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 91b72d3 was 91b72d3, checked in by Perry Lorier <perry@…>, 13 years ago

Try to autoguess the tracetype if the format uri specify is not present

  • Property mode set to 100644
File size: 17.8 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *          Shane Alcock
8 *
9 * All rights reserved.
10 *
11 * This code has been developed by the University of Waikato WAND
12 * research group. For further information please see http://www.wand.net.nz/
13 *
14 * libtrace is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * libtrace is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with libtrace; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 *
28 * $Id$
29 *
30 */
31#define _GNU_SOURCE
32
33#include "config.h"
34#include "common.h"
35#include "libtrace.h"
36#include "libtrace_int.h"
37#include "format_helper.h"
38#include "format_erf.h"
39
40#include <assert.h>
41#include <errno.h>
42#include <fcntl.h>
43#include <stdio.h>
44#include <string.h>
45#include <stdlib.h>
46
47#include <sys/mman.h>
48/* XXX: Windows doesn't have pthreads, but this code doesn't compile under
49 * Windows anyway so we'll worry about this more later :] */
50#include <pthread.h>
51
52
53#ifdef WIN32
54#  include <io.h>
55#  include <share.h>
56#  define PATH_MAX _MAX_PATH
57#  define snprintf sprintf_s
58#else
59#  include <netdb.h>
60#  ifndef PATH_MAX
61#       define PATH_MAX 4096
62#  endif
63#  include <sys/ioctl.h>
64#endif
65
66
67#define DATA(x) ((struct dag_format_data_t *)x->format_data)
68#define FORMAT_DATA DATA(libtrace)
69#define DUCK FORMAT_DATA->duck
70static struct libtrace_format_t dag;
71
72struct dag_dev_t {
73        //pthread_mutex_t dag_mutex;
74        char * dev_name;
75        int fd;
76        uint16_t ref_count;
77        struct dag_dev_t *prev;
78        struct dag_dev_t *next;
79};     
80
81struct dag_format_data_t {
82        struct {
83                uint32_t last_duck;
84                uint32_t duck_freq;
85                uint32_t last_pkt;
86                libtrace_t *dummy_duck;
87        } duck;
88
89        struct dag_dev_t *device;
90        unsigned int dagstream;
91        int stream_attached;
92        uint8_t *bottom;
93        uint8_t *top;
94        uint32_t processed;
95        uint64_t drops;
96};
97
98pthread_mutex_t open_dag_mutex;
99struct dag_dev_t *open_dags = NULL;
100
101static void dag_probe_filename(const char *filename) 
102{
103        struct stat statbuf;
104        /* Can we stat the file? */
105        if (stat(filename, &statbuf) != 0) {
106                return 0;
107        }
108        /* Is it a character device? */
109        if (!S_ISCHR(statbuf.st_mode)) {
110                return 0;
111        }
112        /* Yeah, it's probably us. */
113        return 1;
114}
115
116static void dag_init_format_data(libtrace_t *libtrace) {
117        libtrace->format_data = (struct dag_format_data_t *)
118                malloc(sizeof(struct dag_format_data_t));
119        DUCK.last_duck = 0;
120        DUCK.duck_freq = 0;
121        DUCK.last_pkt = 0;
122        DUCK.dummy_duck = NULL;
123        FORMAT_DATA->stream_attached = 0;
124        FORMAT_DATA->drops = 0;
125        FORMAT_DATA->device = NULL;
126        FORMAT_DATA->dagstream = 0;
127        FORMAT_DATA->processed = 0;
128        FORMAT_DATA->bottom = NULL;
129        FORMAT_DATA->top = NULL;
130}
131
132/* NOTE: This function assumes the open_dag_mutex is held by the caller */
133static struct dag_dev_t *dag_find_open_device(char *dev_name) {
134        struct dag_dev_t *dag_dev;
135       
136        dag_dev = open_dags;
137
138        /* XXX: Not exactly zippy, but how often are we going to be dealing
139         * with multiple dag cards? */
140        while (dag_dev != NULL) {
141                if (strcmp(dag_dev->dev_name, dev_name) == 0) {
142                        dag_dev->ref_count ++;
143                        return dag_dev;
144                       
145                }
146                dag_dev = dag_dev->next;
147        }
148        return NULL;
149               
150       
151}
152
153/* NOTE: This function assumes the open_dag_mutex is held by the caller */
154static void dag_close_device(struct dag_dev_t *dev) {
155        /* Need to remove from the device list */
156       
157        assert(dev->ref_count == 0);
158       
159        if (dev->prev == NULL) {
160                open_dags = dev->next;
161                if (dev->next)
162                        dev->next->prev = NULL;
163        } else {
164                dev->prev->next = dev->next;
165                if (dev->next)
166                        dev->next->prev = dev->prev;
167        }
168
169        dag_close(dev->fd);
170        free(dev->dev_name);
171        free(dev);
172               
173}
174
175/* NOTE: This function assumes the open_dag_mutex is held by the caller */
176static struct dag_dev_t *dag_open_device(libtrace_t *libtrace, char *dev_name) {
177        struct stat buf;
178        int fd;
179        struct dag_dev_t *new_dev;
180       
181        if (stat(dev_name, &buf) == -1) {
182                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
183                return NULL;
184        }
185       
186        if (S_ISCHR(buf.st_mode)) {
187                if((fd = dag_open(dev_name)) < 0) {
188                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
189                                        dev_name);
190                        return NULL;
191                }
192        } else {
193                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
194                                dev_name);
195                return NULL;
196        }
197       
198        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
199        new_dev->fd = fd;
200        new_dev->dev_name = dev_name;
201        new_dev->ref_count = 1;
202       
203        new_dev->prev = NULL;
204        new_dev->next = open_dags;
205        if (open_dags)
206                open_dags->prev = new_dev;
207       
208        open_dags = new_dev;
209       
210        return new_dev;
211}
212       
213
214static int dag_init_input(libtrace_t *libtrace) {
215        char *dag_dev_name = NULL;
216        char *scan = NULL;
217        int stream = 0;
218        struct dag_dev_t *dag_device = NULL;
219       
220        dag_init_format_data(libtrace);
221        pthread_mutex_lock(&open_dag_mutex);
222        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
223                dag_dev_name = strdup(libtrace->uridata);
224        } else {
225                dag_dev_name = (char *)strndup(libtrace->uridata,
226                                (size_t)(scan - libtrace->uridata));
227                stream = atoi(++scan);
228        }
229       
230       
231
232        /* For now, we don't offer the ability to select the stream */
233        FORMAT_DATA->dagstream = stream;
234
235        dag_device = dag_find_open_device(dag_dev_name);
236
237        if (dag_device == NULL) {
238                /* Device not yet opened - open it ourselves */
239                dag_device = dag_open_device(libtrace, dag_dev_name);
240        } else {
241                free(dag_dev_name);
242                dag_dev_name = NULL;
243        }
244
245        if (dag_device == NULL) {
246                if (dag_dev_name)
247                        free(dag_dev_name);
248                return -1;
249        }
250
251        FORMAT_DATA->device = dag_device;
252       
253        pthread_mutex_unlock(&open_dag_mutex);
254        return 0;
255}
256       
257static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
258                                void *data) {
259        char conf_str[4096];
260        switch(option) {
261                case TRACE_OPTION_META_FREQ:
262                        DUCK.duck_freq = *(int *)data;
263                        return 0;
264                case TRACE_OPTION_SNAPLEN:
265                        snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
266                        if (dag_configure(FORMAT_DATA->device->fd, 
267                                                conf_str) != 0) {
268                                trace_set_err(libtrace, errno, "Failed to configure snaplen on DAG card: %s", libtrace->uridata);
269                                return -1;
270                        }
271                        return 0;
272                case TRACE_OPTION_PROMISC:
273                        /* DAG already operates in a promisc fashion */
274                        return -1;
275                case TRACE_OPTION_FILTER:
276                        return -1;
277                case TRACE_OPTION_EVENT_REALTIME:
278                        return -1;
279        }
280        return -1;
281}
282
283static int dag_start_input(libtrace_t *libtrace) {
284        struct timeval zero, nopoll;
285        uint8_t *top, *bottom;
286        uint8_t diff = 0;
287        top = bottom = NULL;
288
289        zero.tv_sec = 0;
290        zero.tv_usec = 0;
291        nopoll = zero;
292
293
294       
295        if (dag_attach_stream(FORMAT_DATA->device->fd, 
296                                FORMAT_DATA->dagstream, 0, 0) < 0) {
297                trace_set_err(libtrace, errno, "Cannot attach DAG stream");
298                return -1;
299        }
300
301        if (dag_start_stream(FORMAT_DATA->device->fd, 
302                                FORMAT_DATA->dagstream) < 0) {
303                trace_set_err(libtrace, errno, "Cannot start DAG stream");
304                return -1;
305        }
306        FORMAT_DATA->stream_attached = 1;
307        /* We don't want the dag card to do any sleeping */
308        dag_set_stream_poll(FORMAT_DATA->device->fd, 
309                                FORMAT_DATA->dagstream, 0, &zero, 
310                                &nopoll);
311       
312        /* Should probably flush the memory hole now */
313       
314        do {
315                top = dag_advance_stream(FORMAT_DATA->device->fd,
316                                        FORMAT_DATA->dagstream,
317                                        &bottom);
318                assert(top && bottom);
319                diff = top - bottom;
320                bottom -= diff;
321        } while (diff != 0);
322        FORMAT_DATA->top = NULL;
323        FORMAT_DATA->bottom = NULL;
324        FORMAT_DATA->processed = 0;
325        FORMAT_DATA->drops = 0;
326       
327        return 0;
328}
329
330static int dag_pause_input(libtrace_t *libtrace) {
331        if (dag_stop_stream(FORMAT_DATA->device->fd, 
332                                FORMAT_DATA->dagstream) < 0) {
333                trace_set_err(libtrace, errno, "Could not stop DAG stream");
334                return -1;
335        }
336        if (dag_detach_stream(FORMAT_DATA->device->fd, 
337                                FORMAT_DATA->dagstream) < 0) {
338                trace_set_err(libtrace, errno, "Could not detach DAG stream");
339                return -1;
340        }
341        FORMAT_DATA->stream_attached = 0;
342        return 0;
343}
344
345static int dag_fin_input(libtrace_t *libtrace) {
346        pthread_mutex_lock(&open_dag_mutex);
347        if (FORMAT_DATA->stream_attached)
348                dag_pause_input(libtrace);
349        FORMAT_DATA->device->ref_count --;
350       
351        if (FORMAT_DATA->device->ref_count == 0)
352                dag_close_device(FORMAT_DATA->device);
353        if (DUCK.dummy_duck)
354                trace_destroy_dead(DUCK.dummy_duck);
355        free(libtrace->format_data);
356        pthread_mutex_unlock(&open_dag_mutex);
357        return 0; /* success */
358}
359
360static int dag_get_duckinfo(libtrace_t *libtrace,
361                                libtrace_packet_t *packet) {
362        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
363                        !packet->buffer) {
364                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
365                packet->buf_control = TRACE_CTRL_PACKET;
366                if (!packet->buffer) {
367                        trace_set_err(libtrace, errno,
368                                        "Cannot allocate packet buffer");
369                        return -1;
370                }
371        }
372
373        packet->header = 0;
374        packet->payload = packet->buffer;
375
376        /* No need to check if we can get DUCK or not - we're modern
377         * enough */
378        if ((ioctl(FORMAT_DATA->device->fd, DAGIOCDUCK, 
379                                        (duckinf_t *)packet->payload) < 0)) {
380                trace_set_err(libtrace, errno, "Error using DAGIOCDUCK");
381                return -1;
382        }
383
384        packet->type = TRACE_RT_DUCK_2_5;
385        if (!DUCK.dummy_duck)
386                DUCK.dummy_duck = trace_create_dead("rt:localhost:3434");
387        packet->trace = DUCK.dummy_duck;
388        return sizeof(duckinf_t);
389}
390
391static int dag_available(libtrace_t *libtrace) {
392        uint32_t diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
393        /* If we've processed more than 4MB of data since we last called
394         * dag_advance_stream, then we should call it again to allow the
395         * space occupied by that 4MB to be released */
396        if (diff >= dag_record_size && FORMAT_DATA->processed < 4 * 1024 * 1024)
397                return diff;
398        FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd, 
399                        FORMAT_DATA->dagstream, 
400                        &(FORMAT_DATA->bottom));
401        if (FORMAT_DATA->top == NULL) {
402                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
403                return -1;
404        }
405        FORMAT_DATA->processed = 0;
406        diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
407        return diff;
408}
409
410static dag_record_t *dag_get_record(libtrace_t *libtrace) {
411        dag_record_t *erfptr = NULL;
412        uint16_t size;
413        erfptr = (dag_record_t *)FORMAT_DATA->bottom;
414        if (!erfptr)
415                return NULL;
416        size = ntohs(erfptr->rlen);
417        assert( size >= dag_record_size );
418        /* Make certain we have the full packet available */
419        if (size > (FORMAT_DATA->top - FORMAT_DATA->bottom))
420                return NULL;
421        FORMAT_DATA->bottom += size;
422        FORMAT_DATA->processed += size;
423        return erfptr;
424}
425
426static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
427                void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
428       
429        dag_record_t *erfptr;
430       
431        if (packet->buffer != buffer && 
432                        packet->buf_control == TRACE_CTRL_PACKET) {
433                free(packet->buffer);
434        }
435       
436        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
437                packet->buf_control = TRACE_CTRL_PACKET;
438        } else
439                packet->buf_control = TRACE_CTRL_EXTERNAL;
440       
441        erfptr = (dag_record_t *)buffer;
442        packet->buffer = erfptr;
443        packet->header = erfptr;
444        packet->type = rt_type;
445       
446        if (erfptr->flags.rxerror == 1) {
447                /* rxerror means the payload is corrupt - drop it
448                 * by tweaking rlen */
449                packet->payload = NULL;
450                erfptr->rlen = htons(erf_get_framing_length(packet));
451        } else {
452                packet->payload = (char*)packet->buffer
453                        + erf_get_framing_length(packet);
454        }
455       
456        if (libtrace->format_data == NULL) {
457                dag_init_format_data(libtrace); 
458        }
459       
460        /* No loss counter for DSM coloured records - have to use
461         * some other API */
462        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
463               
464        } else {
465                DATA(libtrace)->drops += ntohs(erfptr->lctr);
466        }
467
468        return 0;
469}
470
471
472static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
473        int size = 0;
474        struct timeval tv;
475        dag_record_t *erfptr = NULL;
476        int numbytes = 0;
477        uint32_t flags = 0;
478       
479        if (DUCK.last_pkt - DUCK.last_duck > DUCK.duck_freq &&
480                        DUCK.duck_freq != 0) {
481                size = dag_get_duckinfo(libtrace, packet);
482                DUCK.last_duck = DUCK.last_pkt;
483                if (size != 0) {
484                        return size;
485                }
486                /* No DUCK support, so don't waste our time anymore */
487                DUCK.duck_freq = 0;
488        }
489
490        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
491       
492        if (packet->buf_control == TRACE_CTRL_PACKET) {
493                free(packet->buffer);
494                packet->buffer = 0;
495        }
496
497        do {
498                numbytes = dag_available(libtrace);
499                if (numbytes < 0)
500                        return numbytes;
501                if (numbytes < dag_record_size)
502                        /* Block until we see a packet */
503                        continue;
504                erfptr = dag_get_record(libtrace);
505        } while (erfptr == NULL);
506
507        //dag_form_packet(erfptr, packet);
508        if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
509                                flags))
510                return -1;
511        tv = trace_get_timeval(packet);
512        DUCK.last_pkt = tv.tv_sec;
513       
514        return packet->payload ? htons(erfptr->rlen) : 
515                                erf_get_framing_length(packet);
516}
517
518static libtrace_eventobj_t trace_event_dag(libtrace_t *trace,
519                                        libtrace_packet_t *packet) {
520        libtrace_eventobj_t event = {0,0,0.0,0};
521        dag_record_t *erfptr = NULL;
522        int numbytes;
523        uint32_t flags = 0;
524       
525        /* Need to call dag_available so that the top pointer will get
526         * updated, otherwise we'll never see any data! */
527        numbytes = dag_available(trace);
528
529        /* May as well not bother calling dag_get_record if dag_available
530         * suggests that there's no data */
531        if (numbytes != 0)
532                erfptr = dag_get_record(trace);
533        if (erfptr == NULL) {
534                /* No packet available */
535                event.type = TRACE_EVENT_SLEEP;
536                event.seconds = 0.0001;
537                return event;
538        }
539        //dag_form_packet(erfptr, packet);
540        if (dag_prepare_packet(trace, packet, erfptr, TRACE_RT_DATA_ERF, 
541                                flags)) {
542                event.type = TRACE_EVENT_TERMINATE;
543                return event;
544        }
545               
546               
547        event.size = trace_get_capture_length(packet) + trace_get_framing_length(packet);
548        if (trace->filter) {
549                if (trace_apply_filter(trace->filter, packet)) {
550                        event.type = TRACE_EVENT_PACKET;
551                } else {
552                        event.type = TRACE_EVENT_SLEEP;
553                        event.seconds = 0.000001;
554                        return event;
555                }
556        } else {
557                event.type = TRACE_EVENT_PACKET;
558        }
559
560        if (trace->snaplen > 0) {
561                trace_set_capture_length(packet, trace->snaplen);
562        }
563
564        return event;
565}
566
567static uint64_t dag_get_dropped_packets(libtrace_t *trace) {
568        if (trace->format_data == NULL)
569                return (uint64_t)-1;
570        return DATA(trace)->drops;
571}
572
573static void dag_help(void) {
574        printf("dag format module: $Revision$\n");
575        printf("Supported input URIs:\n");
576        printf("\tdag:/dev/dagn\n");
577        printf("\n");
578        printf("\te.g.: dag:/dev/dag0\n");
579        printf("\n");
580        printf("Supported output URIs:\n");
581        printf("\tnone\n");
582        printf("\n");
583}
584
585static struct libtrace_format_t dag = {
586        "dag",
587        "$Id$",
588        TRACE_FORMAT_ERF,
589        dag_probe_filename,             /* probe filename */
590        NULL,                           /* probe magic */
591        dag_init_input,                 /* init_input */
592        dag_config_input,               /* config_input */
593        dag_start_input,                /* start_input */
594        dag_pause_input,                /* pause_input */
595        NULL,                           /* init_output */
596        NULL,                           /* config_output */
597        NULL,                           /* start_output */
598        dag_fin_input,                  /* fin_input */
599        NULL,                           /* fin_output */
600        dag_read_packet,                /* read_packet */
601        dag_prepare_packet,             /* prepare_packet */
602        NULL,                           /* fin_packet */
603        NULL,                           /* write_packet */
604        erf_get_link_type,              /* get_link_type */
605        erf_get_direction,              /* get_direction */
606        erf_set_direction,              /* set_direction */
607        erf_get_erf_timestamp,          /* get_erf_timestamp */
608        NULL,                           /* get_timeval */
609        NULL,                           /* get_seconds */
610        NULL,                           /* seek_erf */
611        NULL,                           /* seek_timeval */
612        NULL,                           /* seek_seconds */
613        erf_get_capture_length,         /* get_capture_length */
614        erf_get_wire_length,            /* get_wire_length */
615        erf_get_framing_length,         /* get_framing_length */
616        erf_set_capture_length,         /* set_capture_length */
617        NULL,                           /* get_received_packets */
618        NULL,                           /* get_filtered_packets */
619        dag_get_dropped_packets,        /* get_dropped_packets */
620        NULL,                           /* get_captured_packets */
621        NULL,                           /* get_fd */
622        trace_event_dag,                /* trace_event */
623        dag_help,                       /* help */
624        NULL                            /* next pointer */
625};
626
627void dag_constructor(void) {
628        register_format(&dag);
629}
Note: See TracBrowser for help on using the repository browser.