Changeset 1b59edf


Ignore:
Timestamp:
02/11/15 11:13:27 (6 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
0b01fea
Parents:
fed9152 (diff), 12ae766 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'tidy_parallel_libtrace' into develop

Location:
lib
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • lib/format_dpdk.c

    r1960910 r12ae766  
    201201 */
    202202
    203 /* Print verbose messages to stdout */
    204 #define DEBUG 1
     203/* Print verbose messages to stderr */
     204#define DEBUG 0
    205205
    206206/* Use clock_gettime() for nanosecond resolution rather than gettimeofday()
     
    351351                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
    352352                                / sizeof (format_data->blacklist[0])) {
    353                         printf("Warning: too many devices to blacklist consider"
     353                        fprintf(stderr, "Warning: too many devices to blacklist consider"
    354354                                        " increasing BLACK_LIST_SIZE");
    355355                        break;
     
    625625        if (my_cpu < 0) {
    626626                /* If we can assign to a core on the same numa node */
    627                 printf("Using pci card on numa_node%d\n", format_data->nic_numa_node);
     627                fprintf(stderr, "Using pci card on numa_node%d\n", format_data->nic_numa_node);
    628628                if(format_data->nic_numa_node >= 0) {
    629629                        int max_node_cpu = -1;
     
    735735    struct rte_eth_dev_info dev_info;
    736736    rte_eth_dev_info_get(0, &dev_info);
    737     printf("Device port=0\n\tmin_rx_bufsize=%d\n\tmax_rx_pktlen=%d\n\tmax rx queues=%d\n\tmax tx queues=%d",
     737    fprintf(stderr, "Device port=0\n\tmin_rx_bufsize=%d\n\tmax_rx_pktlen=%d\n\tmax rx queues=%d\n\tmax tx queues=%d",
    738738                (int) dev_info.min_rx_bufsize, (int) dev_info.max_rx_pktlen, (int) dev_info.max_rx_queues, (int) dev_info.max_tx_queues);
    739739
     
    12061206         */
    12071207#if DEBUG
    1208     printf("Creating mempool named %s\n", format_data->mempool_name);
     1208    fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
    12091209#endif
    12101210    format_data->pktmbuf_pool =
     
    12431243    }
    12441244#if DEBUG
    1245     printf("Doing dev configure\n");
     1245    fprintf(stderr, "Doing dev configure\n");
    12461246#endif
    12471247    /* Initialise the TX queue a minimum value if using this port for
     
    12591259    for (i=0; i < rx_queues; i++) {
    12601260#if DEBUG
    1261     printf("Doing queue configure\n");
     1261    fprintf(stderr, "Doing queue configure\n");
    12621262#endif
    12631263
     
    14191419 * gives it.
    14201420 *
    1421  * We then allow a mapper thread to be started on every real core as DPDK would
     1421 * We then allow a mapper thread to be started on every real core as DPDK would,
    14221422 * we also bind these to the corresponding CPU cores.
    14231423 *
     
    14371437    // in this case physical cores on the system will not exist so we don't bind
    14381438    // these to any particular physical core
     1439    pthread_mutex_lock(&libtrace->libtrace_lock);
    14391440    if (reading) {
    14401441#if HAVE_LIBNUMA
     
    14721473        // TODO proper libtrace style error here!!
    14731474        fprintf(stderr, "Too many threads for DPDK!!\n");
     1475        pthread_mutex_unlock(&libtrace->libtrace_lock);
    14741476        return -1;
    14751477    }
     
    14961498        if (i != 0) {
    14971499            fprintf(stderr, "Warning pthread_setaffinity_np failed\n");
     1500            pthread_mutex_unlock(&libtrace->libtrace_lock);
    14981501            return -1;
    14991502        }
     
    15081511        }
    15091512    }
     1513    pthread_mutex_unlock(&libtrace->libtrace_lock);
    15101514    return 0;
    15111515}
     
    15231527
    15241528    assert(rte_lcore_id() < RTE_MAX_LCORE);
    1525 
     1529    pthread_mutex_lock(&libtrace->libtrace_lock);
    15261530    // Skip if master!!
    15271531    if (rte_lcore_id() == rte_get_master_lcore()) {
    15281532        fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
     1533        pthread_mutex_unlock(&libtrace->libtrace_lock);
    15291534        return;
    15301535    }
     
    15351540    RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
    15361541    assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
     1542    pthread_mutex_unlock(&libtrace->libtrace_lock);
    15371543    return;
    15381544}
  • lib/libtrace_int.h

    r04bf7c5 r12ae766  
    193193 */
    194194struct libtrace_thread_t {
    195         int accepted_packets; // The number of packets accepted only used if pread
     195        uint64_t accepted_packets; // The number of packets accepted only used if pread
     196        uint64_t filtered_packets;
    196197        // is retreving packets
    197198        // Set to true once the first packet has been stored
     
    307308        fn_hasher hasher; // If valid using a separate thread
    308309        void *hasher_data;
    309        
     310        /** The pread_packet choosen path for the configuration */
     311        int (*pread)(libtrace_t *, libtrace_thread_t *, libtrace_packet_t **, size_t);
     312
    310313        libtrace_thread_t hasher_thread;
    311314        libtrace_thread_t reporter_thread;
     
    929932        /**
    930933         * Register a thread for use with the format or using the packets produced
    931          * by it. This is NOT only used for threads reading packets infact all
     934         * by it. This is NOT only used for threads reading packets in fact all
    932935         * threads use this.
     936         *
     937         * The libtrace lock is not held by this format but can be aquired
     938         * by the format.
    933939         *
    934940         * Some use cases include setting up any thread local storage required for
  • lib/trace.c

    r04bf7c5 r858ce90  
    282282        libtrace->dropped_packets = UINT64_MAX;
    283283        libtrace->received_packets = UINT64_MAX;
     284        libtrace->pread = NULL;
    284285        ZERO_USER_CONFIG(libtrace->config);
    285286
     
    396397        libtrace->perpkt_threads = NULL;
    397398        libtrace->tracetime = 0;
     399        libtrace->pread = NULL;
    398400        ZERO_USER_CONFIG(libtrace->config);
    399401       
     
    19571959{
    19581960        assert(trace);
     1961        int i = 0;
     1962        uint64_t ret = trace->filtered_packets;
     1963        for (i = 0; i < trace->perpkt_thread_count; i++) {
     1964                ret += trace->perpkt_threads[i].filtered_packets;
     1965        }
    19591966        if (trace->format->get_filtered_packets) {
    19601967                return trace->format->get_filtered_packets(trace)+
    1961                         trace->filtered_packets;
    1962         }
    1963         return trace->filtered_packets;
     1968                        ret;
     1969        }
     1970        return ret;
    19641971}
    19651972
  • lib/trace_parallel.c

    r04bf7c5 r12ae766  
    101101#include <unistd.h>
    102102
    103 
    104 static size_t trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packets[], size_t nb_packets);
    105 
     103static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t);
    106104extern int libtrace_parallel;
    107105
     
    226224
    227225        if (trace->config.debug_state)
    228                 fprintf(stderr, "Thread %d state changed from %d to %d\n", (int) t->tid,
    229                         prev_state, t->state);
     226                fprintf(stderr, "Thread %d state changed from %d to %d\n",
     227                        (int) t->tid, prev_state, t->state);
    230228
    231229        pthread_cond_broadcast(&trace->perpkt_cond);
     
    293291void libtrace_zero_thread(libtrace_thread_t * t) {
    294292        t->accepted_packets = 0;
     293        t->filtered_packets = 0;
    295294        t->recorded_first = false;
    296295        t->tracetime_offset_usec = 0;
     
    387386}
    388387
    389 
    390 
    391 /**
    392  * Dispatches packets to their correct place and applies any translations
    393  * as needed.
    394  *
     388/**
     389 * Sends a packet to the user, expects either a valid packet or a TICK packet.
     390 *
     391 * Note READ_MESSAGE will only be returned if tracetime is true.
     392 *
     393 * @brief dispatch_packet
    395394 * @param trace
    396395 * @param t
    397  * @param packet (in, out) this will be set to NULL if the user doesn't return the packet for reuse
    398  * @return -1 if an error or EOF has occured and the trace should end, otherwise a postive number (or 0)
    399  * representing the number of packets returned, these will be at the beginning of the array.
    400  */
    401 static inline int dispatch_packets(libtrace_t *trace, libtrace_thread_t *t, libtrace_packet_t **packets,
    402                                    size_t nb_packets) {
    403         libtrace_message_t message;
    404         size_t i, empty = 0;
    405         for (i = 0; i < nb_packets; ++i) {
    406                 if (packets[i]->error > 0) {
    407                         packets[i] = (*trace->per_pkt)(trace, packets[i], NULL, t);
    408                         trace_fin_packet(packets[i]);
    409                 } else if (packets[i]->error == READ_TICK) {
    410                         message.code = MESSAGE_TICK;
    411                         message.additional.uint64 = trace_packet_get_order(packets[i]);
    412                         message.sender = t;
    413                         (*trace->per_pkt)(trace, NULL, &message, t);
    414                 } else if (packets[i]->error != READ_MESSAGE) {
    415                         // An error this should be the last packet we read
    416                         size_t z;
    417                         // We could have an eof or error and a message such as pause
    418                         for (z = i + 1 ; z < nb_packets; ++z) {
    419                                 fprintf(stderr, "i=%d nb_packets=%d err=%d, seq=%d\n", (int) z, (int) nb_packets, packets[z]->error, (int) packets[z]->order);
    420                                 assert (packets[z]->error <= 0);
    421                         }
    422                         return -1;
    423                 }
    424                 if (packets[i]) {
    425                         // Move full slots to front
    426                         if (empty != i) {
    427                                 packets[empty] = packets[i];
    428                                 packets[i] = NULL;
    429                         }
    430                         ++empty;
    431                         // Finish packets while still in CPU cache
    432                 }
    433         }
    434         return empty;
    435 }
    436 
    437 static inline int dispatch_packet(libtrace_t *trace, libtrace_thread_t *t, libtrace_packet_t **packet) {
    438         libtrace_message_t message;
     396 * @param packet A pointer to the packet storage, which may be set to null upon
     397 *               return, or a packet to be finished.
     398 * @return 0 is successful, otherwise if playing back in tracetime
     399 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
     400 */
     401static inline int dispatch_packet(libtrace_t *trace,
     402                                                 libtrace_thread_t *t,
     403                                                 libtrace_packet_t **packet,
     404                                                 bool tracetime) {
    439405        if ((*packet)->error > 0) {
     406                if (tracetime) {
     407                        if (delay_tracetime(trace, packet[0], t) == READ_MESSAGE)
     408                                return READ_MESSAGE;
     409                }
    440410                *packet = (*trace->per_pkt)(trace, *packet, NULL, t);
    441411                trace_fin_packet(*packet);
    442         } else if ((*packet)->error == READ_TICK) {
     412        } else {
     413                libtrace_message_t message;
     414                assert((*packet)->error == READ_TICK);
    443415                message.code = MESSAGE_TICK;
    444416                message.additional.uint64 = trace_packet_get_order(*packet);
    445417                message.sender = t;
    446418                (*trace->per_pkt)(trace, NULL, &message, t);
    447         } else if ((*packet)->error != READ_MESSAGE) {
    448                 return -1;
    449419        }
    450420        return 0;
     421}
     422
     423/**
     424 * Pauses a per packet thread, messages will not be processed when the thread
     425 * is paused.
     426 *
     427 * This process involves reading packets if a hasher thread is used. As such
     428 * this function can fail to pause due to errors when reading in which case
     429 * the thread should be stopped instead.
     430 *
     431 *
     432 * @brief trace_perpkt_thread_pause
     433 * @return READ_ERROR(-1) or READ_EOF(0) or 1 if successfull
     434 */
     435static int trace_perpkt_thread_pause(libtrace_t *trace, libtrace_thread_t *t,
     436                                     libtrace_packet_t *packets[],
     437                                     int *nb_packets, int *empty, int *offset) {
     438        libtrace_message_t message = {0};
     439        libtrace_packet_t * packet = NULL;
     440
     441        /* Let the user thread know we are going to pause */
     442        message.code = MESSAGE_PAUSING;
     443        message.sender = t;
     444        (*trace->per_pkt)(trace, NULL, &message, t);
     445
     446        /* Send through any remaining packets (or messages) without delay */
     447
     448        /* First send those packets already read, as fast as possible
     449         * This should never fail or check for messages etc. */
     450        for (;*offset < *nb_packets; ++*offset) {
     451                ASSERT_RET(dispatch_packet(trace, t, &packets[*offset], false), == 0);
     452                /* Move full slots to front as we go */
     453                if (packets[*offset]) {
     454                        if (*empty != *offset) {
     455                                packets[*empty] = packets[*offset];
     456                                packets[*offset] = NULL;
     457                        }
     458                        ++*empty;
     459                }
     460        }
     461
     462        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
     463        /* If a hasher thread is running, empty input queues so we don't lose data */
     464        if (trace_has_dedicated_hasher(trace)) {
     465                fprintf(stderr, "Trace is using a hasher thread emptying queues\n");
     466                // The hasher has stopped by this point, so the queue shouldn't be filling
     467                while(!libtrace_ringbuffer_is_empty(&t->rbuffer) || t->format_data) {
     468                        int ret = trace->pread(trace, t, &packet, 1);
     469                        if (ret == 1) {
     470                                if (packet->error > 0) {
     471                                        store_first_packet(trace, packet, t);
     472                                }
     473                                ASSERT_RET(dispatch_packet(trace, t, &packet, 1), == 0);
     474                                if (packet == NULL)
     475                                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
     476                        } else if (ret != READ_MESSAGE) {
     477                                /* Ignore messages we pick these up next loop */
     478                                assert (ret == READ_EOF || ret == READ_ERROR);
     479                                /* Verify no packets are remaining */
     480                                /* TODO refactor this sanity check out!! */
     481                                while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
     482                                        ASSERT_RET(trace->pread(trace, t, &packet, 1), <= 0);
     483                                        // No packets after this should have any data in them
     484                                        assert(packet->error <= 0);
     485                                }
     486                                fprintf(stderr, "PREAD_FAILED %d\n", ret);
     487                                libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
     488                                return -1;
     489                        }
     490                }
     491        }
     492        libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
     493
     494        /* Now we do the actual pause, this returns when we resumed */
     495        trace_thread_pause(trace, t);
     496        message.code = MESSAGE_RESUMING;
     497        (*trace->per_pkt)(trace, NULL, &message, t);
     498        return 1;
    451499}
    452500
     
    456504static void* perpkt_threads_entry(void *data) {
    457505        libtrace_t *trace = (libtrace_t *)data;
    458         libtrace_thread_t * t;
     506        libtrace_thread_t *t;
    459507        libtrace_message_t message = {0};
    460508        libtrace_packet_t *packets[trace->config.burst_size];
    461         size_t nb_packets;
    462509        size_t i;
    463         int ret;
     510        //int ret;
     511        /* The current reading position into the packets */
     512        int offset = 0;
     513        /* The number of packets last read */
     514        int nb_packets = 0;
     515        /* The offset to the first NULL packet upto offset */
     516        int empty = 0;
    464517
    465518        /* Wait until trace_pstart has been completed */
     
    473526        }
    474527        //printf("Yay Started perpkt thread #%d\n", (int) get_thread_table_num(trace));
     528        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
     529
    475530        if (trace->format->pregister_thread) {
    476531                trace->format->pregister_thread(trace, t, !trace_has_dedicated_hasher(trace));
    477532        }
    478         ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    479533
    480534        /* Fill our buffer with empty packets */
     
    498552
    499553                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
     554                        int ret;
    500555                        switch (message.code) {
    501556                                case MESSAGE_DO_PAUSE: // This is internal
    502                                         // Send message to say we are pausing, TODO consider sender
    503                                         message.code = MESSAGE_PAUSING;
    504                                         message.sender = t;
    505                                         (*trace->per_pkt)(trace, NULL, &message, t);
    506                                         // If a hasher thread is running empty input queues so we don't lose data
    507                                         if (trace_has_dedicated_hasher(trace)) {
    508                                                 fprintf(stderr, "Trace is using a hasher thread emptying queues\n");
    509                                                 // The hasher has stopped by this point, so the queue shouldn't be filling
    510                                                 while(!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
    511                                                         ASSERT_RET(trace_pread_packet(trace, t, packets, 1), == 1);
    512                                                         if (dispatch_packets(trace, t, packets, 1) == -1) {
    513                                                                 // EOF or error, either way we'll stop
    514                                                                 while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
    515                                                                         ASSERT_RET(trace_pread_packet(trace, t, packets, 1), == 1);
    516                                                                         // No packets after this should have any data in them
    517                                                                         assert(packets[0]->error <= 0);
    518                                                                 }
    519                                                                 goto stop;
    520                                                         }
    521                                                 }
     557                                        ret = trace_perpkt_thread_pause(trace, t, packets, &nb_packets, &empty, &offset);
     558                                        if (ret == READ_EOF) {
     559                                                fprintf(stderr, "PAUSE stop eof!!\n");
     560                                                goto eof;
     561                                        } else if (ret == READ_ERROR) {
     562                                                fprintf(stderr, "PAUSE stop error!!\n");
     563                                                goto error;
    522564                                        }
    523                                         // Now we do the actual pause, this returns when we are done
    524                                         trace_thread_pause(trace, t);
    525                                         message.code = MESSAGE_RESUMING;
    526                                         (*trace->per_pkt)(trace, NULL, &message, t);
    527                                         // Check for new messages as soon as we return
     565                                        assert(ret == 1);
    528566                                        continue;
    529567                                case MESSAGE_DO_STOP: // This is internal
    530                                         goto stop;
     568                                        fprintf(stderr, "DO_STOP stop!!\n");
     569                                        goto eof;
    531570                        }
    532571                        (*trace->per_pkt)(trace, NULL, &message, t);
     572                        /* Continue and the empty messages out before packets */
    533573                        continue;
    534574                }
    535575
    536                 if (trace->perpkt_thread_count == 1) {
    537                         assert(packets[0]);
    538                         packets[0]->error = trace_read_packet(trace, packets[0]);
    539                         if (dispatch_packet(trace, t, &packets[0]) != 0)
    540                                 break;
    541                         if (!packets[0]) {
    542                                 libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packets[0], 1, 1);
     576
     577                /* Do we need to read a new set of packets MOST LIKELY we do */
     578                if (offset == nb_packets) {
     579                        /* Refill the packet buffer */
     580                        if (empty != nb_packets) {
     581                                // Refill the empty packets
     582                                libtrace_ocache_alloc(&trace->packet_freelist,
     583                                                      (void **) &packets[empty],
     584                                                      nb_packets - empty,
     585                                                      nb_packets - empty);
     586                        }
     587                        if (!trace->pread) {
     588                                assert(packets[0]);
     589                                nb_packets = trace_read_packet(trace, packets[0]);
     590                                packets[0]->error = nb_packets;
     591                                if (nb_packets > 0)
     592                                        nb_packets = 1;
     593                        } else {
     594                                nb_packets = trace->pread(trace, t, packets, trace->config.burst_size);
     595                        }
     596                        offset = 0;
     597                        empty = 0;
     598                }
     599
     600                /* Handle error/message cases */
     601                if (nb_packets > 0) {
     602                        /* Store the first packet */
     603                        if (packets[0]->error > 0) {
     604                                store_first_packet(trace, packets[0], t);
     605                        }
     606                        for (;offset < nb_packets; ++offset) {
     607                                int ret;
     608                                ret = dispatch_packet(trace, t, &packets[offset], trace->tracetime);
     609                                if (ret == 0) {
     610                                        /* Move full slots to front as we go */
     611                                        if (packets[offset]) {
     612                                                if (empty != offset) {
     613                                                        packets[empty] = packets[offset];
     614                                                        packets[offset] = NULL;
     615                                                }
     616                                                ++empty;
     617                                        }
     618                                } else {
     619                                        assert(ret == READ_MESSAGE);
     620                                        /* Loop around and process the message, note */
     621                                        continue;
     622                                }
    543623                        }
    544624                } else {
    545                         nb_packets = trace_pread_packet(trace, t, packets, trace->config.burst_size);
    546                         // Loop through the packets we just read and refill
    547                         ret = dispatch_packets(trace, t, packets, nb_packets);
    548                         if (ret == -1)
    549                                 break;
    550                         else if (ret != nb_packets) {
    551                                 // Refill the empty packets
    552                                 //printf("Refilling packets ret=%d nb_packets=%zd\n", ret, nb_packets);
    553                                 libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packets[ret], nb_packets - ret, nb_packets - ret);
     625                        switch (nb_packets) {
     626                        case READ_EOF:
     627                                fprintf(stderr, "EOF stop %d!!\n", nb_packets);
     628                                goto eof;
     629                        case READ_ERROR:
     630                                fprintf(stderr, "ERROR stop %d!!\n", nb_packets);
     631                                goto error;
     632                        case READ_MESSAGE:
     633                                nb_packets = 0;
     634                                continue;
     635                        default:
     636                                fprintf(stderr, "Unexpected error %d!!\n", nb_packets);
     637                                goto error;
    554638                        }
    555639                }
    556         }
    557 
    558 
    559 stop:
     640
     641        }
     642
     643error:
     644        fprintf(stderr, "An error occured in trace\n");
     645eof:
     646        fprintf(stderr, "An eof occured in trace\n");
    560647        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
    561648
     
    610697        libtrace_packet_t * packet;
    611698        libtrace_message_t message = {0};
     699        int pkt_skipped = 0;
    612700
    613701        assert(trace_has_dedicated_hasher(trace));
     
    623711
    624712        printf("Hasher Thread started\n");
     713        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
     714
    625715        if (trace->format->pregister_thread) {
    626716                trace->format->pregister_thread(trace, t, true);
    627717        }
    628         ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    629         int pkt_skipped = 0;
     718
    630719        /* Read all packets in then hash and queue against the correct thread */
    631720        while (1) {
     
    750839}
    751840
    752 /**
    753  * @brief Move NULLs to the end of an array.
    754  * @param values
    755  * @param len
    756  * @return The location the first NULL, aka the number of non NULL elements
    757  */
    758 static inline size_t move_nulls_back(void *arr[], size_t len) {
    759         size_t fr=0, en = len-1;
    760         // Shift all non NULL elements to the front of the array, and NULLs to the
    761         // end, traverses every element at most once
    762         for (;fr < en; ++fr) {
    763                 if (arr[fr] == NULL) {
    764                         for (;en > fr; --en) {
    765                                 if(arr[en]) {
    766                                         arr[fr] = arr[en];
    767                                         arr[en] = NULL;
    768                                         break;
    769                                 }
    770                         }
    771                 }
    772         }
    773         // This is the index of the first NULL
    774         en = MIN(fr, en);
    775         // Or the end of the array if this special case
    776         if (arr[en])
    777                 en++;
    778         return en;
    779 }
    780 
    781 /** returns the number of packets successfully allocated in the final array
    782  these will all be at the front of the array */
    783 inline static size_t fill_array_with_empty_packets(libtrace_t *libtrace, libtrace_packet_t *packets[], size_t nb_packets) {
    784         size_t nb;
    785         nb = move_nulls_back((void **) packets, nb_packets);
    786         mem_hits.read.recycled += nb;
    787         nb += libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &packets[nb], nb_packets - nb, nb_packets - nb);
    788         assert(nb_packets == nb);
    789         return nb;
    790 }
    791 
    792 
    793 inline static size_t empty_array_of_packets(libtrace_t *libtrace, libtrace_packet_t *packets[], size_t nb_packets) {
    794         size_t nb;
    795         nb = move_nulls_back((void **) packets, nb_packets);
    796         mem_hits.write.recycled += nb_packets - nb;
    797         nb += nb_packets - libtrace_ocache_free(&libtrace->packet_freelist, (void **)packets, nb, nb);
    798         memset(packets, 0, nb); // XXX make better, maybe do this in ocache??
    799         return nb;
    800 }
    801 
    802841/* Our simplest case when a thread becomes ready it can obtain an exclusive
    803842 * lock to read packets from the underlying trace.
    804843 */
    805 inline static size_t trace_pread_packet_first_in_first_served(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packets[], size_t nb_packets)
    806 {
     844static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace,
     845                                                    libtrace_thread_t *t,
     846                                                    libtrace_packet_t *packets[],
     847                                                    size_t nb_packets) {
    807848        size_t i = 0;
    808849        //bool tick_hit = false;
     
    812853        for (i = 0; i < nb_packets; ++i) {
    813854                packets[i]->error = trace_read_packet(libtrace, packets[i]);
     855
    814856                if (packets[i]->error <= 0) {
    815                         ++i;
    816                         break;
     857                        /* We'll catch this next time if we have already got packets */
     858                        if ( i==0 ) {
     859                                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
     860                                return packets[i]->error;
     861                        } else {
     862                                break;
     863                        }
    817864                }
    818865                /*
     
    842889 * 2. Move that into the packet provided (packet)
    843890 */
    844 inline static size_t trace_pread_packet_hasher_thread(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packets, size_t nb_packets)
    845 {
     891inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace,
     892                                                      libtrace_thread_t *t,
     893                                                      libtrace_packet_t *packets[],
     894                                                      size_t nb_packets) {
    846895        size_t i;
     896
     897        /* We store the last error message here */
     898        if (t->format_data) {
     899                fprintf(stderr, "Hit me, ohh yeah got error %d\n",
     900                        ((libtrace_packet_t *)t->format_data)->error);
     901                return ((libtrace_packet_t *)t->format_data)->error;
     902        }
    847903
    848904        // Always grab at least one
     
    851907        packets[0] = libtrace_ringbuffer_read(&t->rbuffer);
    852908
    853         if (packets[0]->error < 0)
    854                 return 1;
     909        if (packets[0]->error <= 0 && packets[0]->error != READ_TICK) {
     910                fprintf(stderr, "Hit me, ohh yeah returning error %d\n", packets[0]->error);
     911                return packets[0]->error;
     912        }
    855913
    856914        for (i = 1; i < nb_packets; i++) {
     
    861919                        break;
    862920                }
    863                 // These are typically urgent
    864                 if (packets[i]->error < 0)
     921
     922                /* We will return an error or EOF the next time around */
     923                if (packets[i]->error <= 0 && packets[0]->error != READ_TICK) {
     924                        /* The message case will be checked automatically -
     925                           However other cases like EOF and error will only be
     926                           sent once*/
     927                        if (packets[i]->error != READ_MESSAGE) {
     928                                assert(t->format_data == NULL);
     929                                t->format_data = packets[i];
     930                                fprintf(stderr, "Hit me, ohh yeah set error %d\n",
     931                                        ((libtrace_packet_t *)t->format_data)->error);
     932                        }
    865933                        break;
     934                }
    866935        }
    867936
     
    12221291
    12231292/**
    1224  * Delays a packets playback so the playback will be in trace time
    1225  */
    1226 static inline void delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
     1293 * Delays a packets playback so the playback will be in trace time.
     1294 * This may break early if a message becomes available.
     1295 *
     1296 * Requires the first packet for this thread to be received.
     1297 * @param libtrace  The trace
     1298 * @param packet    The packet to delay
     1299 * @param t         The current thread
     1300 * @return Either READ_MESSAGE(-2) or 0 is successful
     1301 */
     1302static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
    12271303        struct timeval curr_tv, pkt_tv;
    1228         uint64_t next_release = t->tracetime_offset_usec; // Time at which to release the packet
     1304        uint64_t next_release = t->tracetime_offset_usec;
    12291305        uint64_t curr_usec;
    1230         /* Tracetime we might delay releasing this packet */
     1306
    12311307        if (!t->tracetime_offset_usec) {
    1232                 libtrace_packet_t * first_pkt;
     1308                libtrace_packet_t *first_pkt;
    12331309                struct timeval *sys_tv;
    12341310                int64_t initial_offset;
     
    12371313                pkt_tv = trace_get_timeval(first_pkt);
    12381314                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
     1315                /* In the unlikely case offset is 0, change it to 1 */
    12391316                if (stable)
    1240                         // 0->1 because 0 is used to mean unset
    12411317                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
    12421318                next_release = initial_offset;
     
    12481324        curr_usec = tv_to_usec(&curr_tv);
    12491325        if (next_release > curr_usec) {
     1326                int ret, mesg_fd = libtrace_message_queue_get_fd(&t->messages);
     1327                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
     1328                fd_set rfds;
     1329                FD_ZERO(&rfds);
     1330                FD_SET(mesg_fd, &rfds);
    12501331                // We need to wait
    1251                 struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
     1332
    12521333                //printf("WAITING for %d.%d next=%"PRIu64" curr=%"PRIu64" seconds packettime %f\n", delay_tv.tv_sec, delay_tv.tv_usec, next_release, curr_usec, trace_get_seconds(packet));
    1253                 select(0, NULL, NULL, NULL, &delay_tv);
    1254         }
    1255 }
    1256 
    1257 /* Read one packet from the trace into a buffer. Note that this function will
    1258  * block until a packet is read (or EOF is reached).
    1259  *
    1260  * @param libtrace      the trace
    1261  * @param t     The thread
    1262  * @param packets       an array of packets
    1263  * @param nb_packets
    1264  * @returns The number of packets read or 0 on EOF, negative value on error
    1265  *
    1266  * Note this is identical to read_packet but calls pread_packet instead of
    1267  * read packet in the format.
    1268  *
    1269  */
    1270 static inline int trace_pread_packet_wrapper(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packets, size_t nb_packets) {
     1334                ret = select(mesg_fd+1, &rfds, NULL, NULL, &delay_tv);
     1335                if (ret == 0) {
     1336                        return 0;
     1337                } else if (ret > 0) {
     1338                        return READ_MESSAGE;
     1339                } else {
     1340                        fprintf(stderr, "I thnik we broke select\n");
     1341                }
     1342        }
     1343        return 0;
     1344}
     1345
     1346/* Discards packets that don't match the filter.
     1347 * Discarded packets are emptied and then moved to the end of the packet list.
     1348 *
     1349 * @param trace       The trace format, containing the filter
     1350 * @param packets     An array of packets
     1351 * @param nb_packets  The number of valid items in packets
     1352 *
     1353 * @return The number of packets that passed the filter, which are moved to
     1354 *          the start of the packets array
     1355 */
     1356static inline size_t filter_packets(libtrace_t *trace,
     1357                                    libtrace_packet_t **packets,
     1358                                    size_t nb_packets) {
     1359        size_t offset = 0;
    12711360        size_t i;
     1361
     1362        for (i = 0; i < nb_packets; ++i) {
     1363                // The filter needs the trace attached to receive the link type
     1364                packets[i]->trace = trace;
     1365                if (trace_apply_filter(trace->filter, packets[i])) {
     1366                        libtrace_packet_t *tmp;
     1367                        tmp = packets[offset];
     1368                        packets[offset++] = packets[i];
     1369                        packets[i] = tmp;
     1370                } else {
     1371                        trace_fin_packet(packets[i]);
     1372                }
     1373        }
     1374
     1375        return offset;
     1376}
     1377
     1378/* Read a batch of packets from the trace into a buffer.
     1379 * Note that this function will block until a packet is read (or EOF is reached)
     1380 *
     1381 * @param libtrace    The trace
     1382 * @param t           The thread
     1383 * @param packets     An array of packets
     1384 * @param nb_packets  The number of empty packets in packets
     1385 * @return The number of packets read, 0 on EOF (or an error/message -1,-2).
     1386 */
     1387static int trace_pread_packet_wrapper(libtrace_t *libtrace,
     1388                                      libtrace_thread_t *t,
     1389                                      libtrace_packet_t *packets[],
     1390                                      size_t nb_packets) {
     1391        int i;
    12721392        assert(nb_packets);
    1273         assert(libtrace && "You called trace_read_packet() with a NULL libtrace parameter!\n");
     1393        assert(libtrace && "libtrace is NULL in trace_read_packet()");
    12741394        if (trace_is_err(libtrace))
    12751395                return -1;
    12761396        if (!libtrace->started) {
    1277                 trace_set_err(libtrace,TRACE_ERR_BAD_STATE,"You must call libtrace_start() before trace_read_packet()\n");
     1397                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
     1398                              "You must call libtrace_start() before trace_read_packet()\n");
    12781399                return -1;
    12791400        }
    12801401
    1281 
    12821402        if (libtrace->format->pread_packets) {
    1283                 for (i = 0; i < nb_packets; ++i) {
    1284                         assert(packets[i]);
    1285                         if (!(packets[i]->buf_control==TRACE_CTRL_PACKET || packets[i]->buf_control==TRACE_CTRL_EXTERNAL)) {
    1286                                 trace_set_err(libtrace,TRACE_ERR_BAD_STATE,"Packet passed to trace_read_packet() is invalid\n");
     1403                int ret;
     1404                for (i = 0; i < (int) nb_packets; ++i) {
     1405                        assert(i[packets]);
     1406                        if (!(packets[i]->buf_control==TRACE_CTRL_PACKET ||
     1407                              packets[i]->buf_control==TRACE_CTRL_EXTERNAL)) {
     1408                                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,
     1409                                              "Packet passed to trace_read_packet() is invalid\n");
    12871410                                return -1;
    12881411                        }
    1289                         /* Finalise the packets, freeing any resources the format module
    1290                          * may have allocated it and zeroing all data associated with it.
    1291                          */
    1292                         //trace_fin_packet(packets[i]);
    1293                         /* Store the trace we are reading from into the packet opaque
    1294                          * structure */
    1295                         packets[i]->trace = libtrace;
    12961412                }
    12971413                do {
    1298                         int ret;
    1299                         ret=libtrace->format->pread_packets(libtrace, t, packets, nb_packets);
     1414                        ret=libtrace->format->pread_packets(libtrace, t,
     1415                                                            packets,
     1416                                                            nb_packets);
     1417                        /* Error, EOF or message? */
    13001418                        if (ret <= 0) {
    13011419                                return ret;
    13021420                        }
     1421
    13031422                        if (libtrace->filter) {
    1304                                 /*
    1305                                  * Discard packets that don't match the filter
    1306                                  * If that is all of the packets then pread again
    1307                                  */
    1308                                 int nb_filtered = 0;
    1309                                 libtrace_packet_t *filtered_pkts[ret];
    1310                                 int offset;
    1311                                 for (i = 0; i < ret; ++i) {
    1312                                         if (!trace_apply_filter(libtrace->filter, packets[i])){
    1313                                                 trace_fin_packet(packets[i]);
    1314                                                 packets[i]->trace = libtrace;
    1315                                                 filtered_pkts[nb_filtered++] = packets[i];
    1316                                                 packets[i] = NULL;
    1317                                         } else {
    1318                                                 if (libtrace->snaplen>0)
    1319                                                         /* Snap the packet */
    1320                                                         trace_set_capture_length(packets[i],
    1321                                                                         libtrace->snaplen);
    1322                                                 trace_packet_set_order(packets[i], trace_get_erf_timestamp(packets[i]));
    1323                                         }
    1324                                 }
    1325                                 // TODO this aint thread safe
    1326                                 libtrace->filtered_packets += nb_filtered;
    1327                                 for (i = 0, offset = 0; i < ret; ++i) {
    1328                                         if (packets[i])
    1329                                                 packets[offset++] = packets[i];
    1330                                 }
    1331                                 assert (ret - offset == nb_filtered);
    1332                                 memcpy(&packets[offset], filtered_pkts, nb_filtered * sizeof(libtrace_packet_t *));
    1333                                 t->accepted_packets -= nb_filtered;
    1334                         } else {
    1335                                 for (i = 0; i < ret; ++i) {
    1336                                         if (libtrace->snaplen>0)
    1337                                                 trace_set_capture_length(packets[i],
    1338                                                                 libtrace->snaplen);
    1339                                         trace_packet_set_order(packets[i], trace_get_erf_timestamp(packets[i]));
    1340                                 }
     1423                                int remaining;
     1424                                remaining = filter_packets(libtrace,
     1425                                                           packets, ret);
     1426                                t->filtered_packets += ret - remaining;
     1427                                ret = remaining;
     1428                        }
     1429                        for (i = 0; i < ret; ++i) {
     1430                                packets[i]->trace = libtrace;
     1431                                /* TODO IN FORMAT?? Like traditional libtrace */
     1432                                if (libtrace->snaplen>0)
     1433                                        trace_set_capture_length(packets[i],
     1434                                                        libtrace->snaplen);
     1435                                trace_packet_set_order(packets[i], trace_get_erf_timestamp(packets[i]));
    13411436                        }
    13421437                        t->accepted_packets += ret;
    1343                         //++libtrace->accepted_packets;
    1344                         return ret;
    1345                 } while(1);
    1346         }
    1347         trace_set_err(libtrace,TRACE_ERR_UNSUPPORTED,"This format does not support reading packets\n");
     1438                } while(ret == 0);
     1439                return ret;
     1440        }
     1441        trace_set_err(libtrace, TRACE_ERR_UNSUPPORTED,
     1442                      "This format does not support reading packets\n");
    13481443        return ~0U;
    1349 }
    1350 
    1351 /**
    1352  * Selects the correct source for packets, either a parallel source
    1353  * or internal splitting
    1354  *
    1355  * @param libtrace
    1356  * @param t
    1357  * @param packets An array pre-filled with empty finilised packets
    1358  * @param nb_packets The number of packets in the array
    1359  *
    1360  * @return the number of packets read, null packets indicate messages. Check packet->error before
    1361  * assuming a packet is valid.
    1362  */
    1363 static size_t trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t,
    1364                                  libtrace_packet_t *packets[], size_t nb_packets)
    1365 {
    1366         size_t ret;
    1367         size_t i;
    1368         assert(nb_packets);
    1369 
    1370         if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
    1371                 ret = trace_pread_packet_wrapper(libtrace, t, packets, nb_packets);
    1372                 /* Put the error into the first packet */
    1373                 if ((int) ret <= 0) {
    1374                         packets[0]->error = ret;
    1375                         ret = 1;
    1376                 }
    1377         } else if (trace_has_dedicated_hasher(libtrace)) {
    1378                 ret = trace_pread_packet_hasher_thread(libtrace, t, packets, nb_packets);
    1379         } else if (!trace_has_dedicated_hasher(libtrace)) {
    1380                 /* We don't care about which core a packet goes to */
    1381                 ret = trace_pread_packet_first_in_first_served(libtrace, t, packets, nb_packets);
    1382         } /* else {
    1383                 ret = trace_pread_packet_hash_locked(libtrace, packet);
    1384         }*/
    1385 
    1386         // Formats can also optionally do this internally to ensure the first
    1387         // packet is always reported correctly
    1388         assert(ret);
    1389         assert(ret <= nb_packets);
    1390         if (packets[0]->error > 0) {
    1391                 store_first_packet(libtrace, packets[0], t);
    1392                 if (libtrace->tracetime)
    1393                         delay_tracetime(libtrace, packets[0], t);
    1394         }
    1395 
    1396         return ret;
    13971444}
    13981445
     
    15881635                printf("This format has direct support for p's\n");
    15891636                ret = libtrace->format->pstart_input(libtrace);
     1637                libtrace->pread = trace_pread_packet_wrapper;
    15901638        } else {
    15911639                if (libtrace->format->start_input) {
    15921640                        ret = libtrace->format->start_input(libtrace);
    15931641                }
     1642                if (libtrace->perpkt_thread_count > 1)
     1643                        libtrace->pread = trace_pread_packet_first_in_first_served;
     1644                else
     1645                        libtrace->pread = NULL;
    15941646        }
    15951647
     
    16161668                        goto cleanup_started;
    16171669                }
     1670                libtrace->pread = trace_pread_packet_hasher_thread;
    16181671        } else {
    16191672                libtrace->hasher_thread.type = THREAD_EMPTY;
Note: See TracChangeset for help on using the changeset viewer.