Changes in / [21f5f0f:59e8400]


Ignore:
Location:
lib
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • lib/data-struct/object_cache.c

    r78911b5 r957a72a  
    226226        // Make sure we haven't lost too many packets
    227227        if (oc->current_allocations)
    228                 fprintf(stderr, "!!OCache closing lost, %d packets!!\n", (int) oc->current_allocations);
     228                fprintf(stderr, "!!OCache closing lost, %d packets!!", (int) oc->current_allocations);
    229229        else
    230                 /* This is clearly a bug, but I don't know what to replace it with... */
    231                 fprintf(stderr, "!!OCache closing lost, %d packets!!\n", (int) oc->current_allocations);
     230                fprintf(stderr, "!!OCache closing lost, %d packets!!", (int) oc->current_allocations);
    232231        libtrace_ringbuffer_destroy(&oc->rb);
    233232        pthread_spin_destroy(&oc->spin);
  • lib/format_dag25.c

    rc66a465 r11cf9b1  
    8282#define DATA(x) ((struct dag_format_data_t *)x->format_data)
    8383#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
    84 #define PERPKT_DATA(x) ((struct dag_per_thread_t *)(x->format_data))
    8584
    8685#define FORMAT_DATA DATA(libtrace)
     
    115114        /* A buffer to hold the data to be transmittted */
    116115        uint8_t *txbuffer;
    117 };
    118 
    119 /* Data that is stored for each libtrace_thread_t */
    120 struct dag_per_thread_t {
    121         struct dag_dev_t *device; /* DAG device */
    122         uint16_t stream; /* Stream number */
    123         uint8_t *top; /* Pointer to the last unread byte in the DAG memory */
    124         uint8_t *bottom; /* Pointer to the first unread byte in the DAG memory */
    125         uint32_t processed; /* Amount of data processed from the bottom pointer */
    126         uint64_t pkt_count; /* Number of packets seen by the thread */
    127         uint64_t drops;
    128116};
    129117
     
    158146        /* The number of packets that have been dropped */
    159147        uint64_t drops;
    160         /* When running in parallel mode this is malloc'd with an array of thread
    161          * structures. Most of the stuff above doesn't get used in parallel mode. */
    162         struct dag_per_thread_t *per_thread;
    163148};
    164149
     
    465450        char *dag_dev_name = NULL;
    466451        char *scan = NULL;
    467         int stream = 0, thread_count = 1;
     452        int stream = 0;
    468453        struct dag_dev_t *dag_device = NULL;
    469454
     
    472457         * list */
    473458        pthread_mutex_lock(&open_dag_mutex);
    474 
    475 
    476         /* DAG cards support multiple streams. In a single threaded capture,
    477          * these are specified using a comma in the libtrace URI,
     459       
     460       
     461        /* Specific streams are signified using a comma in the libtrace URI,
    478462         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
    479463         *
    480          * If no stream is specified, we will read from stream 0 with one thread
    481          */
    482         if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
     464         * If no stream is specified, we will read from stream 0 */
     465        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
    483466                dag_dev_name = strdup(libtrace->uridata);
    484                
    485467        } else {
    486468                dag_dev_name = (char *)strndup(libtrace->uridata,
    487                                                                            (size_t)(scan - libtrace->uridata));
     469                                (size_t)(scan - libtrace->uridata));
    488470                stream = atoi(++scan);
    489471        }
     
    818800
    819801        dag_record_t *erfptr;
    820         libtrace_thread_t *t;
    821802       
    822803        /* If the packet previously owned a buffer that is not the buffer
     
    855836
    856837        /* Update the dropped packets counter */
     838
    857839        /* No loss counter for DSM coloured records - have to use
    858840         * some other API */
    859         /* Adding multithread support for this isn't actually that useful for the
    860          * DAG7.5G2, as there's no way to use multiple receive streams without DSM */
    861841        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
    862842                /* TODO */
    863843        } else {
    864844                /* Use the ERF loss counter */
    865                 if (DATA(libtrace)->per_thread) {
    866                         t = get_thread_table(libtrace);
    867                         PERPKT_DATA(t)->drops += ntohs(erfptr->lctr);
    868                 } else {
    869                         printf("DROP!\n");
    870                         DATA(libtrace)->drops += ntohs(erfptr->lctr);
    871                 }
    872         }
    873 
     845                DATA(libtrace)->drops += ntohs(erfptr->lctr);
     846        }
    874847
    875848        return 0;
     
    12261199/* Gets the number of dropped packets */
    12271200static uint64_t dag_get_dropped_packets(libtrace_t *trace) {
    1228         uint64_t sum = 0;
    1229         int i, tot;
    1230 
    12311201        if (trace->format_data == NULL)
    12321202                return (uint64_t)-1;
    1233 
    1234         if (DATA(trace)->per_thread) {
    1235                 tot = trace->perpkt_thread_count;
    1236 
    1237                 for (i = 0; i < tot; i++) {
    1238                         printf("t%d: drops %" PRIu64 "\n",
    1239                                    DATA(trace)->per_thread[i].drops);
    1240                         sum += DATA(trace)->per_thread[i].drops;
    1241                 }
    1242         }
    1243 
    1244         sum += DATA(trace)->drops;
    1245 
    1246         return sum;
     1203        return DATA(trace)->drops;
    12471204}
    12481205
     
    12581215        printf("\tnone\n");
    12591216        printf("\n");
    1260 }
    1261 
    1262 static int dag_pstart_input(libtrace_t *libtrace) {
    1263         char *scan, *tok;
    1264         uint16_t stream_count = 0, max_streams;
    1265         /* We keep our own pointer to per_thread as the system will free
    1266          * up FORMAT_DATA without freeing this if something goes wrong */
    1267         struct dag_per_thread_t *per_thread = NULL;
    1268         int iserror = 0;
    1269 
    1270         /* Check we aren't trying to create more threads than the DAG card can
    1271          * handle */
    1272         max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd);
    1273         if (libtrace->perpkt_thread_count > max_streams) {
    1274                 trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, "trying to create too "
    1275                                           "many threads (max is %u)", max_streams);
    1276                 iserror = 1;
    1277                 goto cleanup;
    1278         }
    1279 
    1280         /* Create the thread structures */
    1281         per_thread = calloc(libtrace->perpkt_thread_count,
    1282                                                                          sizeof(struct dag_per_thread_t));
    1283         FORMAT_DATA->per_thread = per_thread;
    1284 
    1285         /* Get the stream names from the uri */
    1286         if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
    1287                 trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, "format uri doesn't "
    1288                                           "specify the DAG streams");
    1289                 iserror = 1;
    1290                 goto cleanup;
    1291         }
    1292 
    1293         scan++;
    1294  
    1295         tok = strtok(scan, ",");
    1296         while (tok != NULL) {
    1297                 /* Ensure we haven't specified too many streams */
    1298                 if (stream_count >= libtrace->perpkt_thread_count) {
    1299                         trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, "format uri specifies too "
    1300                                           "many streams. Max is %u", max_streams);
    1301                         iserror = 1;
    1302                         goto cleanup;
    1303                 }
    1304 
    1305                 /* Save the stream details */
    1306                 per_thread[stream_count].device = FORMAT_DATA->device;
    1307                 per_thread[stream_count++].stream = (uint16_t)atoi(tok);
    1308 
    1309                 tok = strtok(NULL, ",");
    1310         }
    1311 
    1312  cleanup:
    1313         if (iserror) {
    1314                 /* Free the per_thread memory */
    1315                 free(per_thread);
    1316                
    1317                 return -1;
    1318         } else {
    1319                 return 0;
    1320         }
    1321 }
    1322 
    1323 
    1324 
    1325 /* TODO: Fold this into dag_available */
    1326 static int dag_pavailable(libtrace_t *libtrace, libtrace_thread_t *t) {
    1327         uint32_t diff = PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom;
    1328 
    1329         /* If we've processed more than 4MB of data since we last called
    1330          * dag_advance_stream, then we should call it again to allow the
    1331          * space occupied by that 4MB to be released */
    1332         if (diff >= dag_record_size && PERPKT_DATA(t)->processed < 4 * 1024 * 1024)
    1333                 return diff;
    1334 
    1335         /* Update top and bottom pointers */
    1336         PERPKT_DATA(t)->top = dag_advance_stream(PERPKT_DATA(t)->device->fd,
    1337                                                                                          PERPKT_DATA(t)->stream,
    1338                                                                                          &(PERPKT_DATA(t)->bottom));
    1339 
    1340         if (PERPKT_DATA(t)->top == NULL) {
    1341                 trace_set_err(libtrace, errno, "dag_advance_stream failed!");
    1342                 return -1;
    1343         }
    1344 
    1345         PERPKT_DATA(t)->processed = 0;
    1346         diff = PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom;
    1347         return diff;
    1348 }
    1349 
    1350 /* TODO: Fold this into dag_get_record */
    1351 static dag_record_t *dag_pget_record(libtrace_t *libtrace,
    1352                                                                          libtrace_thread_t *t) {
    1353         dag_record_t *erfptr = NULL;
    1354         uint16_t size;
    1355 
    1356         erfptr = (dag_record_t *)PERPKT_DATA(t)->bottom;
    1357         if (!erfptr)
    1358                 return NULL;
    1359 
    1360         /* Ensure we have a whole record */
    1361         size = ntohs(erfptr->rlen);
    1362         assert(size >= dag_record_size);
    1363         if (size > (PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom))
    1364                 return NULL;
    1365 
    1366         /* Advance the buffer pointers */
    1367         PERPKT_DATA(t)->bottom += size;
    1368         PERPKT_DATA(t)->processed += size;
    1369        
    1370         return erfptr;
    1371 }
    1372 
    1373 static int dag_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t,
    1374                                                         libtrace_packet_t *packet) {
    1375         dag_record_t *erfptr = NULL;
    1376         int numbytes = 0;
    1377         uint32_t flags = 0;
    1378         struct timeval maxwait, pollwait;
    1379 
    1380         pollwait.tv_sec = 0;
    1381         pollwait.tv_usec = 10000;
    1382         maxwait.tv_sec = 0;
    1383         maxwait.tv_usec = 250000;
    1384 
    1385         /* TODO: Support DUCK reporting */
    1386 
    1387         /* Don't let anyone try to free our DAG memory hole! */
    1388         flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
    1389 
    1390         /* If the packet buffer is currently owned by libtrace, free it so
    1391          * that we can set the packet to point into the DAG memory hole */
    1392         if (packet->buf_control == TRACE_CTRL_PACKET) {
    1393                 free(packet->buffer);
    1394                 packet->buffer = 0;
    1395         }
    1396 
    1397         /* Configure DAG card stream polling */
    1398         if (dag_set_stream_poll(PERPKT_DATA(t)->device->fd, PERPKT_DATA(t)->stream,
    1399                                                         sizeof(dag_record_t), &maxwait, &pollwait) < 0) {
    1400                 trace_set_err(libtrace, errno, "dag_set_stream_poll");
    1401                 return -1;
    1402         }
    1403 
    1404         /* Grab an ERF record */
    1405         do {
    1406                 numbytes = dag_pavailable(libtrace, t);
    1407                 if (numbytes < 0)
    1408                         return numbytes;
    1409                 if (numbytes < dag_record_size) {
    1410                         if (libtrace_halt)
    1411                                 return 0;
    1412 
    1413                         /* Check message queue to see if we should abort early */
    1414                         if (libtrace_message_queue_count(&t->messages) > 0)
    1415                                 return -2;
    1416 
    1417                         /* Keep trying until we see a packet */
    1418                         continue;
    1419                 }
    1420 
    1421                 erfptr = dag_pget_record(libtrace, t);
    1422         } while (erfptr == NULL);
    1423 
    1424         /* Prepare the libtrace packet */
    1425         if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
    1426                                                    flags))
    1427                 return -1;
    1428 
    1429         PERPKT_DATA(t)->pkt_count++;
    1430 
    1431         return packet->payload ? htons(erfptr->rlen) :
    1432                 erf_get_framing_length(packet);
    1433 }
    1434 
    1435 static int dag_ppause_input(libtrace_t *libtrace) {
    1436         int i, tot = libtrace->perpkt_thread_count;
    1437         struct dag_per_thread_t *t_data;
    1438 
    1439         /* Stop and detach all the streams */
    1440         printf("Stopping and detaching all streams\n");
    1441         for (i = 0; i < tot; i++) {
    1442                 t_data = &FORMAT_DATA->per_thread[i];
    1443 
    1444                 if (dag_stop_stream(t_data->device->fd,
    1445                                                         t_data->stream) < 0) {
    1446                         trace_set_err(libtrace, errno, "can't stop DAG stream #%u",
    1447                                                   t_data->stream);
    1448                         return -1;
    1449                 }
    1450 
    1451                 if (dag_detach_stream(t_data->device->fd,
    1452                                                           t_data->stream) < 0) {
    1453                         trace_set_err(libtrace, errno, "can't detach DAG stream #%u",
    1454                                                   t_data->stream);
    1455                         return -1;
    1456                 }
    1457         }
    1458 
    1459         /* Free up the per_thread array */
    1460         free(FORMAT_DATA->per_thread);
    1461         FORMAT_DATA->per_thread = NULL;
    1462 
    1463         return 0;
    1464 }
    1465 
    1466 static int dag_pconfig_input(libtrace_t *libtrace,
    1467                                                          trace_parallel_option_t option, void *value) {
    1468 
    1469         /* We don't support any of these! Normally you configure the DAG card
    1470          * externally. */
    1471         switch(option) {
    1472         case TRACE_OPTION_SET_HASHER:
    1473         case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
    1474         case TRACE_OPTION_TRACETIME:
    1475         case TRACE_OPTION_TICK_INTERVAL:
    1476         case TRACE_OPTION_GET_CONFIG:
    1477         case TRACE_OPTION_SET_CONFIG:
    1478                 return -1;
    1479         }
    1480         /* We don't provide a default option to ensure that future options will
    1481          * generate a compiler warning. */
    1482 
    1483         return -1;
    1484 }
    1485 
    1486 static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
    1487                                                                 bool reader) {
    1488         /* XXX: This function gets run sequentially for all
    1489          * threads. Should investigate making it parallel as draining the
    1490          * memory could be needlessly time consuming.
    1491          */
    1492         uint8_t *top, *bottom;
    1493         uint8_t diff = 0; /* XXX: Investigate this type, as I would assume the value
    1494                                            * could be larger than 255 */
    1495         struct timeval zero, nopoll;
    1496 
    1497         top = bottom = NULL;
    1498 
    1499         /* Minimum delay is 10mS */
    1500         zero.tv_sec = 0;
    1501         zero.tv_usec = 10000;
    1502         nopoll = zero;
    1503 
    1504         if (reader) {
    1505                 if (t->type == THREAD_PERPKT) {
    1506                         /* Pass the per thread data to the thread */
    1507                         t->format_data = &FORMAT_DATA->per_thread[t->perpkt_num];
    1508 
    1509                         /* Attach and start the DAG stream */
    1510                         printf("t%u: starting and attaching stream #%u\n", t->perpkt_num,
    1511                                    PERPKT_DATA(t)->stream);
    1512                         if (dag_attach_stream(PERPKT_DATA(t)->device->fd,
    1513                                                                   PERPKT_DATA(t)->stream, 0, 0) < 0) {
    1514                                 trace_set_err(libtrace, errno, "can't attach DAG stream #%u",
    1515                                                           PERPKT_DATA(t)->stream);
    1516                                 return -1;
    1517                         }
    1518                         if (dag_start_stream(PERPKT_DATA(t)->device->fd,
    1519                                                                  PERPKT_DATA(t)->stream) < 0) {
    1520                                 trace_set_err(libtrace, errno, "can't start DAG stream #%u",
    1521                                                           PERPKT_DATA(t)->stream);
    1522                                 return -1;
    1523                         }
    1524 
    1525                         /* Ensure that dag_advance_stream will return without blocking */
    1526                         if(dag_set_stream_poll(PERPKT_DATA(t)->device->fd,
    1527                                                                    PERPKT_DATA(t)->stream, 0, &zero,
    1528                                                                    &nopoll) < 0) {
    1529                                 trace_set_err(libtrace, errno, "dag_set_stream_poll failed!");
    1530                                 return -1;
    1531                         }
    1532 
    1533                         /* Clear all the data from the memory hole */
    1534                         do {
    1535                                 top = dag_advance_stream(PERPKT_DATA(t)->device->fd,
    1536                                                                                  PERPKT_DATA(t)->stream,
    1537                                                                                  &bottom);
    1538 
    1539                                 assert(top && bottom);
    1540                                 diff = top - bottom;
    1541                                 bottom -= diff;
    1542                         } while (diff != 0);
    1543 
    1544                         PERPKT_DATA(t)->top = NULL;
    1545                         PERPKT_DATA(t)->bottom = NULL;
    1546                         PERPKT_DATA(t)->pkt_count = 0;
    1547                         PERPKT_DATA(t)->drops = 0;
    1548                 } else {
    1549                         /* TODO: Figure out why we need this */
    1550                         t->format_data = &FORMAT_DATA->per_thread[0];
    1551                 }
    1552         }
    1553 
    1554         fprintf(stderr, "t%u: registered thread\n", t->perpkt_num);
    1555 
    1556         return 0;
    15571217}
    15581218
     
    15981258        dag_help,                       /* help */
    15991259        NULL,                            /* next pointer */
    1600                 {true, 0}, /* live packet capture, thread limit TBD */
    1601                 dag_pstart_input,
    1602                 dag_pread_packet,
    1603                 dag_ppause_input,
    1604                 NULL,
    1605                 dag_pconfig_input,
    1606                 dag_pregister_thread,
    1607                 NULL
     1260        NON_PARALLEL(true)
    16081261};
    16091262
  • lib/trace_parallel.c

    r78911b5 rf9ba82f  
    124124
    125125static void print_memory_stats() {
    126 #if 0
    127126        char t_name[50];
    128127        uint64_t total;
     
    166165                                total, (double) mem_hits.writebulk.miss / (double) total * 100.0);
    167166        }
    168 #endif
     167
    169168}
    170169
     
    13381337        assert(packet);
    13391338
    1340         if (libtrace->format->pread_packet) {
     1339        if (libtrace->format->read_packet) {
    13411340                do {
    13421341                        size_t ret;
     
    21992198        assert(str);
    22002199        assert(uc);
     2200        printf ("Splitting string \"%s\" into tokens:\n",str);
    22012201        pch = strtok (str," ,.-");
    22022202        while (pch != NULL)
Note: See TracChangeset for help on using the changeset viewer.