Changeset cb39d35


Ignore:
Timestamp:
01/30/15 10:31:42 (6 years ago)
Author:
Dan Collins <dan@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
035f8a7
Parents:
fed9152
Message:

Moved DAG format to better support parallel and non-parallel API

Format data per stream is now stored in a linked list. This allows us to
add multiple per stream blocks for each additional stream, while still supporting
the original API. This greatly reduces code duplication and, to a minor extent,
RAM usage.

Location:
lib
Files:
2 added
3 edited

Legend:

Unmodified
Added
Removed
  • lib/Makefile.am

    r9e429e8 rcb39d35  
    2121
    2222if HAVE_LLVM
    23 BPFJITSOURCE=bpf-jit/bpf-jit.cc 
     23BPFJITSOURCE=bpf-jit/bpf-jit.cc
    2424else
    2525BPFJITSOURCE=
     
    5757                $(BPFJITSOURCE) \
    5858                libtrace_arphrd.h \
    59                 data-struct/ring_buffer.c data-struct/vector.c data-struct/message_queue.c \
    60                 data-struct/deque.c data-struct/sliding_window.c data-struct/object_cache.c \
    61                 hash_toeplitz.c combiner_ordered.c combiner_sorted.c combiner_unordered.c
     59                data-struct/ring_buffer.c data-struct/vector.c \
     60                data-struct/message_queue.c data-struct/deque.c \
     61                data-struct/sliding_window.c data-struct/object_cache.c \
     62                data-struct/linked_list.c hash_toeplitz.c combiner_ordered.c \
     63                combiner_sorted.c combiner_unordered.c
    6264
    6365if DAG2_4
  • lib/format_dag25.c

    r18bf317 rcb39d35  
    7979 */
    8080
    81 
    8281#define DATA(x) ((struct dag_format_data_t *)x->format_data)
    8382#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
    84 #define PERPKT_DATA(x) ((struct dag_per_thread_t *)(x->format_data))
     83#define STREAM_DATA(x) ((struct dag_per_stream_t *)x->data)
    8584
    8685#define FORMAT_DATA DATA(libtrace)
     
    8887
    8988#define DUCK FORMAT_DATA->duck
     89
     90#define FORMAT_DATA_HEAD FORMAT_DATA->per_stream->head
     91#define FORMAT_DATA_FIRST ((struct dag_per_stream_t *)FORMAT_DATA_HEAD->data)
     92
    9093static struct libtrace_format_t dag;
    9194
     
    117120};
    118121
    119 /* Data that is stored for each libtrace_thread_t */
    120 struct dag_per_thread_t {
     122/* Data that is stored against each input stream */
     123struct dag_per_stream_t {
    121124        /* DAG device */
    122125        struct dag_dev_t *device;
    123         /* Stream number */
    124         uint16_t stream;
     126        /* DAG stream number */
     127        uint16_t dagstream;
    125128        /* Pointer to the last unread byte in the DAG memory */
    126129        uint8_t *top;
     
    129132        /* Amount of data processed from the bottom pointer */
    130133        uint32_t processed;
    131         /* Number of packets seen by the thread */
     134        /* Number of packets seen by the stream */
    132135        uint64_t pkt_count;
    133         /* Drop count for this particular thread */
     136        /* Drop count for this particular stream */
    134137        uint64_t drops;
     138        /* Boolean values to indicate if a particular interface has been seen
     139         * or not. This is limited to four interfaces, which is enough to
     140         * support all current DAG cards */
     141        uint8_t seeninterface[4];
    135142};
    136143
     
    138145struct dag_format_data_t {
    139146        /* Data required for regular DUCK reporting */
     147        /* TODO: This doesn't work with the 10X2S card! I don't know how
     148         * DUCK stuff works and don't know how to fix it */
    140149        struct {
    141150                /* Timestamp of the last DUCK report */
     
    150159        } duck;
    151160
    152         /* The DAG device that we are reading from */
    153         struct dag_dev_t *device;
    154         /* The DAG stream that we are reading from */
    155         unsigned int dagstream;
    156         /* Boolean flag indicating whether the stream is currently attached */
     161        /* Boolean flag indicating whether the trace is currently attached */
    157162        int stream_attached;
    158         /* Pointer to the first unread byte in the DAG memory hole */
    159         uint8_t *bottom;
    160         /* Pointer to the last unread byte in the DAG memory hole */
    161         uint8_t *top;
    162         /* The amount of data processed thus far from the bottom pointer */
    163         uint32_t processed;
    164         /* The number of packets that have been dropped */
    165         uint64_t drops;
    166         /* When running in parallel mode this is malloc'd with an
    167          * array of thread structures. Most of the stuff above doesn't
    168          * get used in parallel mode. */
    169         struct dag_per_thread_t *per_thread;
    170 
    171         uint8_t seeninterface[4];
     163
     164        /* Data stored against each DAG input stream */
     165        libtrace_list_t *per_stream;
    172166};
    173167
     
    240234static void dag_init_format_data(libtrace_t *libtrace)
    241235{
     236        struct dag_per_stream_t stream_data;
     237
    242238        libtrace->format_data = (struct dag_format_data_t *)
    243239                malloc(sizeof(struct dag_format_data_t));
     
    246242        DUCK.last_pkt = 0;
    247243        DUCK.dummy_duck = NULL;
    248         FORMAT_DATA->stream_attached = 0;
    249         FORMAT_DATA->drops = 0;
    250         FORMAT_DATA->device = NULL;
    251         FORMAT_DATA->dagstream = 0;
    252         FORMAT_DATA->processed = 0;
    253         FORMAT_DATA->bottom = NULL;
    254         FORMAT_DATA->top = NULL;
    255         memset(FORMAT_DATA->seeninterface, 0,
    256                sizeof(FORMAT_DATA->seeninterface));
     244
     245        FORMAT_DATA->per_stream =
     246                libtrace_list_init(sizeof(stream_data));
     247        assert(FORMAT_DATA->per_stream != NULL);
     248
     249        /* We'll start with just one instance of stream_data, and we'll
     250         * add more later if we need them */
     251        memset(&stream_data, 0, sizeof(stream_data));
     252        libtrace_list_push_back(FORMAT_DATA->per_stream, &stream_data);
    257253}
    258254
     
    480476        char *dag_dev_name = NULL;
    481477        char *scan = NULL;
    482         int stream = 0, thread_count = 1;
     478        int stream = 0;
    483479        struct dag_dev_t *dag_device = NULL;
    484480
     
    505501        }
    506502
    507         FORMAT_DATA->dagstream = stream;
     503        FORMAT_DATA_FIRST->dagstream = stream;
    508504
    509505        /* See if our DAG device is already open */
     
    528524        }
    529525
    530         FORMAT_DATA->device = dag_device;
     526        FORMAT_DATA_FIRST->device = dag_device;
    531527
    532528        /* See Config_Status_API_Programming_Guide.pdf from the Endace
     
    564560                /* Tell the card our new snap length */
    565561                snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
    566                 if (dag_configure(FORMAT_DATA->device->fd,
     562                if (dag_configure(FORMAT_DATA_FIRST->device->fd,
    567563                                  conf_str) != 0) {
    568564                        trace_set_err(libtrace, errno, "Failed to configure "
     
    622618        struct timeval zero, nopoll;
    623619        uint8_t *top, *bottom, *starttop;
    624         uint64_t diff = 0;
    625620        top = bottom = NULL;
    626621
     
    630625
    631626        /* Attach and start the DAG stream */
    632         if (dag_attach_stream(FORMAT_DATA->device->fd,
    633                               FORMAT_DATA->dagstream, 0, 0) < 0) {
     627        if (dag_attach_stream(FORMAT_DATA_FIRST->device->fd,
     628                              FORMAT_DATA_FIRST->dagstream, 0, 0) < 0) {
    634629                trace_set_err(libtrace, errno, "Cannot attach DAG stream");
    635630                return -1;
    636631        }
    637632
    638         if (dag_start_stream(FORMAT_DATA->device->fd,
    639                              FORMAT_DATA->dagstream) < 0) {
     633        if (dag_start_stream(FORMAT_DATA_FIRST->device->fd,
     634                             FORMAT_DATA_FIRST->dagstream) < 0) {
    640635                trace_set_err(libtrace, errno, "Cannot start DAG stream");
    641636                return -1;
     
    644639
    645640        /* We don't want the dag card to do any sleeping */
    646         dag_set_stream_poll(FORMAT_DATA->device->fd,
    647                             FORMAT_DATA->dagstream, 0, &zero,
     641        dag_set_stream_poll(FORMAT_DATA_FIRST->device->fd,
     642                            FORMAT_DATA_FIRST->dagstream, 0, &zero,
    648643                            &nopoll);
    649644
    650         starttop = dag_advance_stream(FORMAT_DATA->device->fd,
    651                                       FORMAT_DATA->dagstream,
     645        starttop = dag_advance_stream(FORMAT_DATA_FIRST->device->fd,
     646                                      FORMAT_DATA_FIRST->dagstream,
    652647                                      &bottom);
    653648
     
    656651        while (starttop - bottom > 0) {
    657652                bottom += (starttop - bottom);
    658                 top = dag_advance_stream(FORMAT_DATA->device->fd,
    659                                          FORMAT_DATA->dagstream,
     653                top = dag_advance_stream(FORMAT_DATA_FIRST->device->fd,
     654                                         FORMAT_DATA_FIRST->dagstream,
    660655                                         &bottom);
    661656        }
    662         FORMAT_DATA->top = top;
    663         FORMAT_DATA->bottom = bottom;
    664         FORMAT_DATA->processed = 0;
    665         FORMAT_DATA->drops = 0;
     657        FORMAT_DATA_FIRST->top = top;
     658        FORMAT_DATA_FIRST->bottom = bottom;
     659        FORMAT_DATA_FIRST->processed = 0;
     660        FORMAT_DATA_FIRST->drops = 0;
    666661
    667662        return 0;
     663}
     664
     665static int dag_pstart_input(libtrace_t *libtrace)
     666{
     667        char *scan, *tok;
     668        uint16_t stream_count = 0, max_streams;
     669        int iserror = 0;
     670        struct dag_per_stream_t stream_data;
     671
     672        /* Check we aren't trying to create more threads than the DAG card can
     673         * handle */
     674        max_streams = dag_rx_get_stream_count(FORMAT_DATA_FIRST->device->fd);
     675        if (libtrace->perpkt_thread_count > max_streams) {
     676                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
     677                              "trying to create too many threads (max is %u)",
     678                              max_streams);
     679                iserror = 1;
     680                goto cleanup;
     681        }
     682
     683        /* Get the stream names from the uri */
     684        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
     685                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
     686                              "format uri doesn't specify the DAG streams");
     687                iserror = 1;
     688                goto cleanup;
     689        }
     690
     691        scan++;
     692
     693        tok = strtok(scan, ",");
     694        while (tok != NULL) {
     695                /* Ensure we haven't specified too many streams */
     696                if (stream_count >= libtrace->perpkt_thread_count) {
     697                        trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
     698                                      "format uri specifies too many streams. "
     699                                      "Max is %u", max_streams);
     700                        iserror = 1;
     701                        goto cleanup;
     702                }
     703
     704                /* Save the stream details */
     705                if (stream_count == 0) {
     706                        /* Special case where we update the existing stream
     707                         * data structure */
     708                        FORMAT_DATA_FIRST->dagstream = (uint16_t)atoi(tok);
     709                } else {
     710                        memset(&stream_data, 0, sizeof(stream_data));
     711                        stream_data.device = FORMAT_DATA_FIRST->device;
     712                        stream_data.dagstream = (uint16_t)atoi(tok);
     713                        libtrace_list_push_back(FORMAT_DATA->per_stream,
     714                                                &stream_data);
     715                }
     716
     717                stream_count++;
     718                tok = strtok(NULL, ",");
     719        }
     720
     721        FORMAT_DATA->stream_attached = 1;
     722
     723 cleanup:
     724        if (iserror) {
     725                return -1;
     726        } else {
     727                return 0;
     728        }
    668729}
    669730
     
    690751static int dag_pause_input(libtrace_t *libtrace)
    691752{
    692         /* Stop and detach the stream */
    693         if (dag_stop_stream(FORMAT_DATA->device->fd,
    694                             FORMAT_DATA->dagstream) < 0) {
    695                 trace_set_err(libtrace, errno, "Could not stop DAG stream");
    696                 return -1;
    697         }
    698         if (dag_detach_stream(FORMAT_DATA->device->fd,
    699                               FORMAT_DATA->dagstream) < 0) {
    700                 trace_set_err(libtrace, errno, "Could not detach DAG stream");
    701                 return -1;
    702         }
     753        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
     754
     755        /* Stop and detach each stream */
     756        while (tmp != NULL) {
     757                if (dag_stop_stream(STREAM_DATA(tmp)->device->fd,
     758                                    STREAM_DATA(tmp)->dagstream) < 0) {
     759                        trace_set_err(libtrace, errno,
     760                                      "Could not stop DAG stream");
     761                        printf("Count not stop DAG stream\n");
     762                        return -1;
     763                }
     764                if (dag_detach_stream(STREAM_DATA(tmp)->device->fd,
     765                                      STREAM_DATA(tmp)->dagstream) < 0) {
     766                        trace_set_err(libtrace, errno,
     767                                      "Could not detach DAG stream");
     768                        printf("Count not detach DAG stream\n");
     769                        return -1;
     770                }
     771
     772                tmp = tmp->next;
     773        }
     774
    703775        FORMAT_DATA->stream_attached = 0;
    704776        return 0;
    705777}
    706778
     779
     780
    707781/* Closes a DAG input trace */
    708782static int dag_fin_input(libtrace_t *libtrace)
    709783{
     784        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
     785
    710786        /* Need the lock, since we're going to be handling the device list */
    711787        pthread_mutex_lock(&open_dag_mutex);
     
    714790        if (FORMAT_DATA->stream_attached)
    715791                dag_pause_input(libtrace);
    716         FORMAT_DATA->device->ref_count --;
    717 
    718         /* Close the DAG device if there are no more references to it */
    719         if (FORMAT_DATA->device->ref_count == 0)
    720                 dag_close_device(FORMAT_DATA->device);
     792
     793        /* Close any dag devices that have no more references */
     794        while (tmp != NULL) {
     795                STREAM_DATA(tmp)->device->ref_count--;
     796                if (STREAM_DATA(tmp)->device->ref_count == 0)
     797                        dag_close_device(STREAM_DATA(tmp)->device);
     798
     799                tmp = tmp->next;
     800        }
     801
    721802        if (DUCK.dummy_duck)
    722803                trace_destroy_dead(DUCK.dummy_duck);
     804
     805        /* Clear the list */
     806        libtrace_list_deinit(FORMAT_DATA->per_stream);
     807
    723808        free(libtrace->format_data);
    724809        pthread_mutex_unlock(&open_dag_mutex);
     
    761846}
    762847
     848/* DUCK reporting is broken at the moment! */
     849#if 0
    763850/* Extracts DUCK information from the DAG card and produces a DUCK packet */
    764851static int dag_get_duckinfo(libtrace_t *libtrace, libtrace_packet_t *packet)
     
    797884        packet->trace = DUCK.dummy_duck;
    798885        return sizeof(duckinf_t);
    799 }
     886
     887        return 0;
     888}
     889#endif
    800890
    801891/* Determines the amount of data available to read from the DAG card */
    802 static int dag_available(libtrace_t *libtrace)
    803 {
    804         uint32_t diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
     892static int dag_available(libtrace_t *libtrace,
     893                         struct dag_per_stream_t *stream_data)
     894{
     895        uint32_t diff = stream_data->top - stream_data->bottom;
    805896
    806897        /* If we've processed more than 4MB of data since we last called
    807898         * dag_advance_stream, then we should call it again to allow the
    808899         * space occupied by that 4MB to be released */
    809         if (diff >= dag_record_size && FORMAT_DATA->processed < 4 * 1024 * 1024)
     900        if (diff >= dag_record_size && stream_data->processed < 4 * 1024 * 1024)
    810901                return diff;
    811902
    812903        /* Update the top and bottom pointers */
    813         FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd,
    814                                               FORMAT_DATA->dagstream,
    815                                               &(FORMAT_DATA->bottom));
    816 
    817         if (FORMAT_DATA->top == NULL) {
     904        stream_data->top = dag_advance_stream(stream_data->device->fd,
     905                                              stream_data->dagstream,
     906                                              &(stream_data->bottom));
     907
     908        if (stream_data->top == NULL) {
    818909                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
    819910                return -1;
    820911        }
    821         FORMAT_DATA->processed = 0;
    822         diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
     912        stream_data->processed = 0;
     913        diff = stream_data->top - stream_data->bottom;
    823914        return diff;
    824915}
    825916
    826917/* Returns a pointer to the start of the next complete ERF record */
    827 static dag_record_t *dag_get_record(libtrace_t *libtrace)
     918static dag_record_t *dag_get_record(struct dag_per_stream_t *stream_data)
    828919{
    829920        dag_record_t *erfptr = NULL;
    830921        uint16_t size;
    831922
    832         erfptr = (dag_record_t *)FORMAT_DATA->bottom;
     923        erfptr = (dag_record_t *)stream_data->bottom;
    833924        if (!erfptr)
    834925                return NULL;
     
    838929
    839930        /* Make certain we have the full packet available */
    840         if (size > (FORMAT_DATA->top - FORMAT_DATA->bottom))
     931        if (size > (stream_data->top - stream_data->bottom))
    841932                return NULL;
    842933
    843         FORMAT_DATA->bottom += size;
    844         FORMAT_DATA->processed += size;
     934        stream_data->bottom += size;
     935        stream_data->processed += size;
    845936        return erfptr;
    846937}
     
    848939/* Converts a buffer containing a recently read DAG packet record into a
    849940 * libtrace packet */
    850 static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
    851                               void *buffer, libtrace_rt_types_t rt_type,
    852                               uint32_t flags)
     941static int dag_prepare_packet_real(libtrace_t *libtrace,
     942                                   struct dag_per_stream_t *stream_data,
     943                                   libtrace_packet_t *packet,
     944                                   void *buffer, libtrace_rt_types_t rt_type,
     945                                   uint32_t flags)
    853946{
    854947        dag_record_t *erfptr;
    855         libtrace_thread_t *t;
    856948
    857949        /* If the packet previously owned a buffer that is not the buffer
     
    859951         * old one to avoid memory leaks */
    860952        if (packet->buffer != buffer &&
    861                         packet->buf_control == TRACE_CTRL_PACKET) {
     953            packet->buf_control == TRACE_CTRL_PACKET) {
    862954                free(packet->buffer);
    863955        }
     
    896988        } else {
    897989                /* Use the ERF loss counter */
    898                 if (DATA(libtrace)->per_thread) {
    899                         t = get_thread_table(libtrace);
    900                         PERPKT_DATA(t)->drops += ntohs(erfptr->lctr);
     990                if (stream_data->seeninterface[erfptr->flags.iface]
     991                    == 0) {
     992                        stream_data->seeninterface[erfptr->flags.iface]
     993                                = 1;
    901994                } else {
    902                         if (FORMAT_DATA->seeninterface[erfptr->flags.iface]
    903                             == 0) {
    904                                 FORMAT_DATA->seeninterface[erfptr->flags.iface]
    905                                         = 1;
    906                         } else {
    907                                 FORMAT_DATA->drops += ntohs(erfptr->lctr);
    908                         }
    909                 }
    910         }
     995                        stream_data->drops += ntohs(erfptr->lctr);
     996                }
     997        }
     998
     999        packet->error = 1;
    9111000
    9121001        return 0;
     1002}
     1003
     1004static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
     1005                              void *buffer, libtrace_rt_types_t rt_type,
     1006                              uint32_t flags)
     1007{
     1008        return dag_prepare_packet_real(libtrace, FORMAT_DATA_FIRST, packet,
     1009                                       buffer, rt_type, flags);
    9131010}
    9141011
     
    10901187 * If DUCK reporting is enabled, the packet returned may be a DUCK update
    10911188 */
    1092 static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
    1093 {
    1094         int size = 0;
    1095         struct timeval tv;
     1189static int dag_read_packet_real(libtrace_t *libtrace,
     1190                                struct dag_per_stream_t *stream_data,
     1191                                libtrace_thread_t *t, /* Optional */
     1192                                libtrace_packet_t *packet)
     1193{
    10961194        dag_record_t *erfptr = NULL;
    10971195        int numbytes = 0;
    10981196        uint32_t flags = 0;
    1099         struct timeval maxwait;
    1100         struct timeval pollwait;
     1197        struct timeval maxwait, pollwait;
    11011198
    11021199        pollwait.tv_sec = 0;
     
    11051202        maxwait.tv_usec = 250000;
    11061203
    1107         /* Check if we're due for a DUCK report */
    1108         if (DUCK.last_pkt - DUCK.last_duck > DUCK.duck_freq &&
    1109             DUCK.duck_freq != 0) {
    1110                 size = dag_get_duckinfo(libtrace, packet);
    1111                 DUCK.last_duck = DUCK.last_pkt;
    1112                 if (size != 0) {
    1113                         return size;
    1114                 }
    1115                 /* No DUCK support, so don't waste our time anymore */
    1116                 DUCK.duck_freq = 0;
    1117         }
     1204        /* TODO: Support DUCK reporting */
    11181205
    11191206        /* Don't let anyone try to free our DAG memory hole! */
     
    11271214        }
    11281215
    1129         if (dag_set_stream_poll(FORMAT_DATA->device->fd, FORMAT_DATA->dagstream,
     1216        if (dag_set_stream_poll(stream_data->device->fd, stream_data->dagstream,
    11301217                                sizeof(dag_record_t), &maxwait,
    11311218                                &pollwait) == -1) {
     
    11341221        }
    11351222
    1136 
    11371223        /* Grab a full ERF record */
    11381224        do {
    1139                 numbytes = dag_available(libtrace);
     1225                numbytes = dag_available(libtrace, stream_data);
    11401226                if (numbytes < 0)
    11411227                        return numbytes;
    11421228                if (numbytes < dag_record_size) {
     1229                        /* Check the message queue if we have one to check */
     1230                        if (t != NULL &&
     1231                            libtrace_message_queue_count(&t->messages) > 0)
     1232                                return -2;
     1233
    11431234                        if (libtrace_halt)
    11441235                                return 0;
     
    11461237                        continue;
    11471238                }
    1148                 erfptr = dag_get_record(libtrace);
     1239                erfptr = dag_get_record(stream_data);
    11491240        } while (erfptr == NULL);
    11501241
    11511242        /* Prepare the libtrace packet */
    1152         if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
    1153                                flags))
    1154                 return -1;
    1155 
    1156         /* Update the DUCK timer */
    1157         tv = trace_get_timeval(packet);
    1158         DUCK.last_pkt = tv.tv_sec;
     1243        if (dag_prepare_packet_real(libtrace, stream_data, packet, erfptr,
     1244                                    TRACE_RT_DATA_ERF, flags))
     1245                return -1;
    11591246
    11601247        return packet->payload ? htons(erfptr->rlen) :
    11611248                erf_get_framing_length(packet);
     1249}
     1250
     1251static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
     1252{
     1253        return dag_read_packet_real(libtrace, FORMAT_DATA_FIRST, NULL, packet);
     1254}
     1255
     1256static int dag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
     1257                             libtrace_packet_t **packets, size_t nb_packets)
     1258{
     1259        int ret;
     1260        size_t read_packets = 0;
     1261        int numbytes = 0;
     1262
     1263        struct dag_per_stream_t *stream_data =
     1264                (struct dag_per_stream_t *)t->format_data;
     1265
     1266        /* Read as many packets as we can, but read atleast one packet */
     1267        do {
     1268                ret = dag_read_packet_real(libtrace, stream_data, t,
     1269                                           packets[read_packets]);
     1270                if (ret < 0)
     1271                        return ret;
     1272
     1273                read_packets++;
     1274
     1275                /* Make sure we don't read too many packets..! */
     1276                if (read_packets >= nb_packets)
     1277                        break;
     1278
     1279                numbytes = dag_available(libtrace, stream_data);
     1280        } while (numbytes >= dag_record_size);
     1281
     1282        return read_packets;
    11621283}
    11631284
     
    11781299        minwait.tv_usec = 10000;
    11791300
    1180         if (dag_set_stream_poll(FORMAT_DATA->device->fd,
    1181                                 FORMAT_DATA->dagstream, 0, &minwait,
     1301        if (dag_set_stream_poll(FORMAT_DATA_FIRST->device->fd,
     1302                                FORMAT_DATA_FIRST->dagstream, 0, &minwait,
    11821303                                &minwait) == -1) {
    11831304                trace_set_err(libtrace, errno, "dag_set_stream_poll");
     
    11921313                /* Need to call dag_available so that the top pointer will get
    11931314                 * updated, otherwise we'll never see any data! */
    1194                 numbytes = dag_available(libtrace);
     1315                numbytes = dag_available(libtrace, FORMAT_DATA_FIRST);
    11951316
    11961317                /* May as well not bother calling dag_get_record if
    11971318                 * dag_available suggests that there's no data */
    11981319                if (numbytes != 0)
    1199                         erfptr = dag_get_record(libtrace);
     1320                        erfptr = dag_get_record(FORMAT_DATA_FIRST);
    12001321                if (erfptr == NULL) {
    12011322                        /* No packet available - sleep for a very short time */
     
    12081329                        break;
    12091330                }
    1210                 if (dag_prepare_packet(libtrace, packet, erfptr,
    1211                                       TRACE_RT_DATA_ERF, flags)) {
     1331                if (dag_prepare_packet_real(libtrace, FORMAT_DATA_FIRST, packet,
     1332                                            erfptr, TRACE_RT_DATA_ERF, flags)) {
    12121333                        event.type = TRACE_EVENT_TERMINATE;
    12131334                        break;
     
    12591380
    12601381/* Gets the number of dropped packets */
    1261 static uint64_t dag_get_dropped_packets(libtrace_t *trace)
     1382static uint64_t dag_get_dropped_packets(libtrace_t *libtrace)
    12621383{
    12631384        uint64_t sum = 0;
    1264         int i, tot;
    1265 
    1266         if (trace->format_data == NULL)
    1267                 return (uint64_t) - 1;
    1268 
    1269         if (DATA(trace)->per_thread) {
    1270                 tot = trace->perpkt_thread_count;
    1271 
    1272                 for (i = 0; i < tot; i++) {
    1273                         printf("t%d: drops %" PRIu64 "\n",
    1274                                DATA(trace)->per_thread[i].drops);
    1275                         sum += DATA(trace)->per_thread[i].drops;
    1276                 }
    1277         }
    1278 
    1279         sum += DATA(trace)->drops;
     1385        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
     1386
     1387        /* Sum the drop counter for all the packets */
     1388        while (tmp != NULL) {
     1389                sum += STREAM_DATA(tmp)->drops;
     1390                tmp = tmp->next;
     1391        }
    12801392
    12811393        return sum;
     
    12951407}
    12961408
    1297 static int dag_pstart_input(libtrace_t *libtrace)
    1298 {
    1299         char *scan, *tok;
    1300         uint16_t stream_count = 0, max_streams;
    1301         /* We keep our own pointer to per_thread as the system will free
    1302          * up FORMAT_DATA without freeing this if something goes wrong */
    1303         struct dag_per_thread_t *per_thread = NULL;
    1304         int iserror = 0;
    1305 
    1306         /* Check we aren't trying to create more threads than the DAG card can
    1307          * handle */
    1308         max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd);
    1309         if (libtrace->perpkt_thread_count > max_streams) {
    1310                 trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
    1311                               "trying to create too many threads (max is %u)",
    1312                               max_streams);
    1313                 iserror = 1;
    1314                 goto cleanup;
    1315         }
    1316 
    1317         /* Create the thread structures */
    1318         per_thread = calloc(libtrace->perpkt_thread_count,
    1319                             sizeof(struct dag_per_thread_t));
    1320         FORMAT_DATA->per_thread = per_thread;
    1321 
    1322         /* Get the stream names from the uri */
    1323         if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
    1324                 trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
    1325                               "format uri doesn't specify the DAG streams");
    1326                 iserror = 1;
    1327                 goto cleanup;
    1328         }
    1329 
    1330         scan++;
    1331 
    1332         tok = strtok(scan, ",");
    1333         while (tok != NULL) {
    1334                 /* Ensure we haven't specified too many streams */
    1335                 if (stream_count >= libtrace->perpkt_thread_count) {
    1336                         trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
    1337                                       "format uri specifies too many streams. "
    1338                                       "Max is %u", max_streams);
    1339                         iserror = 1;
    1340                         goto cleanup;
    1341                 }
    1342 
    1343                 /* Save the stream details */
    1344                 per_thread[stream_count].device = FORMAT_DATA->device;
    1345                 per_thread[stream_count++].stream = (uint16_t)atoi(tok);
    1346 
    1347                 tok = strtok(NULL, ",");
    1348         }
    1349 
    1350  cleanup:
    1351         if (iserror) {
    1352                 /* Free the per_thread memory */
    1353                 free(per_thread);
    1354                 return -1;
    1355         } else {
    1356                 return 0;
    1357         }
    1358 }
    1359 
    1360 
    1361 
    1362 /* TODO: Fold this into dag_available */
    1363 static int dag_pavailable(libtrace_t *libtrace, libtrace_thread_t *t)
    1364 {
    1365         uint32_t diff = PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom;
    1366 
    1367         /* If we've processed more than 4MB of data since we last called
    1368          * dag_advance_stream, then we should call it again to allow the
    1369          * space occupied by that 4MB to be released */
    1370         if (diff >= dag_record_size && PERPKT_DATA(t)->processed <
    1371             4*1024*1024)
    1372                 return diff;
    1373 
    1374         /* Update top and bottom pointers */
    1375         PERPKT_DATA(t)->top = dag_advance_stream(PERPKT_DATA(t)->device->fd,
    1376                                                  PERPKT_DATA(t)->stream,
    1377                                                  &(PERPKT_DATA(t)->bottom));
    1378 
    1379         if (PERPKT_DATA(t)->top == NULL) {
    1380                 trace_set_err(libtrace, errno, "dag_advance_stream failed!");
    1381                 return -1;
    1382         }
    1383 
    1384         PERPKT_DATA(t)->processed = 0;
    1385         diff = PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom;
    1386         return diff;
    1387 }
    1388 
    1389 /* TODO: Fold this into dag_get_record */
    1390 static dag_record_t *dag_pget_record(libtrace_t *libtrace,
    1391                                      libtrace_thread_t *t)
    1392 {
    1393         dag_record_t *erfptr = NULL;
    1394         uint16_t size;
    1395 
    1396         erfptr = (dag_record_t *)PERPKT_DATA(t)->bottom;
    1397         if (!erfptr)
    1398                 return NULL;
    1399 
    1400         /* Ensure we have a whole record */
    1401         size = ntohs(erfptr->rlen);
    1402         assert(size >= dag_record_size);
    1403         if (size > (PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom))
    1404                 return NULL;
    1405 
    1406         /* Advance the buffer pointers */
    1407         PERPKT_DATA(t)->bottom += size;
    1408         PERPKT_DATA(t)->processed += size;
    1409 
    1410         return erfptr;
    1411 }
    1412 
    1413 static int dag_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t,
    1414                             libtrace_packet_t *packet)
    1415 {
    1416         dag_record_t *erfptr = NULL;
    1417         int numbytes = 0;
    1418         uint32_t flags = 0;
    1419         struct timeval maxwait, pollwait;
    1420 
    1421         pollwait.tv_sec = 0;
    1422         pollwait.tv_usec = 10000;
    1423         maxwait.tv_sec = 0;
    1424         maxwait.tv_usec = 250000;
    1425 
    1426         /* TODO: Support DUCK reporting */
    1427 
    1428         /* Don't let anyone try to free our DAG memory hole! */
    1429         flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
    1430 
    1431         /* If the packet buffer is currently owned by libtrace, free it so
    1432          * that we can set the packet to point into the DAG memory hole */
    1433         if (packet->buf_control == TRACE_CTRL_PACKET) {
    1434                 free(packet->buffer);
    1435                 packet->buffer = 0;
    1436         }
    1437 
    1438         /* Configure DAG card stream polling */
    1439         if (dag_set_stream_poll(PERPKT_DATA(t)->device->fd,
    1440                                 PERPKT_DATA(t)->stream, sizeof(dag_record_t),
    1441                                 &maxwait, &pollwait) < 0) {
    1442                 trace_set_err(libtrace, errno, "dag_set_stream_poll");
    1443                 return -1;
    1444         }
    1445 
    1446         /* Grab an ERF record */
    1447         do {
    1448                 numbytes = dag_pavailable(libtrace, t);
    1449                 if (numbytes < 0)
    1450                         return numbytes;
    1451                 if (numbytes < dag_record_size) {
    1452                         if (libtrace_halt)
    1453                                 return 0;
    1454 
    1455                         /* Check message queue to see if we should
    1456                          * abort early */
    1457                         if (libtrace_message_queue_count(&t->messages) > 0)
    1458                                 return -2;
    1459 
    1460                         /* Keep trying until we see a packet */
    1461                         continue;
    1462                 }
    1463 
    1464                 erfptr = dag_pget_record(libtrace, t);
    1465         } while (erfptr == NULL);
    1466 
    1467         /* Prepare the libtrace packet */
    1468         if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
    1469                                flags))
    1470                 return -1;
    1471 
    1472         PERPKT_DATA(t)->pkt_count++;
    1473 
    1474         return packet->payload ? htons(erfptr->rlen) :
    1475                 erf_get_framing_length(packet);
    1476 }
    1477 
    1478 static int dag_ppause_input(libtrace_t *libtrace)
    1479 {
    1480         int i, tot = libtrace->perpkt_thread_count;
    1481         struct dag_per_thread_t *t_data;
    1482 
    1483         /* Stop and detach all the streams */
    1484         printf("Stopping and detaching all streams\n");
    1485         for (i = 0; i < tot; i++) {
    1486                 t_data = &FORMAT_DATA->per_thread[i];
    1487 
    1488                 if (dag_stop_stream(t_data->device->fd,
    1489                                     t_data->stream) < 0) {
    1490                         trace_set_err(libtrace, errno,
    1491                                       "can't stop DAG stream #%u",
    1492                                       t_data->stream);
    1493                         return -1;
    1494                 }
    1495 
    1496                 if (dag_detach_stream(t_data->device->fd,
    1497                                       t_data->stream) < 0) {
    1498                         trace_set_err(libtrace, errno,
    1499                                       "can't detach DAG stream #%u",
    1500                                       t_data->stream);
    1501                         return -1;
    1502                 }
    1503         }
    1504 
    1505         /* Free up the per_thread array */
    1506         free(FORMAT_DATA->per_thread);
    1507         FORMAT_DATA->per_thread = NULL;
    1508 
    1509         return 0;
    1510 }
    1511 
    1512 static int dag_pconfig_input(libtrace_t *libtrace,
    1513                              trace_parallel_option_t option, void *value)
     1409static int dag_pconfig_input(UNUSED libtrace_t *libtrace,
     1410                             trace_parallel_option_t option, UNUSED void *value)
    15141411{
    15151412        /* We don't support any of these! Normally you configure the DAG card
     
    15301427}
    15311428
     1429/* TODO: Should possibly make a more generic dag_start_input, as there's a
     1430 * fair bit of code duplication between that and this */
    15321431static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
    15331432                                bool reader)
    15341433{
    1535         /* XXX: This function gets run sequentially for all
    1536          * threads. Should investigate making it parallel as draining the
    1537          * memory could be needlessly time consuming.
    1538          */
    1539         uint8_t *top, *bottom;
    1540         /* XXX: Investigate this type, as I would assume the value
    1541          * could be larger than 255 */
    1542         uint8_t diff = 0;
    15431434        struct timeval zero, nopoll;
    1544 
     1435        uint8_t *top, *bottom, *starttop;
     1436        struct dag_per_stream_t *stream_data;
    15451437        top = bottom = NULL;
    15461438
     
    15521444        if (reader) {
    15531445                if (t->type == THREAD_PERPKT) {
     1446                        stream_data =
     1447                                (struct dag_per_stream_t *)
     1448                                libtrace_list_get_index(FORMAT_DATA->per_stream,
     1449                                                        t->perpkt_num)->data;
     1450
    15541451                        /* Pass the per thread data to the thread */
    1555                         t->format_data =
    1556                                 &FORMAT_DATA->per_thread[t->perpkt_num];
     1452                        t->format_data = stream_data;
    15571453
    15581454                        /* Attach and start the DAG stream */
    15591455                        printf("t%u: starting and attaching stream #%u\n",
    1560                                t->perpkt_num, PERPKT_DATA(t)->stream);
    1561                         if (dag_attach_stream(PERPKT_DATA(t)->device->fd,
    1562                                               PERPKT_DATA(t)->stream, 0,
     1456                               t->perpkt_num, stream_data->dagstream);
     1457                        if (dag_attach_stream(stream_data->device->fd,
     1458                                              stream_data->dagstream, 0,
    15631459                                              0) < 0) {
     1460                                printf("can't attach DAG stream #%u\n",
     1461                                       stream_data->dagstream);
    15641462                                trace_set_err(libtrace, errno,
    15651463                                              "can't attach DAG stream #%u",
    1566                                               PERPKT_DATA(t)->stream);
     1464                                              stream_data->dagstream);
    15671465                                return -1;
    15681466                        }
    1569                         if (dag_start_stream(PERPKT_DATA(t)->device->fd,
    1570                                              PERPKT_DATA(t)->stream) < 0) {
     1467                        if (dag_start_stream(stream_data->device->fd,
     1468                                             stream_data->dagstream) < 0) {
    15711469                                trace_set_err(libtrace, errno,
    15721470                                              "can't start DAG stream #%u",
    1573                                               PERPKT_DATA(t)->stream);
     1471                                              stream_data->dagstream);
     1472                                printf("can't start DAG stream #%u\n",
     1473                                       stream_data->dagstream);
    15741474                                return -1;
    15751475                        }
    15761476
    1577                         /* Ensure that dag_advance_stream will return without blocking */
    1578                         if(dag_set_stream_poll(PERPKT_DATA(t)->device->fd,
    1579                                                PERPKT_DATA(t)->stream, 0, &zero,
     1477                        /* Ensure that dag_advance_stream will return without
     1478                         * blocking */
     1479                        if(dag_set_stream_poll(stream_data->device->fd,
     1480                                               stream_data->dagstream, 0, &zero,
    15801481                                               &nopoll) < 0) {
    15811482                                trace_set_err(libtrace, errno,
     
    15851486
    15861487                        /* Clear all the data from the memory hole */
    1587                         do {
    1588                                 top = dag_advance_stream(PERPKT_DATA(t)->
     1488                        starttop = dag_advance_stream(stream_data->
     1489                                                      device->fd,
     1490                                                      stream_data->dagstream,
     1491                                                      &bottom);
     1492
     1493                        top = starttop;
     1494                        while (starttop - bottom > 0) {
     1495                                bottom += (starttop - bottom);
     1496                                top = dag_advance_stream(stream_data->
    15891497                                                         device->fd,
    1590                                                          PERPKT_DATA(t)->stream,
     1498                                                         stream_data->dagstream,
    15911499                                                         &bottom);
    1592 
    1593                                 assert(top && bottom);
    1594                                 diff = top - bottom;
    1595                                 bottom -= diff;
    1596                         } while (diff != 0);
    1597 
    1598                         PERPKT_DATA(t)->top = NULL;
    1599                         PERPKT_DATA(t)->bottom = NULL;
    1600                         PERPKT_DATA(t)->pkt_count = 0;
    1601                         PERPKT_DATA(t)->drops = 0;
     1500                        }
     1501                        stream_data->top = top;
     1502                        stream_data->bottom = bottom;
     1503                        stream_data->pkt_count = 0;
     1504                        stream_data->drops = 0;
    16021505                } else {
    1603                         /* TODO: Figure out why we need this */
    1604                         t->format_data = &FORMAT_DATA->per_thread[0];
     1506                        /* TODO: Figure out why t->type != THREAD_PERPKT in
     1507                         * order to figure out what this line does */
     1508                        t->format_data = FORMAT_DATA_FIRST;
    16051509                }
    16061510        }
     
    16521556        dag_help,                       /* help */
    16531557        NULL,                            /* next pointer */
    1654                 {true, 0}, /* live packet capture, thread limit TBD */
    1655                 dag_pstart_input,
    1656                 dag_pread_packet,
    1657                 dag_ppause_input,
    1658                 NULL,
    1659                 dag_pconfig_input,
    1660                 dag_pregister_thread,
    1661                 NULL
     1558        {true, 0}, /* live packet capture, thread limit TBD */
     1559        dag_pstart_input,
     1560        dag_pread_packets,
     1561        dag_pause_input,
     1562        NULL,
     1563        dag_pconfig_input,
     1564        dag_pregister_thread,
     1565        NULL
    16621566};
    16631567
  • lib/libtrace_int.h

    r04bf7c5 rcb39d35  
    153153#include "data-struct/message_queue.h"
    154154#include "data-struct/deque.h"
     155#include "data-struct/linked_list.h"
    155156#include "data-struct/sliding_window.h"
    156157
Note: See TracChangeset for help on using the changeset viewer.