Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/format_dag25.c

    rc5ac872 r6b98325  
    7979 */
    8080
    81 
    8281#define DATA(x) ((struct dag_format_data_t *)x->format_data)
    8382#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
     83#define STREAM_DATA(x) ((struct dag_per_stream_t *)x->data)
    8484
    8585#define FORMAT_DATA DATA(libtrace)
     
    8787
    8888#define DUCK FORMAT_DATA->duck
     89
     90#define FORMAT_DATA_HEAD FORMAT_DATA->per_stream->head
     91#define FORMAT_DATA_FIRST ((struct dag_per_stream_t *)FORMAT_DATA_HEAD->data)
     92
    8993static struct libtrace_format_t dag;
    9094
     
    104108/* "Global" data that is stored for each DAG output trace */
    105109struct dag_format_data_out_t {
    106         /* String containing the DAG device name */
    107         char *device_name;
    108110        /* The DAG device being used for writing */
    109111        struct dag_dev_t *device;
     
    118120};
    119121
     122/* Data that is stored against each input stream */
     123struct dag_per_stream_t {
     124        /* DAG stream number */
     125        uint16_t dagstream;
     126        /* Pointer to the last unread byte in the DAG memory */
     127        uint8_t *top;
     128        /* Pointer to the first unread byte in the DAG memory */
     129        uint8_t *bottom;
     130        /* Amount of data processed from the bottom pointer */
     131        uint32_t processed;
     132        /* Number of packets seen by the stream */
     133        uint64_t pkt_count;
     134        /* Drop count for this particular stream */
     135        uint64_t drops;
     136        /* Boolean values to indicate if a particular interface has been seen
     137         * or not. This is limited to four interfaces, which is enough to
     138         * support all current DAG cards */
     139        uint8_t seeninterface[4];
     140};
     141
    120142/* "Global" data that is stored for each DAG input trace */
    121143struct dag_format_data_t {
    122 
    123         /* Data required for regular DUCK reporting */
     144        /* DAG device */
     145        struct dag_dev_t *device;
     146        /* Boolean flag indicating whether the trace is currently attached */
     147        int stream_attached;
     148        /* Data stored against each DAG input stream */
     149        libtrace_list_t *per_stream;
     150
     151        /* Data required for regular DUCK reporting.
     152         * We put this on a new cache line otherwise we have a lot of false
     153         * sharing caused by updating the last_pkt.
     154         * This should only ever be accessed by the first thread stream,
     155         * that includes both read and write operations.
     156         */
    124157        struct {
    125158                /* Timestamp of the last DUCK report */
    126159                uint32_t last_duck;
    127160                /* The number of seconds between each DUCK report */
    128                 uint32_t duck_freq;
     161                uint32_t duck_freq;
    129162                /* Timestamp of the last packet read from the DAG card */
    130                 uint32_t last_pkt;
     163                uint32_t last_pkt;
    131164                /* Dummy trace to ensure DUCK packets are dealt with using the
    132165                 * DUCK format functions */
    133                 libtrace_t *dummy_duck;
    134         } duck;
    135 
    136         /* String containing the DAG device name */
    137         char *device_name;
    138         /* The DAG device that we are reading from */
    139         struct dag_dev_t *device;
    140         /* The DAG stream that we are reading from */
    141         unsigned int dagstream;
    142         /* Boolean flag indicating whether the stream is currently attached */
    143         int stream_attached;
    144         /* Pointer to the first unread byte in the DAG memory hole */
    145         uint8_t *bottom;
    146         /* Pointer to the last unread byte in the DAG memory hole */
    147         uint8_t *top;
    148         /* The amount of data processed thus far from the bottom pointer */
    149         uint32_t processed;
    150         /* The number of packets that have been dropped */
    151         uint64_t drops;
    152 
    153         uint8_t seeninterface[4];
     166                libtrace_t *dummy_duck;
     167        } duck ALIGN_STRUCT(CACHE_LINE_SIZE);
    154168};
    155169
     
    207221
    208222/* Initialises the DAG output data structure */
    209 static void dag_init_format_out_data(libtrace_out_t *libtrace) {
    210         libtrace->format_data = (struct dag_format_data_out_t *) malloc(sizeof(struct dag_format_data_out_t));
     223static void dag_init_format_out_data(libtrace_out_t *libtrace)
     224{
     225        libtrace->format_data = (struct dag_format_data_out_t *)
     226                malloc(sizeof(struct dag_format_data_out_t));
    211227        // no DUCK on output
    212228        FORMAT_DATA_OUT->stream_attached = 0;
    213229        FORMAT_DATA_OUT->device = NULL;
    214         FORMAT_DATA_OUT->device_name = NULL;
    215230        FORMAT_DATA_OUT->dagstream = 0;
    216231        FORMAT_DATA_OUT->waiting = 0;
     
    219234
    220235/* Initialises the DAG input data structure */
    221 static void dag_init_format_data(libtrace_t *libtrace) {
     236static void dag_init_format_data(libtrace_t *libtrace)
     237{
     238        struct dag_per_stream_t stream_data;
     239
    222240        libtrace->format_data = (struct dag_format_data_t *)
    223241                malloc(sizeof(struct dag_format_data_t));
    224242        DUCK.last_duck = 0;
    225         DUCK.duck_freq = 0;
    226         DUCK.last_pkt = 0;
    227         DUCK.dummy_duck = NULL;
    228         FORMAT_DATA->stream_attached = 0;
    229         FORMAT_DATA->drops = 0;
    230         FORMAT_DATA->device_name = NULL;
    231         FORMAT_DATA->device = NULL;
    232         FORMAT_DATA->dagstream = 0;
    233         FORMAT_DATA->processed = 0;
    234         FORMAT_DATA->bottom = NULL;
    235         FORMAT_DATA->top = NULL;
    236         memset(FORMAT_DATA->seeninterface, 0, sizeof(FORMAT_DATA->seeninterface));
     243        DUCK.duck_freq = 0;
     244        DUCK.last_pkt = 0;
     245        DUCK.dummy_duck = NULL;
     246
     247        FORMAT_DATA->per_stream =
     248                libtrace_list_init(sizeof(stream_data));
     249        assert(FORMAT_DATA->per_stream != NULL);
     250
     251        /* We'll start with just one instance of stream_data, and we'll
     252         * add more later if we need them */
     253        memset(&stream_data, 0, sizeof(stream_data));
     254        libtrace_list_push_back(FORMAT_DATA->per_stream, &stream_data);
    237255}
    238256
     
    241259 *
    242260 * NOTE: This function assumes the open_dag_mutex is held by the caller */
    243 static struct dag_dev_t *dag_find_open_device(char *dev_name) {
     261static struct dag_dev_t *dag_find_open_device(char *dev_name)
     262{
    244263        struct dag_dev_t *dag_dev;
    245264
     
    252271                        dag_dev->ref_count ++;
    253272                        return dag_dev;
    254 
    255273                }
    256274                dag_dev = dag_dev->next;
    257275        }
    258276        return NULL;
    259 
    260 
    261277}
    262278
     
    267283 *
    268284 * NOTE: This function assumes the open_dag_mutex is held by the caller */
    269 static void dag_close_device(struct dag_dev_t *dev) {
     285static void dag_close_device(struct dag_dev_t *dev)
     286{
    270287        /* Need to remove from the device list */
    271 
    272288        assert(dev->ref_count == 0);
    273289
     
    292308 *
    293309 * NOTE: this function should only be called when opening a DAG device for
    294  * writing - there is little practical difference between this and the 
     310 * writing - there is little practical difference between this and the
    295311 * function below that covers the reading case, but we need the output trace
    296  * object to report errors properly so the two functions take slightly 
     312 * object to report errors properly so the two functions take slightly
    297313 * different arguments. This is really lame and there should be a much better
    298314 * way of doing this.
    299315 *
    300  * NOTE: This function assumes the open_dag_mutex is held by the caller 
     316 * NOTE: This function assumes the open_dag_mutex is held by the caller
    301317 */
    302 static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace, char *dev_name) {
     318static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace,
     319                                                char *dev_name)
     320{
    303321        struct stat buf;
    304322        int fd;
     
    309327                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
    310328                return NULL;
    311 }
     329        }
    312330
    313331        /* Make sure it is the appropriate type of device */
     
    346364 *
    347365 * NOTE: this function should only be called when opening a DAG device for
    348  * reading - there is little practical difference between this and the 
     366 * reading - there is little practical difference between this and the
    349367 * function above that covers the writing case, but we need the input trace
    350  * object to report errors properly so the two functions take slightly 
     368 * object to report errors properly so the two functions take slightly
    351369 * different arguments. This is really lame and there should be a much better
    352370 * way of doing this.
     
    359377
    360378        /* Make sure the device exists */
    361         if (stat(dev_name, &buf) == -1) {
    362                 trace_set_err(libtrace,errno,"stat(%s)",dev_name);
    363                 return NULL;
    364         }
     379        if (stat(dev_name, &buf) == -1) {
     380                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
     381                return NULL;
     382        }
    365383
    366384        /* Make sure it is the appropriate type of device */
     
    368386                /* Try opening the DAG device */
    369387                if((fd = dag_open(dev_name)) < 0) {
    370                         trace_set_err(libtrace,errno,"Cannot open DAG %s",
    371                                         dev_name);
    372                         return NULL;
    373                 }
     388                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
     389                                      dev_name);
     390                        return NULL;
     391                }
    374392        } else {
    375393                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
    376                                 dev_name);
    377                 return NULL;
    378         }
     394                              dev_name);
     395                return NULL;
     396        }
    379397
    380398        /* Add the device to our device list - it is just a doubly linked
     
    397415
    398416/* Creates and initialises a DAG output trace */
    399 static int dag_init_output(libtrace_out_t *libtrace) {
     417static int dag_init_output(libtrace_out_t *libtrace)
     418{
     419        /* Upon successful creation, the device name is stored against the
     420         * device and free when it is free()d */
     421        char *dag_dev_name = NULL;
    400422        char *scan = NULL;
    401423        struct dag_dev_t *dag_device = NULL;
    402424        int stream = 1;
    403        
     425
    404426        /* XXX I don't know if this is important or not, but this function
    405427         * isn't present in all of the driver releases that this code is
     
    411433
    412434        dag_init_format_out_data(libtrace);
    413         /* Grab the mutex while we're likely to be messing with the device 
     435        /* Grab the mutex while we're likely to be messing with the device
    414436         * list */
    415437        pthread_mutex_lock(&open_dag_mutex);
    416        
     438
    417439        /* Specific streams are signified using a comma in the libtrace URI,
    418440         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
     
    420442         * If no stream is specified, we will write using stream 1 */
    421443        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
    422                 FORMAT_DATA_OUT->device_name = strdup(libtrace->uridata);
     444                dag_dev_name = strdup(libtrace->uridata);
    423445        } else {
    424                 FORMAT_DATA_OUT->device_name =
    425                                 (char *)strndup(libtrace->uridata,
     446                dag_dev_name = (char *)strndup(libtrace->uridata,
    426447                                (size_t)(scan - libtrace->uridata));
    427448                stream = atoi(++scan);
     
    430451
    431452        /* See if our DAG device is already open */
    432         dag_device = dag_find_open_device(FORMAT_DATA_OUT->device_name);
     453        dag_device = dag_find_open_device(dag_dev_name);
    433454
    434455        if (dag_device == NULL) {
    435456                /* Device not yet opened - open it ourselves */
    436                 dag_device = dag_open_output_device(libtrace,
    437                                 FORMAT_DATA_OUT->device_name);
     457                dag_device = dag_open_output_device(libtrace, dag_dev_name);
     458        } else {
     459                /* Otherwise, just use the existing one */
     460                free(dag_dev_name);
     461                dag_dev_name = NULL;
    438462        }
    439463
    440464        /* Make sure we have successfully opened a DAG device */
    441465        if (dag_device == NULL) {
    442                 if (FORMAT_DATA_OUT->device_name) {
    443                         free(FORMAT_DATA_OUT->device_name);
    444                         FORMAT_DATA_OUT->device_name = NULL;
     466                if (dag_dev_name) {
     467                        free(dag_dev_name);
    445468                }
    446469                pthread_mutex_unlock(&open_dag_mutex);
     
    455478/* Creates and initialises a DAG input trace */
    456479static int dag_init_input(libtrace_t *libtrace) {
     480        /* Upon successful creation, the device name is stored against the
     481         * device and free when it is free()d */
     482        char *dag_dev_name = NULL;
    457483        char *scan = NULL;
    458484        int stream = 0;
     
    460486
    461487        dag_init_format_data(libtrace);
    462         /* Grab the mutex while we're likely to be messing with the device 
     488        /* Grab the mutex while we're likely to be messing with the device
    463489         * list */
    464490        pthread_mutex_lock(&open_dag_mutex);
    465        
    466        
    467         /* Specific streams are signified using a comma in the libtrace URI,
     491
     492
     493        /* DAG cards support multiple streams. In a single threaded capture,
     494         * these are specified using a comma in the libtrace URI,
    468495         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
    469496         *
    470          * If no stream is specified, we will read from stream 0 */
     497         * If no stream is specified, we will read from stream 0 with
     498         * one thread
     499         */
    471500        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
    472                 FORMAT_DATA->device_name = strdup(libtrace->uridata);
     501                dag_dev_name = strdup(libtrace->uridata);
    473502        } else {
    474                 FORMAT_DATA->device_name = (char *)strndup(libtrace->uridata,
     503                dag_dev_name = (char *)strndup(libtrace->uridata,
    475504                                (size_t)(scan - libtrace->uridata));
    476505                stream = atoi(++scan);
    477506        }
    478507
    479         FORMAT_DATA->dagstream = stream;
     508        FORMAT_DATA_FIRST->dagstream = stream;
    480509
    481510        /* See if our DAG device is already open */
    482         dag_device = dag_find_open_device(FORMAT_DATA->device_name);
     511        dag_device = dag_find_open_device(dag_dev_name);
    483512
    484513        if (dag_device == NULL) {
    485514                /* Device not yet opened - open it ourselves */
    486                 dag_device=dag_open_device(libtrace, FORMAT_DATA->device_name);
     515                dag_device = dag_open_device(libtrace, dag_dev_name);
     516        } else {
     517                /* Otherwise, just use the existing one */
     518                free(dag_dev_name);
     519                dag_dev_name = NULL;
    487520        }
    488521
    489522        /* Make sure we have successfully opened a DAG device */
    490523        if (dag_device == NULL) {
    491                 if (FORMAT_DATA->device_name)
    492                         free(FORMAT_DATA->device_name);
    493                 FORMAT_DATA->device_name = NULL;
     524                if (dag_dev_name)
     525                        free(dag_dev_name);
     526                dag_dev_name = NULL;
    494527                pthread_mutex_unlock(&open_dag_mutex);
    495528                return -1;
     
    498531        FORMAT_DATA->device = dag_device;
    499532
    500         /* See Config_Status_API_Programming_Guide.pdf from the Endace Dag Documentation */
    501         /* Check kBooleanAttributeActive is true -- no point capturing on an interface that's disabled
    502  
    503         *  The symptom of the port being disabled is that libtrace will appear to hang.
    504         */
     533        /* See Config_Status_API_Programming_Guide.pdf from the Endace
     534           Dag Documentation */
     535        /* Check kBooleanAttributeActive is true -- no point capturing
     536         * on an interface that's disabled
     537         *
     538         * The symptom of the port being disabled is that libtrace
     539         * will appear to hang. */
    505540        /* Check kBooleanAttributeFault is false */
    506541        /* Check kBooleanAttributeLocalFault is false */
     
    508543        /* Check kBooleanAttributePeerLink ? */
    509544
    510         /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based on libtrace promisc attribute?*/
     545        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based
     546           on libtrace promisc attribute?*/
    511547        /* Set kUint32AttributeSnapLength to the snaplength */
    512548
    513549        pthread_mutex_unlock(&open_dag_mutex);
    514         return 0;
     550        return 0;
    515551}
    516552
    517553/* Configures a DAG input trace */
    518554static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
    519                                 void *data) {
    520         char conf_str[4096];
     555                            void *data)
     556{
     557        char conf_str[4096];
    521558        switch(option) {
    522                 case TRACE_OPTION_META_FREQ:
    523                         /* This option is used to specify the frequency of DUCK
    524                          * updates */
    525                         DUCK.duck_freq = *(int *)data;
    526                         return 0;
    527                 case TRACE_OPTION_SNAPLEN:
    528                         /* Tell the card our new snap length */
    529                         snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
    530                         if (dag_configure(FORMAT_DATA->device->fd,
    531                                                 conf_str) != 0) {
    532                                 trace_set_err(libtrace, errno, "Failed to configure snaplen on DAG card: %s", libtrace->uridata);
    533                                 return -1;
    534                         }
    535                         return 0;
    536                 case TRACE_OPTION_PROMISC:
    537                         /* DAG already operates in a promisc fashion */
    538                         return -1;
    539                 case TRACE_OPTION_FILTER:
    540                         /* We don't yet support pushing filters into DAG
    541                          * cards */
    542                         return -1;
    543                 case TRACE_OPTION_EVENT_REALTIME:
    544                         /* Live capture is always going to be realtime */
     559        case TRACE_OPTION_META_FREQ:
     560                /* This option is used to specify the frequency of DUCK
     561                 * updates */
     562                DUCK.duck_freq = *(int *)data;
     563                return 0;
     564        case TRACE_OPTION_SNAPLEN:
     565                /* Tell the card our new snap length */
     566                snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
     567                if (dag_configure(FORMAT_DATA->device->fd,
     568                                  conf_str) != 0) {
     569                        trace_set_err(libtrace, errno, "Failed to configure "
     570                                      "snaplen on DAG card: %s",
     571                                      libtrace->uridata);
    545572                        return -1;
    546         }
     573                }
     574                return 0;
     575        case TRACE_OPTION_PROMISC:
     576                /* DAG already operates in a promisc fashion */
     577                return -1;
     578        case TRACE_OPTION_FILTER:
     579                /* We don't yet support pushing filters into DAG
     580                 * cards */
     581                return -1;
     582        case TRACE_OPTION_EVENT_REALTIME:
     583                /* Live capture is always going to be realtime */
     584                return -1;
     585        case TRACE_OPTION_HASHER:
     586                /* Lets just say we did this, it's currently still up to
     587                 * the user to configure this correctly. */
     588                return 0;
     589        }
    547590        return -1;
    548591}
    549592
    550593/* Starts a DAG output trace */
    551 static int dag_start_output(libtrace_out_t *libtrace) {
     594static int dag_start_output(libtrace_out_t *libtrace)
     595{
    552596        struct timeval zero, nopoll;
    553597
     
    557601
    558602        /* Attach and start the DAG stream */
    559 
    560603        if (dag_attach_stream(FORMAT_DATA_OUT->device->fd,
    561604                        FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) {
     
    572615
    573616        /* We don't want the dag card to do any sleeping */
    574 
    575617        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
    576618                        FORMAT_DATA_OUT->dagstream, 0, &zero,
     
    580622}
    581623
    582 /* Starts a DAG input trace */
    583 static int dag_start_input(libtrace_t *libtrace) {
    584         struct timeval zero, nopoll;
    585         uint8_t *top, *bottom, *starttop;
     624static int dag_start_input_stream(libtrace_t *libtrace,
     625                                  struct dag_per_stream_t * stream) {
     626        struct timeval zero, nopoll;
     627        uint8_t *top, *bottom, *starttop;
    586628        top = bottom = NULL;
    587629
    588630        zero.tv_sec = 0;
    589         zero.tv_usec = 10000;
    590         nopoll = zero;
     631        zero.tv_usec = 10000;
     632        nopoll = zero;
    591633
    592634        /* Attach and start the DAG stream */
    593635        if (dag_attach_stream(FORMAT_DATA->device->fd,
    594                                 FORMAT_DATA->dagstream, 0, 0) < 0) {
    595                 trace_set_err(libtrace, errno, "Cannot attach DAG stream");
    596                 return -1;
    597         }
     636                              stream->dagstream, 0, 0) < 0) {
     637                trace_set_err(libtrace, errno, "Cannot attach DAG stream #%u",
     638                              stream->dagstream);
     639                return -1;
     640        }
    598641
    599642        if (dag_start_stream(FORMAT_DATA->device->fd,
    600                                 FORMAT_DATA->dagstream) < 0) {
    601                 trace_set_err(libtrace, errno, "Cannot start DAG stream");
    602                 return -1;
    603         }
     643                             stream->dagstream) < 0) {
     644                trace_set_err(libtrace, errno, "Cannot start DAG stream #%u",
     645                              stream->dagstream);
     646                return -1;
     647        }
    604648        FORMAT_DATA->stream_attached = 1;
    605        
     649
    606650        /* We don't want the dag card to do any sleeping */
    607         dag_set_stream_poll(FORMAT_DATA->device->fd,
    608                                 FORMAT_DATA->dagstream, 0, &zero,
    609                                 &nopoll);
     651        if (dag_set_stream_poll(FORMAT_DATA->device->fd,
     652                            stream->dagstream, 0, &zero,
     653                            &nopoll) < 0) {
     654                trace_set_err(libtrace, errno,
     655                              "dag_set_stream_poll failed!");
     656                return -1;
     657        }
    610658
    611659        starttop = dag_advance_stream(FORMAT_DATA->device->fd,
    612                                         FORMAT_DATA->dagstream,
    613                                         &bottom);
     660                                      stream->dagstream,
     661                                      &bottom);
    614662
    615663        /* Should probably flush the memory hole now */
     
    618666                bottom += (starttop - bottom);
    619667                top = dag_advance_stream(FORMAT_DATA->device->fd,
    620                                         FORMAT_DATA->dagstream,
    621                                         &bottom);
    622         }
    623         FORMAT_DATA->top = top;
    624         FORMAT_DATA->bottom = bottom;
    625         FORMAT_DATA->processed = 0;
    626         FORMAT_DATA->drops = 0;
     668                                         stream->dagstream,
     669                                         &bottom);
     670        }
     671        stream->top = top;
     672        stream->bottom = bottom;
     673        stream->processed = 0;
     674        stream->drops = 0;
    627675
    628676        return 0;
     677
     678}
     679
     680/* Starts a DAG input trace */
     681static int dag_start_input(libtrace_t *libtrace)
     682{
     683        return dag_start_input_stream(libtrace, FORMAT_DATA_FIRST);
     684}
     685
     686static int dag_pstart_input(libtrace_t *libtrace)
     687{
     688        char *scan, *tok;
     689        uint16_t stream_count = 0, max_streams;
     690        int iserror = 0;
     691        struct dag_per_stream_t stream_data;
     692
     693        /* Check we aren't trying to create more threads than the DAG card can
     694         * handle */
     695        max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd);
     696        if (libtrace->perpkt_thread_count > max_streams) {
     697                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
     698                              "trying to create too many threads (max is %u)",
     699                              max_streams);
     700                iserror = 1;
     701                goto cleanup;
     702        }
     703
     704        /* Get the stream names from the uri */
     705        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
     706                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
     707                              "format uri doesn't specify the DAG streams");
     708                iserror = 1;
     709                goto cleanup;
     710        }
     711
     712        scan++;
     713
     714        tok = strtok(scan, ",");
     715        while (tok != NULL) {
     716                /* Ensure we haven't specified too many streams */
     717                if (stream_count >= libtrace->perpkt_thread_count) {
     718                        trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
     719                                      "format uri specifies too many streams. "
     720                                      "Max is %u", max_streams);
     721                        iserror = 1;
     722                        goto cleanup;
     723                }
     724
     725                /* Save the stream details */
     726                if (stream_count == 0) {
     727                        /* Special case where we update the existing stream
     728                         * data structure */
     729                        FORMAT_DATA_FIRST->dagstream = (uint16_t)atoi(tok);
     730                } else {
     731                        memset(&stream_data, 0, sizeof(stream_data));
     732                        stream_data.dagstream = (uint16_t)atoi(tok);
     733                        libtrace_list_push_back(FORMAT_DATA->per_stream,
     734                                                &stream_data);
     735                }
     736
     737                stream_count++;
     738                tok = strtok(NULL, ",");
     739        }
     740
     741        FORMAT_DATA->stream_attached = 1;
     742
     743 cleanup:
     744        if (iserror) {
     745                return -1;
     746        } else {
     747                return 0;
     748        }
    629749}
    630750
    631751/* Pauses a DAG output trace */
    632 static int dag_pause_output(libtrace_out_t *libtrace) {
    633 
     752static int dag_pause_output(libtrace_out_t *libtrace)
     753{
    634754        /* Stop and detach the stream */
    635755        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
    636                         FORMAT_DATA_OUT->dagstream) < 0) {
     756                            FORMAT_DATA_OUT->dagstream) < 0) {
    637757                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
    638758                return -1;
    639759        }
    640760        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
    641                         FORMAT_DATA_OUT->dagstream) < 0) {
    642                 trace_set_err_out(libtrace, errno, "Could not detach DAG stream");
     761                              FORMAT_DATA_OUT->dagstream) < 0) {
     762                trace_set_err_out(libtrace, errno,
     763                                  "Could not detach DAG stream");
    643764                return -1;
    644765        }
     
    648769
    649770/* Pauses a DAG input trace */
    650 static int dag_pause_input(libtrace_t *libtrace) {
    651 
    652         /* Stop and detach the stream */
    653         if (dag_stop_stream(FORMAT_DATA->device->fd,
    654                                 FORMAT_DATA->dagstream) < 0) {
    655                 trace_set_err(libtrace, errno, "Could not stop DAG stream");
    656                 return -1;
    657         }
    658         if (dag_detach_stream(FORMAT_DATA->device->fd,
    659                                 FORMAT_DATA->dagstream) < 0) {
    660                 trace_set_err(libtrace, errno, "Could not detach DAG stream");
    661                 return -1;
    662         }
     771static int dag_pause_input(libtrace_t *libtrace)
     772{
     773        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
     774
     775        /* Stop and detach each stream */
     776        while (tmp != NULL) {
     777                if (dag_stop_stream(FORMAT_DATA->device->fd,
     778                                    STREAM_DATA(tmp)->dagstream) < 0) {
     779                        trace_set_err(libtrace, errno,
     780                                      "Could not stop DAG stream");
     781                        printf("Count not stop DAG stream\n");
     782                        return -1;
     783                }
     784                if (dag_detach_stream(FORMAT_DATA->device->fd,
     785                                      STREAM_DATA(tmp)->dagstream) < 0) {
     786                        trace_set_err(libtrace, errno,
     787                                      "Could not detach DAG stream");
     788                        printf("Count not detach DAG stream\n");
     789                        return -1;
     790                }
     791
     792                tmp = tmp->next;
     793        }
     794
    663795        FORMAT_DATA->stream_attached = 0;
    664796        return 0;
    665797}
    666798
     799
     800
    667801/* Closes a DAG input trace */
    668 static int dag_fin_input(libtrace_t *libtrace) {
     802static int dag_fin_input(libtrace_t *libtrace)
     803{
    669804        /* Need the lock, since we're going to be handling the device list */
    670805        pthread_mutex_lock(&open_dag_mutex);
    671        
     806
    672807        /* Detach the stream if we are not paused */
    673808        if (FORMAT_DATA->stream_attached)
    674809                dag_pause_input(libtrace);
    675         FORMAT_DATA->device->ref_count --;
     810        FORMAT_DATA->device->ref_count--;
    676811
    677812        /* Close the DAG device if there are no more references to it */
    678813        if (FORMAT_DATA->device->ref_count == 0)
    679814                dag_close_device(FORMAT_DATA->device);
     815
    680816        if (DUCK.dummy_duck)
    681817                trace_destroy_dead(DUCK.dummy_duck);
    682         if (FORMAT_DATA->device_name)
    683                 free(FORMAT_DATA->device_name);
     818
     819        /* Clear the list */
     820        libtrace_list_deinit(FORMAT_DATA->per_stream);
    684821        free(libtrace->format_data);
    685822        pthread_mutex_unlock(&open_dag_mutex);
    686         return 0; /* success */
     823        return 0; /* success */
    687824}
    688825
    689826/* Closes a DAG output trace */
    690 static int dag_fin_output(libtrace_out_t *libtrace) {
    691        
     827static int dag_fin_output(libtrace_out_t *libtrace)
     828{
     829
    692830        /* Commit any outstanding traffic in the txbuffer */
    693831        if (FORMAT_DATA_OUT->waiting) {
    694                 dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
    695                                 FORMAT_DATA_OUT->waiting );
    696         }
    697 
    698         /* Wait until the buffer is nearly clear before exiting the program,
     832                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
     833                                           FORMAT_DATA_OUT->dagstream,
     834                                           FORMAT_DATA_OUT->waiting );
     835        }
     836
     837        /* Wait until the buffer is nearly clear before exiting the program,
    699838         * as we will lose packets otherwise */
    700         dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
    701                         FORMAT_DATA_OUT->dagstream,
    702                         dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
    703                                         FORMAT_DATA_OUT->dagstream) - 8
    704                         );
     839        dag_tx_get_stream_space
     840                (FORMAT_DATA_OUT->device->fd,
     841                 FORMAT_DATA_OUT->dagstream,
     842                 dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
     843                                            FORMAT_DATA_OUT->dagstream) - 8);
    705844
    706845        /* Need the lock, since we're going to be handling the device list */
     
    715854        if (FORMAT_DATA_OUT->device->ref_count == 0)
    716855                dag_close_device(FORMAT_DATA_OUT->device);
    717         if (FORMAT_DATA_OUT->device_name)
    718                 free(FORMAT_DATA_OUT->device_name);
    719856        free(libtrace->format_data);
    720857        pthread_mutex_unlock(&open_dag_mutex);
     
    752889
    753890        /* Allocate memory for the DUCK data */
    754         if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
    755                         !packet->buffer) {
    756                 packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
    757                 packet->buf_control = TRACE_CTRL_PACKET;
    758                 if (!packet->buffer) {
    759                         trace_set_err(libtrace, errno,
    760                                         "Cannot allocate packet buffer");
    761                         return -1;
    762                 }
    763         }
     891        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
     892            !packet->buffer) {
     893                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
     894                packet->buf_control = TRACE_CTRL_PACKET;
     895                if (!packet->buffer) {
     896                        trace_set_err(libtrace, errno,
     897                                      "Cannot allocate packet buffer");
     898                        return -1;
     899                }
     900        }
    764901
    765902        /* DUCK doesn't have a format header */
    766         packet->header = 0;
    767         packet->payload = packet->buffer;
    768 
    769         /* No need to check if we can get DUCK or not - we're modern
    770         * enough so just grab the DUCK info */
    771         if ((ioctl(FORMAT_DATA->device->fd, LIBTRACE_DUCK_IOCTL,
    772                                         (duckinf_t *)packet->payload) < 0)) {
    773                 trace_set_err(libtrace, errno, "Error using DUCK ioctl");
     903        packet->header = 0;
     904        packet->payload = packet->buffer;
     905
     906        /* No need to check if we can get DUCK or not - we're modern
     907        * enough so just grab the DUCK info */
     908        if ((ioctl(FORMAT_DATA->device->fd, LIBTRACE_DUCK_IOCTL,
     909                   (duckinf_t *)packet->payload) < 0)) {
     910                trace_set_err(libtrace, errno, "Error using DUCK ioctl");
    774911                DUCK.duck_freq = 0;
    775                 return -1;
    776         }
    777 
    778         packet->type = LIBTRACE_DUCK_VERSION;
     912                return -1;
     913        }
     914
     915        packet->type = LIBTRACE_DUCK_VERSION;
    779916
    780917        /* Set the packet's trace to point at a DUCK trace, so that the
    781918         * DUCK format functions will be called on the packet rather than the
    782919         * DAG ones */
    783         if (!DUCK.dummy_duck)
    784                 DUCK.dummy_duck = trace_create_dead("duck:dummy");
    785         packet->trace = DUCK.dummy_duck;
    786         DUCK.last_duck = DUCK.last_pkt;
    787         return sizeof(duckinf_t);
     920        if (!DUCK.dummy_duck)
     921                DUCK.dummy_duck = trace_create_dead("duck:dummy");
     922        packet->trace = DUCK.dummy_duck;
     923        DUCK.last_duck = DUCK.last_pkt;
     924        packet->error = sizeof(duckinf_t);
     925        return sizeof(duckinf_t);
    788926}
    789927
    790928/* Determines the amount of data available to read from the DAG card */
    791 static int dag_available(libtrace_t *libtrace) {
    792         uint32_t diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
     929static int dag_available(libtrace_t *libtrace,
     930                         struct dag_per_stream_t *stream_data)
     931{
     932        uint32_t diff = stream_data->top - stream_data->bottom;
    793933
    794934        /* If we've processed more than 4MB of data since we last called
    795935         * dag_advance_stream, then we should call it again to allow the
    796936         * space occupied by that 4MB to be released */
    797         if (diff >= dag_record_size && FORMAT_DATA->processed < 4 * 1024 * 1024)
     937        if (diff >= dag_record_size && stream_data->processed < 4 * 1024 * 1024)
    798938                return diff;
    799        
     939
    800940        /* Update the top and bottom pointers */
    801         FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd,
    802                         FORMAT_DATA->dagstream,
    803                         &(FORMAT_DATA->bottom));
    804        
    805         if (FORMAT_DATA->top == NULL) {
     941        stream_data->top = dag_advance_stream(FORMAT_DATA->device->fd,
     942                                              stream_data->dagstream,
     943                                              &(stream_data->bottom));
     944
     945        if (stream_data->top == NULL) {
    806946                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
    807947                return -1;
    808948        }
    809         FORMAT_DATA->processed = 0;
    810         diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
     949        stream_data->processed = 0;
     950        diff = stream_data->top - stream_data->bottom;
    811951        return diff;
    812952}
    813953
    814954/* Returns a pointer to the start of the next complete ERF record */
    815 static dag_record_t *dag_get_record(libtrace_t *libtrace) {
    816         dag_record_t *erfptr = NULL;
    817         uint16_t size;
    818         erfptr = (dag_record_t *)FORMAT_DATA->bottom;
     955static dag_record_t *dag_get_record(struct dag_per_stream_t *stream_data)
     956{
     957        dag_record_t *erfptr = NULL;
     958        uint16_t size;
     959
     960        erfptr = (dag_record_t *)stream_data->bottom;
    819961        if (!erfptr)
    820                 return NULL;
    821         size = ntohs(erfptr->rlen);
    822         assert( size >= dag_record_size );
     962                return NULL;
     963
     964        size = ntohs(erfptr->rlen);
     965        assert( size >= dag_record_size );
     966
    823967        /* Make certain we have the full packet available */
    824         if (size > (FORMAT_DATA->top - FORMAT_DATA->bottom))
     968        if (size > (stream_data->top - stream_data->bottom))
    825969                return NULL;
    826         FORMAT_DATA->bottom += size;
    827         FORMAT_DATA->processed += size;
     970
     971        stream_data->bottom += size;
     972        stream_data->processed += size;
    828973        return erfptr;
    829974}
     
    831976/* Converts a buffer containing a recently read DAG packet record into a
    832977 * libtrace packet */
    833 static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
    834                 void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
    835 
     978static int dag_prepare_packet_stream(libtrace_t *libtrace,
     979                                     struct dag_per_stream_t *stream_data,
     980                                     libtrace_packet_t *packet,
     981                                     void *buffer, libtrace_rt_types_t rt_type,
     982                                     uint32_t flags)
     983{
    836984        dag_record_t *erfptr;
    837        
     985
    838986        /* If the packet previously owned a buffer that is not the buffer
    839         * that contains the new packet data, we're going to need to free the
    840         * old one to avoid memory leaks */
     987        * that contains the new packet data, we're going to need to free the
     988        * old one to avoid memory leaks */
    841989        if (packet->buffer != buffer &&
    842                         packet->buf_control == TRACE_CTRL_PACKET) {
     990            packet->buf_control == TRACE_CTRL_PACKET) {
    843991                free(packet->buffer);
    844992        }
     
    8531001        erfptr = (dag_record_t *)buffer;
    8541002        packet->buffer = erfptr;
    855         packet->header = erfptr;
    856         packet->type = rt_type;
     1003        packet->header = erfptr;
     1004        packet->type = rt_type;
    8571005
    8581006        if (erfptr->flags.rxerror == 1) {
    859                 /* rxerror means the payload is corrupt - drop the payload
    860                 * by tweaking rlen */
    861                 packet->payload = NULL;
    862                 erfptr->rlen = htons(erf_get_framing_length(packet));
    863         } else {
    864                 packet->payload = (char*)packet->buffer
    865                         + erf_get_framing_length(packet);
    866         }
     1007                /* rxerror means the payload is corrupt - drop the payload
     1008                * by tweaking rlen */
     1009                packet->payload = NULL;
     1010                erfptr->rlen = htons(erf_get_framing_length(packet));
     1011        } else {
     1012                packet->payload = (char*)packet->buffer
     1013                        + erf_get_framing_length(packet);
     1014        }
    8671015
    8681016        if (libtrace->format_data == NULL) {
     
    8711019
    8721020        /* Update the dropped packets counter */
    873 
    874         /* No loss counter for DSM coloured records - have to use
    875          * some other API */
     1021        /* No loss counter for DSM coloured records - have to use some
     1022         * other API */
    8761023        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
    8771024                /* TODO */
    8781025        } else {
    8791026                /* Use the ERF loss counter */
    880                 if (FORMAT_DATA->seeninterface[erfptr->flags.iface] == 0) {
    881                         FORMAT_DATA->seeninterface[erfptr->flags.iface] = 1;
     1027                if (stream_data->seeninterface[erfptr->flags.iface]
     1028                    == 0) {
     1029                        stream_data->seeninterface[erfptr->flags.iface]
     1030                                = 1;
    8821031                } else {
    883                         FORMAT_DATA->drops += ntohs(erfptr->lctr);
     1032                        stream_data->drops += ntohs(erfptr->lctr);
    8841033                }
    8851034        }
    8861035
    8871036        return 0;
     1037}
     1038
     1039static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
     1040                              void *buffer, libtrace_rt_types_t rt_type,
     1041                              uint32_t flags)
     1042{
     1043        return dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
     1044                                       buffer, rt_type, flags);
    8881045}
    8891046
     
    8981055/* Pushes an ERF record onto the transmit stream */
    8991056static int dag_dump_packet(libtrace_out_t *libtrace,
    900                 dag_record_t *erfptr, unsigned int pad, void *buffer) {
     1057                           dag_record_t *erfptr, unsigned int pad,
     1058                           void *buffer)
     1059{
    9011060        int size;
    9021061
    9031062        /*
    904          * If we've got 0 bytes waiting in the txqueue, assume that we haven't
    905          * requested any space yet, and request some, storing the pointer at
    906          * FORMAT_DATA_OUT->txbuffer.
     1063         * If we've got 0 bytes waiting in the txqueue, assume that we
     1064         * haven't requested any space yet, and request some, storing
     1065         * the pointer at FORMAT_DATA_OUT->txbuffer.
    9071066         *
    9081067         * The amount to request is slightly magical at the moment - it's
     
    9111070         */
    9121071        if (FORMAT_DATA_OUT->waiting == 0) {
    913                 FORMAT_DATA_OUT->txbuffer = dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
    914                                 FORMAT_DATA_OUT->dagstream, 16908288);
     1072                FORMAT_DATA_OUT->txbuffer =
     1073                        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
     1074                                                FORMAT_DATA_OUT->dagstream,
     1075                                                16908288);
    9151076        }
    9161077
     
    9191080         * are in contiguous memory
    9201081         */
    921         memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,erfptr,(dag_record_size + pad));
     1082        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, erfptr,
     1083               (dag_record_size + pad));
    9221084        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
    923 
    924 
    9251085
    9261086        /*
     
    9291089         */
    9301090        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
    931         memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,buffer,size);
     1091        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, buffer,
     1092               size);
    9321093        FORMAT_DATA_OUT->waiting += size;
    9331094
     
    9381099         * case there is still data in the buffer at program exit.
    9391100         */
    940 
    9411101        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
    942                 FORMAT_DATA_OUT->txbuffer = dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
    943                         FORMAT_DATA_OUT->waiting );
     1102                FORMAT_DATA_OUT->txbuffer =
     1103                        dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
     1104                                                   FORMAT_DATA_OUT->dagstream,
     1105                                                   FORMAT_DATA_OUT->waiting);
    9441106                FORMAT_DATA_OUT->waiting = 0;
    9451107        }
    9461108
    9471109        return size + pad + dag_record_size;
    948 
    9491110}
    9501111
     
    9521113 * if one is found, false otherwise */
    9531114static bool find_compatible_linktype(libtrace_out_t *libtrace,
    954                                 libtrace_packet_t *packet, char *type)
    955 {
    956          // Keep trying to simplify the packet until we can find
    957          //something we can do with it
     1115                                     libtrace_packet_t *packet, char *type)
     1116{
     1117        /* Keep trying to simplify the packet until we can find
     1118         * something we can do with it */
    9581119
    9591120        do {
    960                 *type=libtrace_to_erf_type(trace_get_link_type(packet));
    961 
    962                 // Success
     1121                *type = libtrace_to_erf_type(trace_get_link_type(packet));
     1122
     1123                /* Success */
    9631124                if (*type != (char)-1)
    9641125                        return true;
     
    9661127                if (!demote_packet(packet)) {
    9671128                        trace_set_err_out(libtrace,
    968                                         TRACE_ERR_NO_CONVERSION,
    969                                         "No erf type for packet (%i)",
    970                                         trace_get_link_type(packet));
     1129                                          TRACE_ERR_NO_CONVERSION,
     1130                                          "No erf type for packet (%i)",
     1131                                          trace_get_link_type(packet));
    9711132                        return false;
    9721133                }
     
    9781139
    9791140/* Writes a packet to the provided DAG output trace */
    980 static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet) {
    981         /*
    982          * This is heavily borrowed from erf_write_packet(). Yes, CnP coding
    983          * sucks, sorry about that.
     1141static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet)
     1142{
     1143        /* This is heavily borrowed from erf_write_packet(). Yes, CnP
     1144         * coding sucks, sorry about that.
    9841145         */
    9851146        unsigned int pad = 0;
     
    9901151
    9911152        if(!packet->header) {
    992                 /* No header, probably an RT packet. Lifted from 
     1153                /* No header, probably an RT packet. Lifted from
    9931154                 * erf_write_packet(). */
    9941155                return -1;
     
    10091170
    10101171        if (packet->type == TRACE_RT_DATA_ERF) {
    1011                 numbytes = dag_dump_packet(libtrace,
    1012                                 header,
    1013                                 pad,
    1014                                 payload
    1015                                 );
    1016 
     1172                numbytes = dag_dump_packet(libtrace, header, pad, payload);
    10171173        } else {
    10181174                /* Build up a new packet header from the existing header */
    10191175
    1020                 /* Simplify the packet first - if we can't do this, break 
     1176                /* Simplify the packet first - if we can't do this, break
    10211177                 * early */
    10221178                if (!find_compatible_linktype(libtrace,packet,&erf_type))
     
    10371193
    10381194                /* Packet length (rlen includes format overhead) */
    1039                 assert(trace_get_capture_length(packet)>0
    1040                                 && trace_get_capture_length(packet)<=65536);
    1041                 assert(erf_get_framing_length(packet)>0
    1042                                 && trace_get_framing_length(packet)<=65536);
    1043                 assert(trace_get_capture_length(packet)+erf_get_framing_length(packet)>0
    1044                       &&trace_get_capture_length(packet)+erf_get_framing_length(packet)<=65536);
     1195                assert(trace_get_capture_length(packet) > 0
     1196                       && trace_get_capture_length(packet) <= 65536);
     1197                assert(erf_get_framing_length(packet) > 0
     1198                       && trace_get_framing_length(packet) <= 65536);
     1199                assert(trace_get_capture_length(packet) +
     1200                       erf_get_framing_length(packet) > 0
     1201                       && trace_get_capture_length(packet) +
     1202                       erf_get_framing_length(packet) <= 65536);
    10451203
    10461204                erfhdr.rlen = htons(trace_get_capture_length(packet)
    1047                         + erf_get_framing_length(packet));
     1205                                    + erf_get_framing_length(packet));
    10481206
    10491207
     
    10541212
    10551213                /* Write it out */
    1056                 numbytes = dag_dump_packet(libtrace,
    1057                                 &erfhdr,
    1058                                 pad,
    1059                                 payload);
    1060 
     1214                numbytes = dag_dump_packet(libtrace, &erfhdr, pad, payload);
    10611215        }
    10621216
     
    10681222 * If DUCK reporting is enabled, the packet returned may be a DUCK update
    10691223 */
    1070 static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
    1071         int size = 0;
    1072         struct timeval tv;
    1073         dag_record_t *erfptr = NULL;
     1224static int dag_read_packet_stream(libtrace_t *libtrace,
     1225                                struct dag_per_stream_t *stream_data,
     1226                                libtrace_thread_t *t, /* Optional */
     1227                                libtrace_packet_t *packet)
     1228{
     1229        int size = 0;
     1230        dag_record_t *erfptr = NULL;
     1231        struct timeval tv;
    10741232        int numbytes = 0;
    10751233        uint32_t flags = 0;
    1076         struct timeval maxwait;
    1077         struct timeval pollwait;
     1234        struct timeval maxwait, pollwait;
    10781235
    10791236        pollwait.tv_sec = 0;
     
    10821239        maxwait.tv_usec = 250000;
    10831240
    1084         /* Check if we're due for a DUCK report */
    1085         size = dag_get_duckinfo(libtrace, packet);
    1086 
    1087         if (size != 0)
    1088                 return size;
     1241        /* Check if we're due for a DUCK report - only report on the first thread */
     1242        if (stream_data == FORMAT_DATA_FIRST) {
     1243                size = dag_get_duckinfo(libtrace, packet);
     1244                if (size != 0)
     1245                        return size;
     1246        }
    10891247
    10901248
     
    10941252        /* If the packet buffer is currently owned by libtrace, free it so
    10951253         * that we can set the packet to point into the DAG memory hole */
    1096         if (packet->buf_control == TRACE_CTRL_PACKET) {
    1097                 free(packet->buffer);
    1098                 packet->buffer = 0;
    1099         }
    1100        
    1101         if (dag_set_stream_poll(FORMAT_DATA->device->fd,
    1102                         FORMAT_DATA->dagstream, sizeof(dag_record_t), &maxwait,
    1103                         &pollwait) == -1)
    1104         {
     1254        if (packet->buf_control == TRACE_CTRL_PACKET) {
     1255                free(packet->buffer);
     1256                packet->buffer = 0;
     1257        }
     1258
     1259        if (dag_set_stream_poll(FORMAT_DATA->device->fd, stream_data->dagstream,
     1260                                sizeof(dag_record_t), &maxwait,
     1261                                &pollwait) == -1) {
    11051262                trace_set_err(libtrace, errno, "dag_set_stream_poll");
    11061263                return -1;
    11071264        }
    1108 
    11091265
    11101266        /* Grab a full ERF record */
    11111267        do {
    1112                 numbytes = dag_available(libtrace);
     1268                numbytes = dag_available(libtrace, stream_data);
    11131269                if (numbytes < 0)
    11141270                        return numbytes;
    11151271                if (numbytes < dag_record_size) {
     1272                        /* Check the message queue if we have one to check */
     1273                        if (t != NULL &&
     1274                            libtrace_message_queue_count(&t->messages) > 0)
     1275                                return -2;
     1276
    11161277                        if (libtrace_halt)
    11171278                                return 0;
     
    11191280                        continue;
    11201281                }
    1121                 erfptr = dag_get_record(libtrace);
     1282                erfptr = dag_get_record(stream_data);
    11221283        } while (erfptr == NULL);
    11231284
     1285        packet->trace = libtrace;
    11241286        /* Prepare the libtrace packet */
    1125         if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
    1126                                 flags))
    1127                 return -1;
    1128 
    1129         /* Update the DUCK timer */
    1130         tv = trace_get_timeval(packet);
    1131         DUCK.last_pkt = tv.tv_sec;
    1132 
    1133         return packet->payload ? htons(erfptr->rlen) :
    1134                                 erf_get_framing_length(packet);
     1287        if (dag_prepare_packet_stream(libtrace, stream_data, packet, erfptr,
     1288                                    TRACE_RT_DATA_ERF, flags))
     1289                return -1;
     1290
     1291        /* Update the DUCK timer - don't re-order this check (false-sharing) */
     1292        if (stream_data == FORMAT_DATA_FIRST && DUCK.duck_freq != 0) {
     1293                tv = trace_get_timeval(packet);
     1294                DUCK.last_pkt = tv.tv_sec;
     1295        }
     1296
     1297        packet->error = packet->payload ? htons(erfptr->rlen) :
     1298                                          erf_get_framing_length(packet);
     1299
     1300        return packet->error;
     1301}
     1302
     1303static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
     1304{
     1305        return dag_read_packet_stream(libtrace, FORMAT_DATA_FIRST, NULL, packet);
     1306}
     1307
     1308static int dag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
     1309                             libtrace_packet_t **packets, size_t nb_packets)
     1310{
     1311        int ret;
     1312        size_t read_packets = 0;
     1313        int numbytes = 0;
     1314
     1315        struct dag_per_stream_t *stream_data =
     1316                (struct dag_per_stream_t *)t->format_data;
     1317
     1318        /* Read as many packets as we can, but read atleast one packet */
     1319        do {
     1320                ret = dag_read_packet_stream(libtrace, stream_data, t,
     1321                                           packets[read_packets]);
     1322                if (ret < 0)
     1323                        return ret;
     1324
     1325                read_packets++;
     1326
     1327                /* Make sure we don't read too many packets..! */
     1328                if (read_packets >= nb_packets)
     1329                        break;
     1330
     1331                numbytes = dag_available(libtrace, stream_data);
     1332        } while (numbytes >= dag_record_size);
     1333
     1334        return read_packets;
    11351335}
    11361336
     
    11401340 */
    11411341static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
    1142                                         libtrace_packet_t *packet) {
    1143         libtrace_eventobj_t event = {0,0,0.0,0};
     1342                                           libtrace_packet_t *packet)
     1343{
     1344        libtrace_eventobj_t event = {0,0,0.0,0};
    11441345        dag_record_t *erfptr = NULL;
    11451346        int numbytes;
     
    11601361        }
    11611362       
    1162         if (dag_set_stream_poll(FORMAT_DATA->device->fd,
    1163                         FORMAT_DATA->dagstream, 0, &minwait,
    1164                         &minwait) == -1)
    1165         {
     1363        if (dag_set_stream_poll(FORMAT_DATA->device->fd,
     1364                                FORMAT_DATA_FIRST->dagstream, 0, &minwait,
     1365                                &minwait) == -1) {
    11661366                trace_set_err(libtrace, errno, "dag_set_stream_poll");
    11671367                event.type = TRACE_EVENT_TERMINATE;
     
    11721372                erfptr = NULL;
    11731373                numbytes = 0;
    1174        
     1374
    11751375                /* Need to call dag_available so that the top pointer will get
    11761376                 * updated, otherwise we'll never see any data! */
    1177                 numbytes = dag_available(libtrace);
    1178 
    1179                 /* May as well not bother calling dag_get_record if 
     1377                numbytes = dag_available(libtrace, FORMAT_DATA_FIRST);
     1378
     1379                /* May as well not bother calling dag_get_record if
    11801380                 * dag_available suggests that there's no data */
    11811381                if (numbytes != 0)
    1182                         erfptr = dag_get_record(libtrace);
     1382                        erfptr = dag_get_record(FORMAT_DATA_FIRST);
    11831383                if (erfptr == NULL) {
    11841384                        /* No packet available - sleep for a very short time */
    11851385                        if (libtrace_halt) {
    11861386                                event.type = TRACE_EVENT_TERMINATE;
    1187                         } else {                       
     1387                        } else {
    11881388                                event.type = TRACE_EVENT_SLEEP;
    11891389                                event.seconds = 0.0001;
     
    11911391                        break;
    11921392                }
    1193                 if (dag_prepare_packet(libtrace, packet, erfptr,
    1194                                         TRACE_RT_DATA_ERF, flags)) {
     1393                if (dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
     1394                                            erfptr, TRACE_RT_DATA_ERF, flags)) {
    11951395                        event.type = TRACE_EVENT_TERMINATE;
    11961396                        break;
     
    11981398
    11991399
    1200                 event.size = trace_get_capture_length(packet) + 
    1201                                 trace_get_framing_length(packet);
    1202                
     1400                event.size = trace_get_capture_length(packet) +
     1401                        trace_get_framing_length(packet);
     1402
    12031403                /* XXX trace_read_packet() normally applies the following
    12041404                 * config options for us, but this function is called via
     
    12061406
    12071407                if (libtrace->filter) {
    1208                         int filtret = trace_apply_filter(libtrace->filter, 
    1209                                         packet);
     1408                        int filtret = trace_apply_filter(libtrace->filter,
     1409                                                         packet);
    12101410                        if (filtret == -1) {
    12111411                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
    1212                                                 "Bad BPF Filter");
     1412                                              "Bad BPF Filter");
    12131413                                event.type = TRACE_EVENT_TERMINATE;
    12141414                                break;
     
    12211421                                 * a sleep event in this case, like we used to
    12221422                                 * do! */
    1223                                 libtrace->filtered_packets ++;
     1423                                libtrace->filtered_packets ++;
    12241424                                trace_clear_cache(packet);
    12251425                                continue;
    12261426                        }
    1227                                
     1427
    12281428                        event.type = TRACE_EVENT_PACKET;
    12291429                } else {
     
    12381438                        trace_set_capture_length(packet, libtrace->snaplen);
    12391439                }
    1240                 libtrace->accepted_packets ++;
     1440                libtrace->accepted_packets ++;
    12411441                break;
    1242         } while (1);
     1442        } while(1);
    12431443
    12441444        return event;
    12451445}
    12461446
    1247 /* Gets the number of dropped packets */
    1248 static uint64_t dag_get_dropped_packets(libtrace_t *trace) {
    1249         if (trace->format_data == NULL)
    1250                 return (uint64_t)-1;
    1251         return DATA(trace)->drops;
     1447static void dag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat)
     1448{
     1449        libtrace_list_node_t *tmp;
     1450        assert(stat && libtrace);
     1451        tmp = FORMAT_DATA_HEAD;
     1452
     1453        /* Dropped packets */
     1454        stat->dropped_valid = 1;
     1455        stat->dropped = 0;
     1456        while (tmp != NULL) {
     1457                stat->dropped += STREAM_DATA(tmp)->drops;
     1458                tmp = tmp->next;
     1459        }
     1460
     1461}
     1462
     1463static void dag_get_thread_statisitics(libtrace_t *libtrace, libtrace_thread_t *t,
     1464                                       libtrace_stat_t *stat) {
     1465        struct dag_per_stream_t *stream_data = t->format_data;
     1466        assert(stat && libtrace);
     1467
     1468        stat->dropped_valid = 1;
     1469        stat->dropped = stream_data->drops;
     1470
     1471        stat->filtered_valid = 1;
     1472        stat->filtered = 0;
    12521473}
    12531474
    12541475/* Prints some semi-useful help text about the DAG format module */
    12551476static void dag_help(void) {
    1256         printf("dag format module: $Revision: 1755 $\n");
    1257         printf("Supported input URIs:\n");
    1258         printf("\tdag:/dev/dagn\n");
    1259         printf("\n");
    1260         printf("\te.g.: dag:/dev/dag0\n");
    1261         printf("\n");
    1262         printf("Supported output URIs:\n");
    1263         printf("\tnone\n");
    1264         printf("\n");
     1477        printf("dag format module: $Revision: 1755 $\n");
     1478        printf("Supported input URIs:\n");
     1479        printf("\tdag:/dev/dagn\n");
     1480        printf("\n");
     1481        printf("\te.g.: dag:/dev/dag0\n");
     1482        printf("\n");
     1483        printf("Supported output URIs:\n");
     1484        printf("\tnone\n");
     1485        printf("\n");
     1486}
     1487
     1488static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
     1489                                bool reader)
     1490{
     1491        struct dag_per_stream_t *stream_data;
     1492        libtrace_list_node_t *node;
     1493
     1494        if (reader && t->type == THREAD_PERPKT) {
     1495                fprintf(stderr, "t%u: registered reader thread", t->perpkt_num);
     1496                node = libtrace_list_get_index(FORMAT_DATA->per_stream,
     1497                                                t->perpkt_num);
     1498                if (node == NULL) {
     1499                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
     1500                                      "Too few streams supplied for the"
     1501                                      " number of threads lanuched");
     1502                        return -1;
     1503                }
     1504                stream_data = node->data;
     1505
     1506                /* Pass the per thread data to the thread */
     1507                t->format_data = stream_data;
     1508
     1509                /* Attach and start the DAG stream */
     1510                printf("t%u: starting and attaching stream #%u\n",
     1511                       t->perpkt_num, stream_data->dagstream);
     1512                if (dag_start_input_stream(libtrace, stream_data) < 0)
     1513                        return -1;
     1514        }
     1515
     1516        fprintf(stderr, "t%u: registered thread\n", t->perpkt_num);
     1517
     1518        return 0;
    12651519}
    12661520
    12671521static struct libtrace_format_t dag = {
    1268         "dag",
    1269         "$Id$",
    1270         TRACE_FORMAT_ERF,
     1522        "dag",
     1523        "$Id$",
     1524        TRACE_FORMAT_ERF,
    12711525        dag_probe_filename,             /* probe filename */
    12721526        NULL,                           /* probe magic */
    1273         dag_init_input,                 /* init_input */
    1274         dag_config_input,               /* config_input */
    1275         dag_start_input,                /* start_input */
    1276         dag_pause_input,                /* pause_input */
     1527        dag_init_input,                 /* init_input */
     1528        dag_config_input,               /* config_input */
     1529        dag_start_input,                /* start_input */
     1530        dag_pause_input,                /* pause_input */
    12771531        dag_init_output,                /* init_output */
    1278         NULL,                           /* config_output */
     1532        NULL,                           /* config_output */
    12791533        dag_start_output,               /* start_output */
    1280         dag_fin_input,                  /* fin_input */
     1534        dag_fin_input,                  /* fin_input */
    12811535        dag_fin_output,                 /* fin_output */
    1282         dag_read_packet,                /* read_packet */
    1283         dag_prepare_packet,             /* prepare_packet */
     1536        dag_read_packet,                /* read_packet */
     1537        dag_prepare_packet,             /* prepare_packet */
    12841538        NULL,                           /* fin_packet */
    12851539        dag_write_packet,               /* write_packet */
    1286         erf_get_link_type,              /* get_link_type */
    1287         erf_get_direction,              /* get_direction */
    1288         erf_set_direction,              /* set_direction */
    1289         erf_get_erf_timestamp,          /* get_erf_timestamp */
    1290         NULL,                           /* get_timeval */
    1291         NULL,                           /* get_seconds */
     1540        erf_get_link_type,              /* get_link_type */
     1541        erf_get_direction,              /* get_direction */
     1542        erf_set_direction,              /* set_direction */
     1543        erf_get_erf_timestamp,          /* get_erf_timestamp */
     1544        NULL,                           /* get_timeval */
     1545        NULL,                           /* get_seconds */
    12921546        NULL,                           /* get_timespec */
    1293         NULL,                           /* seek_erf */
    1294         NULL,                           /* seek_timeval */
    1295         NULL,                           /* seek_seconds */
    1296         erf_get_capture_length,         /* get_capture_length */
    1297         erf_get_wire_length,            /* get_wire_length */
    1298         erf_get_framing_length,         /* get_framing_length */
    1299         erf_set_capture_length,         /* set_capture_length */
     1547        NULL,                           /* seek_erf */
     1548        NULL,                           /* seek_timeval */
     1549        NULL,                           /* seek_seconds */
     1550        erf_get_capture_length,         /* get_capture_length */
     1551        erf_get_wire_length,            /* get_wire_length */
     1552        erf_get_framing_length,         /* get_framing_length */
     1553        erf_set_capture_length,         /* set_capture_length */
    13001554        NULL,                           /* get_received_packets */
    13011555        NULL,                           /* get_filtered_packets */
    1302         dag_get_dropped_packets,        /* get_dropped_packets */
    1303         NULL,                           /* get_captured_packets */
    1304         NULL,                           /* get_fd */
    1305         trace_event_dag,                /* trace_event */
    1306         dag_help,                       /* help */
    1307         NULL                            /* next pointer */
     1556        NULL,                           /* get_dropped_packets */
     1557        dag_get_statistics,             /* get_statistics */
     1558        NULL,                           /* get_fd */
     1559        trace_event_dag,                /* trace_event */
     1560        dag_help,                       /* help */
     1561        NULL,                            /* next pointer */
     1562        {true, 0}, /* live packet capture, thread limit TBD */
     1563        dag_pstart_input,
     1564        dag_pread_packets,
     1565        dag_pause_input,
     1566        NULL,
     1567        dag_pregister_thread,
     1568        NULL,
     1569        dag_get_thread_statisitics      /* get thread stats */
    13081570};
    13091571
    1310 void dag_constructor(void) {
     1572void dag_constructor(void)
     1573{
    13111574        register_format(&dag);
    13121575}
Note: See TracChangeset for help on using the changeset viewer.