Changeset 21f5f0f


Ignore:
Timestamp:
11/21/14 15:29:15 (7 years ago)
Author:
Dan Collins <dan@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
6e41e73
Parents:
59e8400 (diff), c66a465 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'dag_parallel' into develop

Location:
lib
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • lib/data-struct/object_cache.c

    r957a72a r78911b5  
    226226        // Make sure we haven't lost too many packets
    227227        if (oc->current_allocations)
    228                 fprintf(stderr, "!!OCache closing lost, %d packets!!", (int) oc->current_allocations);
     228                fprintf(stderr, "!!OCache closing lost, %d packets!!\n", (int) oc->current_allocations);
    229229        else
    230                 fprintf(stderr, "!!OCache closing lost, %d packets!!", (int) oc->current_allocations);
     230                /* This is clearly a bug, but I don't know what to replace it with... */
     231                fprintf(stderr, "!!OCache closing lost, %d packets!!\n", (int) oc->current_allocations);
    231232        libtrace_ringbuffer_destroy(&oc->rb);
    232233        pthread_spin_destroy(&oc->spin);
  • lib/format_dag25.c

    r11cf9b1 rc66a465  
    8282#define DATA(x) ((struct dag_format_data_t *)x->format_data)
    8383#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
     84#define PERPKT_DATA(x) ((struct dag_per_thread_t *)(x->format_data))
    8485
    8586#define FORMAT_DATA DATA(libtrace)
     
    114115        /* A buffer to hold the data to be transmittted */
    115116        uint8_t *txbuffer;
     117};
     118
     119/* Data that is stored for each libtrace_thread_t */
     120struct dag_per_thread_t {
     121        struct dag_dev_t *device; /* DAG device */
     122        uint16_t stream; /* Stream number */
     123        uint8_t *top; /* Pointer to the last unread byte in the DAG memory */
     124        uint8_t *bottom; /* Pointer to the first unread byte in the DAG memory */
     125        uint32_t processed; /* Amount of data processed from the bottom pointer */
     126        uint64_t pkt_count; /* Number of packets seen by the thread */
     127        uint64_t drops;
    116128};
    117129
     
    146158        /* The number of packets that have been dropped */
    147159        uint64_t drops;
     160        /* When running in parallel mode this is malloc'd with an array of thread
     161         * structures. Most of the stuff above doesn't get used in parallel mode. */
     162        struct dag_per_thread_t *per_thread;
    148163};
    149164
     
    450465        char *dag_dev_name = NULL;
    451466        char *scan = NULL;
    452         int stream = 0;
     467        int stream = 0, thread_count = 1;
    453468        struct dag_dev_t *dag_device = NULL;
    454469
     
    457472         * list */
    458473        pthread_mutex_lock(&open_dag_mutex);
    459        
    460        
    461         /* Specific streams are signified using a comma in the libtrace URI,
     474
     475
     476        /* DAG cards support multiple streams. In a single threaded capture,
     477         * these are specified using a comma in the libtrace URI,
    462478         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
    463479         *
    464          * If no stream is specified, we will read from stream 0 */
    465         if ((scan = strchr(libtrace->uridata,',')) == NULL) {
     480         * If no stream is specified, we will read from stream 0 with one thread
     481         */
     482        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
    466483                dag_dev_name = strdup(libtrace->uridata);
     484               
    467485        } else {
    468486                dag_dev_name = (char *)strndup(libtrace->uridata,
    469                                 (size_t)(scan - libtrace->uridata));
     487                                                                           (size_t)(scan - libtrace->uridata));
    470488                stream = atoi(++scan);
    471489        }
     
    800818
    801819        dag_record_t *erfptr;
     820        libtrace_thread_t *t;
    802821       
    803822        /* If the packet previously owned a buffer that is not the buffer
     
    836855
    837856        /* Update the dropped packets counter */
    838 
    839857        /* No loss counter for DSM coloured records - have to use
    840858         * some other API */
     859        /* Adding multithread support for this isn't actually that useful for the
     860         * DAG7.5G2, as there's no way to use multiple receive streams without DSM */
    841861        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
    842862                /* TODO */
    843863        } else {
    844864                /* Use the ERF loss counter */
    845                 DATA(libtrace)->drops += ntohs(erfptr->lctr);
    846         }
     865                if (DATA(libtrace)->per_thread) {
     866                        t = get_thread_table(libtrace);
     867                        PERPKT_DATA(t)->drops += ntohs(erfptr->lctr);
     868                } else {
     869                        printf("DROP!\n");
     870                        DATA(libtrace)->drops += ntohs(erfptr->lctr);
     871                }
     872        }
     873
    847874
    848875        return 0;
     
    11991226/* Gets the number of dropped packets */
    12001227static uint64_t dag_get_dropped_packets(libtrace_t *trace) {
     1228        uint64_t sum = 0;
     1229        int i, tot;
     1230
    12011231        if (trace->format_data == NULL)
    12021232                return (uint64_t)-1;
    1203         return DATA(trace)->drops;
     1233
     1234        if (DATA(trace)->per_thread) {
     1235                tot = trace->perpkt_thread_count;
     1236
     1237                for (i = 0; i < tot; i++) {
     1238                        printf("t%d: drops %" PRIu64 "\n",
     1239                                   DATA(trace)->per_thread[i].drops);
     1240                        sum += DATA(trace)->per_thread[i].drops;
     1241                }
     1242        }
     1243
     1244        sum += DATA(trace)->drops;
     1245
     1246        return sum;
    12041247}
    12051248
     
    12151258        printf("\tnone\n");
    12161259        printf("\n");
     1260}
     1261
     1262static int dag_pstart_input(libtrace_t *libtrace) {
     1263        char *scan, *tok;
     1264        uint16_t stream_count = 0, max_streams;
     1265        /* We keep our own pointer to per_thread as the system will free
     1266         * up FORMAT_DATA without freeing this if something goes wrong */
     1267        struct dag_per_thread_t *per_thread = NULL;
     1268        int iserror = 0;
     1269
     1270        /* Check we aren't trying to create more threads than the DAG card can
     1271         * handle */
     1272        max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd);
     1273        if (libtrace->perpkt_thread_count > max_streams) {
     1274                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, "trying to create too "
     1275                                          "many threads (max is %u)", max_streams);
     1276                iserror = 1;
     1277                goto cleanup;
     1278        }
     1279
     1280        /* Create the thread structures */
     1281        per_thread = calloc(libtrace->perpkt_thread_count,
     1282                                                                         sizeof(struct dag_per_thread_t));
     1283        FORMAT_DATA->per_thread = per_thread;
     1284
     1285        /* Get the stream names from the uri */
     1286        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
     1287                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, "format uri doesn't "
     1288                                          "specify the DAG streams");
     1289                iserror = 1;
     1290                goto cleanup;
     1291        }
     1292
     1293        scan++;
     1294 
     1295        tok = strtok(scan, ",");
     1296        while (tok != NULL) {
     1297                /* Ensure we haven't specified too many streams */
     1298                if (stream_count >= libtrace->perpkt_thread_count) {
     1299                        trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, "format uri specifies too "
     1300                                          "many streams. Max is %u", max_streams);
     1301                        iserror = 1;
     1302                        goto cleanup;
     1303                }
     1304
     1305                /* Save the stream details */
     1306                per_thread[stream_count].device = FORMAT_DATA->device;
     1307                per_thread[stream_count++].stream = (uint16_t)atoi(tok);
     1308
     1309                tok = strtok(NULL, ",");
     1310        }
     1311
     1312 cleanup:
     1313        if (iserror) {
     1314                /* Free the per_thread memory */
     1315                free(per_thread);
     1316               
     1317                return -1;
     1318        } else {
     1319                return 0;
     1320        }
     1321}
     1322
     1323
     1324
     1325/* TODO: Fold this into dag_available */
     1326static int dag_pavailable(libtrace_t *libtrace, libtrace_thread_t *t) {
     1327        uint32_t diff = PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom;
     1328
     1329        /* If we've processed more than 4MB of data since we last called
     1330         * dag_advance_stream, then we should call it again to allow the
     1331         * space occupied by that 4MB to be released */
     1332        if (diff >= dag_record_size && PERPKT_DATA(t)->processed < 4 * 1024 * 1024)
     1333                return diff;
     1334
     1335        /* Update top and bottom pointers */
     1336        PERPKT_DATA(t)->top = dag_advance_stream(PERPKT_DATA(t)->device->fd,
     1337                                                                                         PERPKT_DATA(t)->stream,
     1338                                                                                         &(PERPKT_DATA(t)->bottom));
     1339
     1340        if (PERPKT_DATA(t)->top == NULL) {
     1341                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
     1342                return -1;
     1343        }
     1344
     1345        PERPKT_DATA(t)->processed = 0;
     1346        diff = PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom;
     1347        return diff;
     1348}
     1349
     1350/* TODO: Fold this into dag_get_record */
     1351static dag_record_t *dag_pget_record(libtrace_t *libtrace,
     1352                                                                         libtrace_thread_t *t) {
     1353        dag_record_t *erfptr = NULL;
     1354        uint16_t size;
     1355
     1356        erfptr = (dag_record_t *)PERPKT_DATA(t)->bottom;
     1357        if (!erfptr)
     1358                return NULL;
     1359
     1360        /* Ensure we have a whole record */
     1361        size = ntohs(erfptr->rlen);
     1362        assert(size >= dag_record_size);
     1363        if (size > (PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom))
     1364                return NULL;
     1365
     1366        /* Advance the buffer pointers */
     1367        PERPKT_DATA(t)->bottom += size;
     1368        PERPKT_DATA(t)->processed += size;
     1369       
     1370        return erfptr;
     1371}
     1372
     1373static int dag_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t,
     1374                                                        libtrace_packet_t *packet) {
     1375        dag_record_t *erfptr = NULL;
     1376        int numbytes = 0;
     1377        uint32_t flags = 0;
     1378        struct timeval maxwait, pollwait;
     1379
     1380        pollwait.tv_sec = 0;
     1381        pollwait.tv_usec = 10000;
     1382        maxwait.tv_sec = 0;
     1383        maxwait.tv_usec = 250000;
     1384
     1385        /* TODO: Support DUCK reporting */
     1386
     1387        /* Don't let anyone try to free our DAG memory hole! */
     1388        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
     1389
     1390        /* If the packet buffer is currently owned by libtrace, free it so
     1391         * that we can set the packet to point into the DAG memory hole */
     1392        if (packet->buf_control == TRACE_CTRL_PACKET) {
     1393                free(packet->buffer);
     1394                packet->buffer = 0;
     1395        }
     1396
     1397        /* Configure DAG card stream polling */
     1398        if (dag_set_stream_poll(PERPKT_DATA(t)->device->fd, PERPKT_DATA(t)->stream,
     1399                                                        sizeof(dag_record_t), &maxwait, &pollwait) < 0) {
     1400                trace_set_err(libtrace, errno, "dag_set_stream_poll");
     1401                return -1;
     1402        }
     1403
     1404        /* Grab an ERF record */
     1405        do {
     1406                numbytes = dag_pavailable(libtrace, t);
     1407                if (numbytes < 0)
     1408                        return numbytes;
     1409                if (numbytes < dag_record_size) {
     1410                        if (libtrace_halt)
     1411                                return 0;
     1412
     1413                        /* Check message queue to see if we should abort early */
     1414                        if (libtrace_message_queue_count(&t->messages) > 0)
     1415                                return -2;
     1416
     1417                        /* Keep trying until we see a packet */
     1418                        continue;
     1419                }
     1420
     1421                erfptr = dag_pget_record(libtrace, t);
     1422        } while (erfptr == NULL);
     1423
     1424        /* Prepare the libtrace packet */
     1425        if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
     1426                                                   flags))
     1427                return -1;
     1428
     1429        PERPKT_DATA(t)->pkt_count++;
     1430
     1431        return packet->payload ? htons(erfptr->rlen) :
     1432                erf_get_framing_length(packet);
     1433}
     1434
     1435static int dag_ppause_input(libtrace_t *libtrace) {
     1436        int i, tot = libtrace->perpkt_thread_count;
     1437        struct dag_per_thread_t *t_data;
     1438
     1439        /* Stop and detach all the streams */
     1440        printf("Stopping and detaching all streams\n");
     1441        for (i = 0; i < tot; i++) {
     1442                t_data = &FORMAT_DATA->per_thread[i];
     1443
     1444                if (dag_stop_stream(t_data->device->fd,
     1445                                                        t_data->stream) < 0) {
     1446                        trace_set_err(libtrace, errno, "can't stop DAG stream #%u",
     1447                                                  t_data->stream);
     1448                        return -1;
     1449                }
     1450
     1451                if (dag_detach_stream(t_data->device->fd,
     1452                                                          t_data->stream) < 0) {
     1453                        trace_set_err(libtrace, errno, "can't detach DAG stream #%u",
     1454                                                  t_data->stream);
     1455                        return -1;
     1456                }
     1457        }
     1458
     1459        /* Free up the per_thread array */
     1460        free(FORMAT_DATA->per_thread);
     1461        FORMAT_DATA->per_thread = NULL;
     1462
     1463        return 0;
     1464}
     1465
     1466static int dag_pconfig_input(libtrace_t *libtrace,
     1467                                                         trace_parallel_option_t option, void *value) {
     1468
     1469        /* We don't support any of these! Normally you configure the DAG card
     1470         * externally. */
     1471        switch(option) {
     1472        case TRACE_OPTION_SET_HASHER:
     1473        case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
     1474        case TRACE_OPTION_TRACETIME:
     1475        case TRACE_OPTION_TICK_INTERVAL:
     1476        case TRACE_OPTION_GET_CONFIG:
     1477        case TRACE_OPTION_SET_CONFIG:
     1478                return -1;
     1479        }
     1480        /* We don't provide a default option to ensure that future options will
     1481         * generate a compiler warning. */
     1482
     1483        return -1;
     1484}
     1485
     1486static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
     1487                                                                bool reader) {
     1488        /* XXX: This function gets run sequentially for all
     1489         * threads. Should investigate making it parallel as draining the
     1490         * memory could be needlessly time consuming.
     1491         */
     1492        uint8_t *top, *bottom;
     1493        uint8_t diff = 0; /* XXX: Investigate this type, as I would assume the value
     1494                                           * could be larger than 255 */
     1495        struct timeval zero, nopoll;
     1496
     1497        top = bottom = NULL;
     1498
     1499        /* Minimum delay is 10mS */
     1500        zero.tv_sec = 0;
     1501        zero.tv_usec = 10000;
     1502        nopoll = zero;
     1503
     1504        if (reader) {
     1505                if (t->type == THREAD_PERPKT) {
     1506                        /* Pass the per thread data to the thread */
     1507                        t->format_data = &FORMAT_DATA->per_thread[t->perpkt_num];
     1508
     1509                        /* Attach and start the DAG stream */
     1510                        printf("t%u: starting and attaching stream #%u\n", t->perpkt_num,
     1511                                   PERPKT_DATA(t)->stream);
     1512                        if (dag_attach_stream(PERPKT_DATA(t)->device->fd,
     1513                                                                  PERPKT_DATA(t)->stream, 0, 0) < 0) {
     1514                                trace_set_err(libtrace, errno, "can't attach DAG stream #%u",
     1515                                                          PERPKT_DATA(t)->stream);
     1516                                return -1;
     1517                        }
     1518                        if (dag_start_stream(PERPKT_DATA(t)->device->fd,
     1519                                                                 PERPKT_DATA(t)->stream) < 0) {
     1520                                trace_set_err(libtrace, errno, "can't start DAG stream #%u",
     1521                                                          PERPKT_DATA(t)->stream);
     1522                                return -1;
     1523                        }
     1524
     1525                        /* Ensure that dag_advance_stream will return without blocking */
     1526                        if(dag_set_stream_poll(PERPKT_DATA(t)->device->fd,
     1527                                                                   PERPKT_DATA(t)->stream, 0, &zero,
     1528                                                                   &nopoll) < 0) {
     1529                                trace_set_err(libtrace, errno, "dag_set_stream_poll failed!");
     1530                                return -1;
     1531                        }
     1532
     1533                        /* Clear all the data from the memory hole */
     1534                        do {
     1535                                top = dag_advance_stream(PERPKT_DATA(t)->device->fd,
     1536                                                                                 PERPKT_DATA(t)->stream,
     1537                                                                                 &bottom);
     1538
     1539                                assert(top && bottom);
     1540                                diff = top - bottom;
     1541                                bottom -= diff;
     1542                        } while (diff != 0);
     1543
     1544                        PERPKT_DATA(t)->top = NULL;
     1545                        PERPKT_DATA(t)->bottom = NULL;
     1546                        PERPKT_DATA(t)->pkt_count = 0;
     1547                        PERPKT_DATA(t)->drops = 0;
     1548                } else {
     1549                        /* TODO: Figure out why we need this */
     1550                        t->format_data = &FORMAT_DATA->per_thread[0];
     1551                }
     1552        }
     1553
     1554        fprintf(stderr, "t%u: registered thread\n", t->perpkt_num);
     1555
     1556        return 0;
    12171557}
    12181558
     
    12581598        dag_help,                       /* help */
    12591599        NULL,                            /* next pointer */
    1260         NON_PARALLEL(true)
     1600                {true, 0}, /* live packet capture, thread limit TBD */
     1601                dag_pstart_input,
     1602                dag_pread_packet,
     1603                dag_ppause_input,
     1604                NULL,
     1605                dag_pconfig_input,
     1606                dag_pregister_thread,
     1607                NULL
    12611608};
    12621609
  • lib/trace_parallel.c

    rf9ba82f r78911b5  
    124124
    125125static void print_memory_stats() {
     126#if 0
    126127        char t_name[50];
    127128        uint64_t total;
     
    165166                                total, (double) mem_hits.writebulk.miss / (double) total * 100.0);
    166167        }
    167 
     168#endif
    168169}
    169170
     
    13371338        assert(packet);
    13381339
    1339         if (libtrace->format->read_packet) {
     1340        if (libtrace->format->pread_packet) {
    13401341                do {
    13411342                        size_t ret;
     
    21982199        assert(str);
    21992200        assert(uc);
    2200         printf ("Splitting string \"%s\" into tokens:\n",str);
    22012201        pch = strtok (str," ,.-");
    22022202        while (pch != NULL)
Note: See TracChangeset for help on using the changeset viewer.