Changeset fe11d12 for lib


Ignore:
Timestamp:
11/19/14 09:21:47 (6 years ago)
Author:
Dan Collins <djc44@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
78911b5
Parents:
2badac9
Message:

Started work on creating a parallel interface for DAG.

There are still a number of things to do including:

  • Support for DUCK reporting
  • Couting the number of dropped packets
  • Reducing code duplication
  • Implementing pconfig_input, pfin_input and possibly fixing ppause_input
File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/format_dag25.c

    r11cf9b1 rfe11d12  
    8282#define DATA(x) ((struct dag_format_data_t *)x->format_data)
    8383#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
     84#define PERPKT_DATA(x) ((struct dag_per_thread_t *)(x->format_data))
    8485
    8586#define FORMAT_DATA DATA(libtrace)
     
    114115        /* A buffer to hold the data to be transmittted */
    115116        uint8_t *txbuffer;
     117};
     118
     119/* Data that is stored for each libtrace_thread_t */
     120struct dag_per_thread_t {
     121        struct dag_dev_t *device; /* DAG device */
     122        uint16_t stream; /* Stream number */
     123        uint8_t *top; /* Pointer to the last unread byte in the DAG memory */
     124        uint8_t *bottom; /* Pointer to the first unread byte in the DAG memory */
     125        uint32_t processed; /* Amount of data processed from the bottom pointer */
    116126};
    117127
     
    146156        /* The number of packets that have been dropped */
    147157        uint64_t drops;
     158        /* When running in parallel mode this is malloc'd with an array of thread
     159         * structures */
     160        struct dag_per_thread_t *per_thread;
    148161};
    149162
     
    450463        char *dag_dev_name = NULL;
    451464        char *scan = NULL;
    452         int stream = 0;
     465        int stream = 0, thread_count = 1;
    453466        struct dag_dev_t *dag_device = NULL;
    454467
     
    457470         * list */
    458471        pthread_mutex_lock(&open_dag_mutex);
    459        
    460        
    461         /* Specific streams are signified using a comma in the libtrace URI,
     472
     473
     474        /* DAG cards support multiple streams. In a single threaded capture,
     475         * these are specified using a comma in the libtrace URI,
    462476         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
    463477         *
    464          * If no stream is specified, we will read from stream 0 */
    465         if ((scan = strchr(libtrace->uridata,',')) == NULL) {
     478         * If no stream is specified, we will read from stream 0 with one thread
     479         */
     480        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
    466481                dag_dev_name = strdup(libtrace->uridata);
     482               
    467483        } else {
    468484                dag_dev_name = (char *)strndup(libtrace->uridata,
    469                                 (size_t)(scan - libtrace->uridata));
     485                                                                           (size_t)(scan - libtrace->uridata));
    470486                stream = atoi(++scan);
    471487        }
     
    835851        }
    836852
     853        /* TODO: This isn't thread safe */
     854#if 0
    837855        /* Update the dropped packets counter */
    838856
     
    845863                DATA(libtrace)->drops += ntohs(erfptr->lctr);
    846864        }
     865#endif
    847866
    848867        return 0;
     
    12151234        printf("\tnone\n");
    12161235        printf("\n");
     1236}
     1237
     1238static int dag_pstart_input(libtrace_t *libtrace) {
     1239        char *scan, *tok;
     1240        uint16_t stream_count = 0, max_streams;
     1241        /* We keep our own pointer to per_thread as the system will free
     1242         * up FORMAT_DATA without freeing this if something goes wrong */
     1243        struct dag_per_thread_t *per_thread = NULL;
     1244        int iserror = 0;
     1245
     1246        /* Check we aren't trying to create more threads than the DAG card can
     1247         * handle */
     1248        max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd);
     1249        if (libtrace->perpkt_thread_count > max_streams) {
     1250                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, "trying to create too "
     1251                                          "many threads (max is %u)", max_streams);
     1252                iserror = 1;
     1253                goto cleanup;
     1254        }
     1255
     1256        /* Create the thread structures */
     1257        per_thread = calloc(libtrace->perpkt_thread_count,
     1258                                                                         sizeof(struct dag_per_thread_t));
     1259        FORMAT_DATA->per_thread = per_thread;
     1260
     1261        /* Get the stream names from the uri */
     1262        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
     1263                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, "format uri doesn't "
     1264                                          "specify the DAG streams");
     1265                iserror = 1;
     1266                goto cleanup;
     1267        }
     1268
     1269        scan++;
     1270 
     1271        tok = strtok(scan, ",");
     1272        while (tok != NULL) {
     1273                /* Ensure we haven't specified too many streams */
     1274                if (stream_count >= libtrace->perpkt_thread_count) {
     1275                        trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, "format uri specifies too "
     1276                                          "many streams. Max is %u", max_streams);
     1277                        iserror = 1;
     1278                        goto cleanup;
     1279                }
     1280
     1281                /* Save the stream details */
     1282                per_thread[stream_count].device = FORMAT_DATA->device;
     1283                per_thread[stream_count++].stream = (uint16_t)atoi(tok);
     1284
     1285                tok = strtok(NULL, ",");
     1286        }
     1287
     1288 cleanup:
     1289        if (iserror) {
     1290                /* Free the per_thread memory */
     1291                free(per_thread);
     1292               
     1293                return -1;
     1294        } else {
     1295                return 0;
     1296        }
     1297}
     1298
     1299
     1300
     1301/* TODO: Fold this into dag_available */
     1302static int dag_pavailable(libtrace_t *libtrace, libtrace_thread_t *t) {
     1303        uint32_t diff = PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom;
     1304
     1305        /* If we've processed more than 4MB of data since we last called
     1306         * dag_advance_stream, then we should call it again to allow the
     1307         * space occupied by that 4MB to be released */
     1308        if (diff >= dag_record_size && PERPKT_DATA(t)->processed < 4 * 1024 * 1024)
     1309                return diff;
     1310
     1311        /* Update top and bottom pointers */
     1312        PERPKT_DATA(t)->top = dag_advance_stream(PERPKT_DATA(t)->device->fd,
     1313                                                                                         PERPKT_DATA(t)->stream,
     1314                                                                                         &(PERPKT_DATA(t)->bottom));
     1315
     1316        if (PERPKT_DATA(t)->top == NULL) {
     1317                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
     1318                return -1;
     1319        }
     1320
     1321        PERPKT_DATA(t)->processed = 0;
     1322        diff = PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom;
     1323        return diff;
     1324}
     1325
     1326/* TODO: Fold this into dag_get_record */
     1327static dag_record_t *dag_pget_record(libtrace_t *libtrace,
     1328                                                                         libtrace_thread_t *t) {
     1329        dag_record_t *erfptr = NULL;
     1330        uint16_t size;
     1331
     1332        erfptr = (dag_record_t *)PERPKT_DATA(t)->bottom;
     1333        if (!erfptr)
     1334                return NULL;
     1335
     1336        /* Ensure we have a whole record */
     1337        size = ntohs(erfptr->rlen);
     1338        assert(size >= dag_record_size);
     1339        if (size > (PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom))
     1340                return NULL;
     1341
     1342        /* Advance the buffer pointers */
     1343        PERPKT_DATA(t)->bottom += size;
     1344        PERPKT_DATA(t)->processed += size;
     1345       
     1346        return erfptr;
     1347}
     1348
     1349static int dag_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t,
     1350                                                        libtrace_packet_t *packet) {
     1351        dag_record_t *erfptr = NULL;
     1352        int numbytes = 0;
     1353        uint32_t flags = 0;
     1354        struct timeval maxwait, pollwait;
     1355
     1356        pollwait.tv_sec = 0;
     1357        pollwait.tv_usec = 10000;
     1358        maxwait.tv_sec = 0;
     1359        maxwait.tv_usec = 250000;
     1360
     1361        /* TODO: Support DUCK reporting */
     1362
     1363        /* Don't let anyone try to free our DAG memory hole! */
     1364        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
     1365
     1366        /* If the packet buffer is currently owned by libtrace, free it so
     1367         * that we can set the packet to point into the DAG memory hole */
     1368        if (packet->buf_control == TRACE_CTRL_PACKET) {
     1369                free(packet->buffer);
     1370                packet->buffer = 0;
     1371        }
     1372
     1373        /* Configure DAG card stream polling */
     1374        if (dag_set_stream_poll(PERPKT_DATA(t)->device->fd, PERPKT_DATA(t)->stream,
     1375                                                        sizeof(dag_record_t), &maxwait, &pollwait) < 0) {
     1376                trace_set_err(libtrace, errno, "dag_set_stream_poll");
     1377                return -1;
     1378        }
     1379
     1380        /* Grab an ERF record */
     1381        do {
     1382                numbytes = dag_pavailable(libtrace, t);
     1383                if (numbytes < 0)
     1384                        return numbytes;
     1385                if (numbytes < dag_record_size) {
     1386                        if (libtrace_halt)
     1387                                return 0;
     1388                        /* Keep trying until we see a packet */
     1389                        continue;
     1390                }
     1391
     1392                erfptr = dag_pget_record(libtrace, t);
     1393        } while (erfptr == NULL);
     1394
     1395        /* Prepare the libtrace packet */
     1396        if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
     1397                                                   flags))
     1398                return -1;
     1399
     1400        return packet->payload ? htons(erfptr->rlen) :
     1401                erf_get_framing_length(packet);
     1402}
     1403
     1404static int dag_ppause_input(libtrace_t *libtrace) {
     1405        fprintf(stderr, "Assuming threads will pause when unregistered!\n");
     1406        return 0;
     1407}
     1408
     1409static int dag_pfin_input(libtrace_t *libtrace) {
     1410        assert(0 && "dag_pfin_input not implemented");
     1411
     1412        return -1;
     1413}
     1414
     1415static int dag_pconfig_input(libtrace_t *libtrace,
     1416                                                         trace_parallel_option_t option, void *value) {
     1417        assert(0 && "dag_pconfig_input not implemented");
     1418
     1419        return -1;
     1420}
     1421
     1422static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
     1423                                                                bool reader) {
     1424        /* XXX: This function gets run sequentially for all
     1425         * threads. Should investigate making it parallel as draining the
     1426         * memory could be needlessly time consuming
     1427         */
     1428        uint8_t *top, *bottom;
     1429        uint8_t diff = 0; /* XXX: Investigate this type, as I would assume the value
     1430                                           * could be larger than 255 */
     1431
     1432        top = bottom = NULL;
     1433
     1434        fprintf(stderr, "t%u: registering thread\n", t->perpkt_num);
     1435        if (reader) {
     1436                if (t->type == THREAD_PERPKT) {
     1437                        /* Pass the per thread data to the thread */
     1438                        t->format_data = &FORMAT_DATA->per_thread[t->perpkt_num];
     1439
     1440                        /* Attach and start the DAG stream */
     1441                        printf("t%u: starting and attaching stream #%u\n", t->perpkt_num,
     1442                                   PERPKT_DATA(t)->stream);
     1443                        if (dag_attach_stream(PERPKT_DATA(t)->device->fd,
     1444                                                                  PERPKT_DATA(t)->stream, 0, 0) < 0) {
     1445                                trace_set_err(libtrace, errno, "can't attach DAG stream #%u",
     1446                                                          PERPKT_DATA(t)->stream);
     1447                                return -1;
     1448                        }
     1449                        if (dag_start_stream(PERPKT_DATA(t)->device->fd,
     1450                                                                 PERPKT_DATA(t)->stream) < 0) {
     1451                                trace_set_err(libtrace, errno, "can't start DAG stream #%u",
     1452                                                          PERPKT_DATA(t)->stream);
     1453                                return -1;
     1454                        }
     1455
     1456                        /* Clear all the data from the memory hole */
     1457                        do {
     1458                                top = dag_advance_stream(PERPKT_DATA(t)->device->fd,
     1459                                                                                 PERPKT_DATA(t)->stream,
     1460                                                                                 &bottom);
     1461                                assert(top && bottom);
     1462                                diff = top - bottom;
     1463                                bottom -= diff;
     1464                        } while (diff != 0);
     1465
     1466                        /* Clear the pointers */
     1467                        PERPKT_DATA(t)->top = NULL;
     1468                        PERPKT_DATA(t)->bottom = NULL;
     1469                } else {
     1470                        /* TODO: Figure out why we need this */
     1471                        t->format_data = &FORMAT_DATA->per_thread[0];
     1472                }
     1473        }
     1474
     1475        return 0;
     1476}
     1477
     1478static void dag_punregister_thread(libtrace_t *libtrace, libtrace_thread_t *t) {
     1479        if (t->type == THREAD_PERPKT) {
     1480                fprintf(stderr, "t%u: stopping and detaching stream %u\n",
     1481                                t->perpkt_num, PERPKT_DATA(t)->stream);
     1482
     1483                /* Stop and detach the DAG stream */
     1484                if (dag_stop_stream(PERPKT_DATA(t)->device->fd,
     1485                                                        PERPKT_DATA(t)->stream) < 0) {
     1486                        trace_set_err(libtrace, errno, "can't stop DAG stream #%u",
     1487                                                  PERPKT_DATA(t)->stream);
     1488                        return -1;
     1489                }
     1490
     1491                if (dag_detach_stream(PERPKT_DATA(t)->device->fd,
     1492                                                          PERPKT_DATA(t)->stream) < 0) {
     1493                        trace_set_err(libtrace, errno, "can't detach DAG stream #%u",
     1494                                                  PERPKT_DATA(t)->stream);
     1495                        return -1;
     1496                }
     1497        }
     1498
     1499        return 0;
    12171500}
    12181501
     
    12581541        dag_help,                       /* help */
    12591542        NULL,                            /* next pointer */
    1260         NON_PARALLEL(true)
     1543                {true, 0}, /* live packet capture, thread limit TBD */
     1544                dag_pstart_input,
     1545                dag_pread_packet,
     1546                dag_ppause_input,
     1547                dag_pfin_input,
     1548                dag_pconfig_input,
     1549                dag_pregister_thread,
     1550                dag_punregister_thread
    12611551};
    12621552
Note: See TracChangeset for help on using the changeset viewer.