Changeset 9149564 for lib


Ignore:
Timestamp:
11/19/14 13:53:31 (6 years ago)
Author:
Dan <dan@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
c66a465
Parents:
78911b5
Message:

Cleaned up parallel support.

Need to consider reducing code duplication and also adding support for DUCK reporting.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/format_dag25.c

    rfe11d12 r9149564  
    124124        uint8_t *bottom; /* Pointer to the first unread byte in the DAG memory */
    125125        uint32_t processed; /* Amount of data processed from the bottom pointer */
     126        uint64_t pkt_count; /* Number of packets seen by the thread */
    126127};
    127128
     
    157158        uint64_t drops;
    158159        /* When running in parallel mode this is malloc'd with an array of thread
    159          * structures */
     160         * structures. Most of the stuff above doesn't get used in parallel mode. */
    160161        struct dag_per_thread_t *per_thread;
    161162};
     
    13861387                        if (libtrace_halt)
    13871388                                return 0;
     1389
     1390                        /* Check message queue to see if we should abort early */
     1391                        if (libtrace_message_queue_count(&t->messages) > 0)
     1392                                return -2;
     1393
    13881394                        /* Keep trying until we see a packet */
    13891395                        continue;
     
    13981404                return -1;
    13991405
     1406        PERPKT_DATA(t)->pkt_count++;
     1407
    14001408        return packet->payload ? htons(erfptr->rlen) :
    14011409                erf_get_framing_length(packet);
     
    14031411
    14041412static int dag_ppause_input(libtrace_t *libtrace) {
    1405         fprintf(stderr, "Assuming threads will pause when unregistered!\n");
     1413        int i, tot = libtrace->perpkt_thread_count;
     1414        struct dag_per_thread_t *t_data;
     1415
     1416        /* Stop and detach all the streams */
     1417        printf("Stopping and detaching all streams\n");
     1418        for (i = 0; i < tot; i++) {
     1419                t_data = &FORMAT_DATA->per_thread[i];
     1420
     1421                if (dag_stop_stream(t_data->device->fd,
     1422                                                        t_data->stream) < 0) {
     1423                        trace_set_err(libtrace, errno, "can't stop DAG stream #%u",
     1424                                                  t_data->stream);
     1425                        return -1;
     1426                }
     1427
     1428                if (dag_detach_stream(t_data->device->fd,
     1429                                                          t_data->stream) < 0) {
     1430                        trace_set_err(libtrace, errno, "can't detach DAG stream #%u",
     1431                                                  t_data->stream);
     1432                        return -1;
     1433                }
     1434        }
     1435
     1436        /* Free up the per_thread array */
     1437        free(FORMAT_DATA->per_thread);
     1438        FORMAT_DATA->per_thread = NULL;
     1439
    14061440        return 0;
    1407 }
    1408 
    1409 static int dag_pfin_input(libtrace_t *libtrace) {
    1410         assert(0 && "dag_pfin_input not implemented");
    1411 
    1412         return -1;
    14131441}
    14141442
    14151443static int dag_pconfig_input(libtrace_t *libtrace,
    14161444                                                         trace_parallel_option_t option, void *value) {
    1417         assert(0 && "dag_pconfig_input not implemented");
     1445
     1446        /* We don't support any of these! Normally you configure the DAG card
     1447         * externally. */
     1448        switch(option) {
     1449        case TRACE_OPTION_SET_HASHER:
     1450        case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
     1451        case TRACE_OPTION_TRACETIME:
     1452        case TRACE_OPTION_TICK_INTERVAL:
     1453        case TRACE_OPTION_GET_CONFIG:
     1454        case TRACE_OPTION_SET_CONFIG:
     1455                return -1;
     1456        }
     1457        /* We don't provide a default option to ensure that future options will
     1458         * generate a compiler warning. */
    14181459
    14191460        return -1;
     
    14241465        /* XXX: This function gets run sequentially for all
    14251466         * threads. Should investigate making it parallel as draining the
    1426          * memory could be needlessly time consuming
     1467         * memory could be needlessly time consuming.
    14271468         */
    14281469        uint8_t *top, *bottom;
    14291470        uint8_t diff = 0; /* XXX: Investigate this type, as I would assume the value
    14301471                                           * could be larger than 255 */
     1472        struct timeval zero, nopoll;
    14311473
    14321474        top = bottom = NULL;
    14331475
    1434         fprintf(stderr, "t%u: registering thread\n", t->perpkt_num);
     1476        /* Minimum delay is 10mS */
     1477        zero.tv_sec = 0;
     1478        zero.tv_usec = 10000;
     1479        nopoll = zero;
     1480
    14351481        if (reader) {
    14361482                if (t->type == THREAD_PERPKT) {
     
    14541500                        }
    14551501
     1502                        /* Ensure that dag_advance_stream will return without blocking */
     1503                        if(dag_set_stream_poll(PERPKT_DATA(t)->device->fd,
     1504                                                                   PERPKT_DATA(t)->stream, 0, &zero,
     1505                                                                   &nopoll) < 0) {
     1506                                trace_set_err(libtrace, errno, "dag_set_stream_poll failed!");
     1507                                return -1;
     1508                        }
     1509
    14561510                        /* Clear all the data from the memory hole */
    14571511                        do {
     
    14591513                                                                                 PERPKT_DATA(t)->stream,
    14601514                                                                                 &bottom);
     1515
    14611516                                assert(top && bottom);
    14621517                                diff = top - bottom;
     
    14641519                        } while (diff != 0);
    14651520
    1466                         /* Clear the pointers */
    14671521                        PERPKT_DATA(t)->top = NULL;
    14681522                        PERPKT_DATA(t)->bottom = NULL;
     1523                        PERPKT_DATA(t)->pkt_count = 0;
    14691524                } else {
    14701525                        /* TODO: Figure out why we need this */
     
    14731528        }
    14741529
    1475         return 0;
    1476 }
    1477 
    1478 static void dag_punregister_thread(libtrace_t *libtrace, libtrace_thread_t *t) {
    1479         if (t->type == THREAD_PERPKT) {
    1480                 fprintf(stderr, "t%u: stopping and detaching stream %u\n",
    1481                                 t->perpkt_num, PERPKT_DATA(t)->stream);
    1482 
    1483                 /* Stop and detach the DAG stream */
    1484                 if (dag_stop_stream(PERPKT_DATA(t)->device->fd,
    1485                                                         PERPKT_DATA(t)->stream) < 0) {
    1486                         trace_set_err(libtrace, errno, "can't stop DAG stream #%u",
    1487                                                   PERPKT_DATA(t)->stream);
    1488                         return -1;
    1489                 }
    1490 
    1491                 if (dag_detach_stream(PERPKT_DATA(t)->device->fd,
    1492                                                           PERPKT_DATA(t)->stream) < 0) {
    1493                         trace_set_err(libtrace, errno, "can't detach DAG stream #%u",
    1494                                                   PERPKT_DATA(t)->stream);
    1495                         return -1;
    1496                 }
    1497         }
     1530        fprintf(stderr, "t%u: registered thread\n", t->perpkt_num);
    14981531
    14991532        return 0;
     
    15451578                dag_pread_packet,
    15461579                dag_ppause_input,
    1547                 dag_pfin_input,
     1580                NULL,
    15481581                dag_pconfig_input,
    15491582                dag_pregister_thread,
    1550                 dag_punregister_thread
     1583                NULL
    15511584};
    15521585
Note: See TracChangeset for help on using the changeset viewer.