Changeset f0b87a7 for lib/format_dag25.c


Ignore:
Timestamp:
03/23/09 12:27:39 (13 years ago)
Author:
Daniel Lawson <dlawson@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, getfragoff, help, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
5f4bef4
Parents:
013de36e
Message:

DAG write support. Work was referenced from the Endace DAG Programming Guide (EDM04-19)

Added dag_init_output(), dag_start_output(), dag_fin_output() and dag_write_packet() format functions.

Added datastructures: struct dag_format_data_out_t
Added other supporting internal functions: dag_init_format_out_data(), dag_open_output_device(), dag_pause_output()

dag_pause_output() may not make sense semantically, however the function essentially duplicates dag_pause_input(), but with the output device.

To use, specify an outputuri in the form dag:/dev/dag0 or dag:/dev/dag0,1
Note that while this supports using transmit streams other than 1, as of this commit there are no DAG cards that have mulitple transmit streams. Attempting to use a stream that does not work will result in an error inside libdag being passed back out to libtrace.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/format_dag25.c

    r91b72d3 rf0b87a7  
    6767#define DATA(x) ((struct dag_format_data_t *)x->format_data)
    6868#define FORMAT_DATA DATA(libtrace)
     69#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
     70#define FORMAT_DATA_OUT DATA_OUT(libtrace)
     71
    6972#define DUCK FORMAT_DATA->duck
    7073static struct libtrace_format_t dag;
     
    7780        struct dag_dev_t *prev;
    7881        struct dag_dev_t *next;
    79 };     
    80 
    81 struct dag_format_data_t {
    82         struct {
    83                 uint32_t last_duck;
    84                 uint32_t duck_freq;
    85                 uint32_t last_pkt;
    86                 libtrace_t *dummy_duck;
    87         } duck;
    88 
     82};
     83
     84struct dag_format_data_out_t {
    8985        struct dag_dev_t *device;
    9086        unsigned int dagstream;
     
    9692};
    9793
     94struct dag_format_data_t {
     95        struct {
     96                uint32_t last_duck;
     97                uint32_t duck_freq;
     98                uint32_t last_pkt;
     99                libtrace_t *dummy_duck;
     100        } duck;
     101
     102        struct dag_dev_t *device;
     103        unsigned int dagstream;
     104        int stream_attached;
     105        uint8_t *bottom;
     106        uint8_t *top;
     107        uint32_t processed;
     108        uint64_t drops;
     109};
     110
    98111pthread_mutex_t open_dag_mutex;
    99112struct dag_dev_t *open_dags = NULL;
    100113
    101 static void dag_probe_filename(const char *filename) 
     114static void dag_probe_filename(const char *filename)
    102115{
    103116        struct stat statbuf;
     
    114127}
    115128
     129static void dag_init_format_out_data(libtrace_out_t *libtrace) {
     130        libtrace->format_data = (struct dag_format_data_out_t *) malloc(sizeof(struct dag_format_data_out_t));
     131        // no DUCK on output
     132        FORMAT_DATA_OUT->stream_attached = 0;
     133        FORMAT_DATA_OUT->drops = 0;
     134        FORMAT_DATA_OUT->device = NULL;
     135        FORMAT_DATA_OUT->dagstream = 0;
     136        FORMAT_DATA_OUT->processed = 0;
     137        FORMAT_DATA_OUT->bottom = NULL;
     138        FORMAT_DATA_OUT->top = NULL;
     139
     140}
     141
    116142static void dag_init_format_data(libtrace_t *libtrace) {
    117143        libtrace->format_data = (struct dag_format_data_t *)
    118                 malloc(sizeof(struct dag_format_data_t));
     144                malloc(sizeof(struct dag_format_data_t));
    119145        DUCK.last_duck = 0;
    120146        DUCK.duck_freq = 0;
     
    133159static struct dag_dev_t *dag_find_open_device(char *dev_name) {
    134160        struct dag_dev_t *dag_dev;
    135        
     161
    136162        dag_dev = open_dags;
    137163
     
    142168                        dag_dev->ref_count ++;
    143169                        return dag_dev;
    144                        
     170
    145171                }
    146172                dag_dev = dag_dev->next;
    147173        }
    148174        return NULL;
    149                
    150        
     175
     176
    151177}
    152178
     
    154180static void dag_close_device(struct dag_dev_t *dev) {
    155181        /* Need to remove from the device list */
    156        
     182
    157183        assert(dev->ref_count == 0);
    158        
     184
    159185        if (dev->prev == NULL) {
    160186                open_dags = dev->next;
     
    168194
    169195        dag_close(dev->fd);
     196        if (dev->dev_name)
    170197        free(dev->dev_name);
    171198        free(dev);
    172                
     199}
     200
     201static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace, char *dev_name) {
     202        struct stat buf;
     203        int fd;
     204        struct dag_dev_t *new_dev;
     205
     206        if (stat(dev_name, &buf) == -1) {
     207                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
     208                return NULL;
     209}
     210
     211        if (S_ISCHR(buf.st_mode)) {
     212                if((fd = dag_open(dev_name)) < 0) {
     213                        trace_set_err_out(libtrace,errno,"Cannot open DAG %s",
     214                                        dev_name);
     215                        return NULL;
     216                }
     217        } else {
     218                trace_set_err_out(libtrace,errno,"Not a valid dag device: %s",
     219                                dev_name);
     220                return NULL;
     221        }
     222
     223        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
     224        new_dev->fd = fd;
     225        new_dev->dev_name = dev_name;
     226        new_dev->ref_count = 1;
     227
     228        new_dev->prev = NULL;
     229        new_dev->next = open_dags;
     230        if (open_dags)
     231                open_dags->prev = new_dev;
     232
     233        open_dags = new_dev;
     234
     235        return new_dev;
    173236}
    174237
     
    178241        int fd;
    179242        struct dag_dev_t *new_dev;
    180        
     243
    181244        if (stat(dev_name, &buf) == -1) {
    182245                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
    183246                return NULL;
    184247        }
    185        
     248
    186249        if (S_ISCHR(buf.st_mode)) {
    187250                if((fd = dag_open(dev_name)) < 0) {
     
    195258                return NULL;
    196259        }
    197        
     260
    198261        new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t));
    199262        new_dev->fd = fd;
    200263        new_dev->dev_name = dev_name;
    201264        new_dev->ref_count = 1;
    202        
     265
    203266        new_dev->prev = NULL;
    204267        new_dev->next = open_dags;
    205268        if (open_dags)
    206269                open_dags->prev = new_dev;
    207        
     270
    208271        open_dags = new_dev;
    209        
     272
    210273        return new_dev;
    211274}
    212        
     275
     276static int dag_init_output(libtrace_out_t *libtrace) {
     277        char *dag_dev_name = NULL;
     278        char *scan = NULL;
     279        struct dag_dev_t *dag_device = NULL;
     280        int stream = 1;
     281
     282        dag_init_format_out_data(libtrace);
     283        pthread_mutex_lock(&open_dag_mutex);
     284        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
     285                dag_dev_name = strdup(libtrace->uridata);
     286        } else {
     287                dag_dev_name = (char *)strndup(libtrace->uridata,
     288                                (size_t)(scan - libtrace->uridata));
     289                stream = atoi(++scan);
     290        }
     291        FORMAT_DATA->dagstream = stream;
     292
     293        dag_device = dag_find_open_device(dag_dev_name);
     294
     295        if (dag_device == NULL) {
     296                /* Device not yet opened - open it ourselves */
     297                dag_device = dag_open_output_device(libtrace, dag_dev_name);
     298        } else {
     299                free(dag_dev_name);
     300                dag_dev_name = NULL;
     301        }
     302
     303        if (dag_device == NULL) {
     304                if (dag_dev_name)
     305                        free(dag_dev_name);
     306                return -1;
     307        }
     308
     309        FORMAT_DATA->device = dag_device;
     310        pthread_mutex_unlock(&open_dag_mutex);
     311        return 0;
     312}
    213313
    214314static int dag_init_input(libtrace_t *libtrace) {
     
    217317        int stream = 0;
    218318        struct dag_dev_t *dag_device = NULL;
    219        
     319
    220320        dag_init_format_data(libtrace);
    221321        pthread_mutex_lock(&open_dag_mutex);
     
    227327                stream = atoi(++scan);
    228328        }
    229        
    230        
    231 
    232         /* For now, we don't offer the ability to select the stream */
     329
    233330        FORMAT_DATA->dagstream = stream;
    234331
     
    246343                if (dag_dev_name)
    247344                        free(dag_dev_name);
     345                dag_dev_name = NULL;
    248346                return -1;
    249347        }
    250348
    251349        FORMAT_DATA->device = dag_device;
    252        
     350
    253351        pthread_mutex_unlock(&open_dag_mutex);
    254352        return 0;
    255353}
    256        
     354
    257355static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
    258356                                void *data) {
     
    264362                case TRACE_OPTION_SNAPLEN:
    265363                        snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
    266                         if (dag_configure(FORMAT_DATA->device->fd, 
     364                        if (dag_configure(FORMAT_DATA->device->fd,
    267365                                                conf_str) != 0) {
    268366                                trace_set_err(libtrace, errno, "Failed to configure snaplen on DAG card: %s", libtrace->uridata);
     
    275373                case TRACE_OPTION_FILTER:
    276374                        return -1;
    277                 case TRACE_OPTION_EVENT_REALTIME: 
     375                case TRACE_OPTION_EVENT_REALTIME:
    278376                        return -1;
    279377        }
    280378        return -1;
     379}
     380static int dag_start_output(libtrace_out_t *libtrace) {
     381        struct timeval zero, nopoll;
     382        uint8_t *top, *bottom;
     383        uint8_t diff = 0;
     384        top = bottom = NULL;
     385
     386        zero.tv_sec = 0;
     387        zero.tv_usec = 0;
     388        nopoll = zero;
     389
     390        if (dag_attach_stream(FORMAT_DATA->device->fd,
     391                        FORMAT_DATA->dagstream, 0, 1048576) < 0) {
     392                trace_set_err_out(libtrace, errno, "Cannot attach DAG stream");
     393                return -1;
     394        }
     395
     396        if (dag_start_stream(FORMAT_DATA->device->fd,
     397                        FORMAT_DATA->dagstream) < 0) {
     398                trace_set_err_out(libtrace, errno, "Cannot start DAG stream");
     399                return -1;
     400        }
     401        FORMAT_DATA->stream_attached = 1;
     402
     403        /* We don't want the dag card to do any sleeping */
     404        dag_set_stream_poll(FORMAT_DATA->device->fd,
     405                        FORMAT_DATA->dagstream, 0, &zero,
     406                        &nopoll);
     407
     408        return 0;
    281409}
    282410
     
    292420
    293421
    294        
    295         if (dag_attach_stream(FORMAT_DATA->device->fd, 
     422
     423        if (dag_attach_stream(FORMAT_DATA->device->fd,
    296424                                FORMAT_DATA->dagstream, 0, 0) < 0) {
    297425                trace_set_err(libtrace, errno, "Cannot attach DAG stream");
     
    299427        }
    300428
    301         if (dag_start_stream(FORMAT_DATA->device->fd, 
     429        if (dag_start_stream(FORMAT_DATA->device->fd,
    302430                                FORMAT_DATA->dagstream) < 0) {
    303431                trace_set_err(libtrace, errno, "Cannot start DAG stream");
     
    306434        FORMAT_DATA->stream_attached = 1;
    307435        /* We don't want the dag card to do any sleeping */
    308         dag_set_stream_poll(FORMAT_DATA->device->fd, 
    309                                 FORMAT_DATA->dagstream, 0, &zero, 
     436        dag_set_stream_poll(FORMAT_DATA->device->fd,
     437                                FORMAT_DATA->dagstream, 0, &zero,
    310438                                &nopoll);
    311        
     439
    312440        /* Should probably flush the memory hole now */
    313        
     441
    314442        do {
    315443                top = dag_advance_stream(FORMAT_DATA->device->fd,
     
    324452        FORMAT_DATA->processed = 0;
    325453        FORMAT_DATA->drops = 0;
    326        
     454
    327455        return 0;
    328456}
    329457
     458static int dag_pause_output(libtrace_out_t *libtrace) {
     459        if (dag_stop_stream(FORMAT_DATA->device->fd,
     460                        FORMAT_DATA->dagstream) < 0) {
     461                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
     462                return -1;
     463        }
     464        if (dag_detach_stream(FORMAT_DATA->device->fd,
     465                        FORMAT_DATA->dagstream) < 0) {
     466                trace_set_err_out(libtrace, errno, "Could not detach DAG stream");
     467                return -1;
     468        }
     469        FORMAT_DATA->stream_attached = 0;
     470        return 0;
     471}
     472
    330473static int dag_pause_input(libtrace_t *libtrace) {
    331         if (dag_stop_stream(FORMAT_DATA->device->fd, 
     474        if (dag_stop_stream(FORMAT_DATA->device->fd,
    332475                                FORMAT_DATA->dagstream) < 0) {
    333476                trace_set_err(libtrace, errno, "Could not stop DAG stream");
    334477                return -1;
    335478        }
    336         if (dag_detach_stream(FORMAT_DATA->device->fd, 
     479        if (dag_detach_stream(FORMAT_DATA->device->fd,
    337480                                FORMAT_DATA->dagstream) < 0) {
    338481                trace_set_err(libtrace, errno, "Could not detach DAG stream");
     
    348491                dag_pause_input(libtrace);
    349492        FORMAT_DATA->device->ref_count --;
    350        
     493
    351494        if (FORMAT_DATA->device->ref_count == 0)
    352495                dag_close_device(FORMAT_DATA->device);
     
    356499        pthread_mutex_unlock(&open_dag_mutex);
    357500        return 0; /* success */
     501}
     502
     503static int dag_fin_output(libtrace_out_t *libtrace) {
     504        pthread_mutex_lock(&open_dag_mutex);
     505        if (FORMAT_DATA->stream_attached)
     506                dag_pause_output(libtrace);
     507        FORMAT_DATA->device->ref_count --;
     508
     509        if (FORMAT_DATA->device->ref_count == 0)
     510                dag_close_device(FORMAT_DATA->device);
     511        free(libtrace->format_data);
     512        pthread_mutex_unlock(&open_dag_mutex);
     513        return 0; /* success */
    358514}
    359515
     
    376532        /* No need to check if we can get DUCK or not - we're modern
    377533         * enough */
    378         if ((ioctl(FORMAT_DATA->device->fd, DAGIOCDUCK, 
     534        if ((ioctl(FORMAT_DATA->device->fd, DAGIOCDUCK,
    379535                                        (duckinf_t *)packet->payload) < 0)) {
    380536                trace_set_err(libtrace, errno, "Error using DAGIOCDUCK");
     
    392548        uint32_t diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
    393549        /* If we've processed more than 4MB of data since we last called
    394          * dag_advance_stream, then we should call it again to allow the 
     550         * dag_advance_stream, then we should call it again to allow the
    395551         * space occupied by that 4MB to be released */
    396552        if (diff >= dag_record_size && FORMAT_DATA->processed < 4 * 1024 * 1024)
    397553                return diff;
    398         FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd, 
    399                         FORMAT_DATA->dagstream, 
     554        FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd,
     555                        FORMAT_DATA->dagstream,
    400556                        &(FORMAT_DATA->bottom));
    401557        if (FORMAT_DATA->top == NULL) {
     
    426582static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
    427583                void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
    428        
     584
    429585        dag_record_t *erfptr;
    430        
    431         if (packet->buffer != buffer && 
     586
     587        if (packet->buffer != buffer &&
    432588                        packet->buf_control == TRACE_CTRL_PACKET) {
    433589                free(packet->buffer);
    434590        }
    435        
     591
    436592        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
    437593                packet->buf_control = TRACE_CTRL_PACKET;
    438594        } else
    439595                packet->buf_control = TRACE_CTRL_EXTERNAL;
    440        
     596
    441597        erfptr = (dag_record_t *)buffer;
    442598        packet->buffer = erfptr;
    443599        packet->header = erfptr;
    444600        packet->type = rt_type;
    445        
     601
    446602        if (erfptr->flags.rxerror == 1) {
    447603                /* rxerror means the payload is corrupt - drop it
     
    453609                        + erf_get_framing_length(packet);
    454610        }
    455        
     611
    456612        if (libtrace->format_data == NULL) {
    457                 dag_init_format_data(libtrace); 
    458         }
    459        
    460         /* No loss counter for DSM coloured records - have to use 
     613                dag_init_format_data(libtrace);
     614        }
     615
     616        /* No loss counter for DSM coloured records - have to use
    461617         * some other API */
    462618        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
    463                
     619
    464620        } else {
    465621                DATA(libtrace)->drops += ntohs(erfptr->lctr);
     
    469625}
    470626
     627
     628static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet) {
     629        int err;
     630        void *record;
     631        int size = trace_get_capture_length(packet);
     632        size = size + (8 - (size % 8));
     633
     634        err = dag_tx_stream_copy_bytes(FORMAT_DATA->device->fd, FORMAT_DATA->dagstream,
     635                        packet->buffer, size);
     636
     637        if (err == NULL)
     638                trace_set_err(libtrace, errno, "dag_tx_stream_copy_bytes failed!");
     639
     640        return 0;
     641}
    471642
    472643static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
     
    476647        int numbytes = 0;
    477648        uint32_t flags = 0;
    478        
     649
    479650        if (DUCK.last_pkt - DUCK.last_duck > DUCK.duck_freq &&
    480651                        DUCK.duck_freq != 0) {
     
    489660
    490661        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
    491        
     662
    492663        if (packet->buf_control == TRACE_CTRL_PACKET) {
    493664                free(packet->buffer);
     
    511682        tv = trace_get_timeval(packet);
    512683        DUCK.last_pkt = tv.tv_sec;
    513        
    514         return packet->payload ? htons(erfptr->rlen) : 
     684
     685        return packet->payload ? htons(erfptr->rlen) :
    515686                                erf_get_framing_length(packet);
    516687}
     
    522693        int numbytes;
    523694        uint32_t flags = 0;
    524        
     695
    525696        /* Need to call dag_available so that the top pointer will get
    526697         * updated, otherwise we'll never see any data! */
     
    538709        }
    539710        //dag_form_packet(erfptr, packet);
    540         if (dag_prepare_packet(trace, packet, erfptr, TRACE_RT_DATA_ERF, 
     711        if (dag_prepare_packet(trace, packet, erfptr, TRACE_RT_DATA_ERF,
    541712                                flags)) {
    542713                event.type = TRACE_EVENT_TERMINATE;
    543714                return event;
    544715        }
    545                
    546                
     716
     717
    547718        event.size = trace_get_capture_length(packet) + trace_get_framing_length(packet);
    548719        if (trace->filter) {
     
    593764        dag_start_input,                /* start_input */
    594765        dag_pause_input,                /* pause_input */
    595         NULL,                           /* init_output */
     766        dag_init_output,                /* init_output */ /* done */
    596767        NULL,                           /* config_output */
    597         NULL,                           /* start_output */
     768        dag_start_output,               /* start_output */ /* done */
    598769        dag_fin_input,                  /* fin_input */
    599         NULL,                           /* fin_output */
     770        dag_fin_output,                 /* fin_output */ /* done */
    600771        dag_read_packet,                /* read_packet */
    601772        dag_prepare_packet,             /* prepare_packet */
    602773        NULL,                           /* fin_packet */
    603         NULL,                           /* write_packet */
     774        dag_write_packet,               /* write_packet */ /* todo */
    604775        erf_get_link_type,              /* get_link_type */
    605776        erf_get_direction,              /* get_direction */
Note: See TracChangeset for help on using the changeset viewer.