Changeset f2066fa


Ignore:
Timestamp:
08/25/15 17:40:12 (5 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
2fa43fa
Parents:
03aca91
Message:

Fix #5 make trace_pstart fallback to the single threaded format

If starting a parallel format fails we now retry as a single threaded format.
This fixes ring/int on older (pre 3.1 kernels) machines without PACKET_FANOUT.
This behaviour can be detected using trace_is_parallel()

Location:
lib
Files:
6 edited

Legend:

Unmodified
Added
Removed
  • lib/format_linux_common.c

    r773a2a3 rf2066fa  
    661661                        linuxcommon_close_input_stream(libtrace, stream);
    662662                }
    663                 libtrace_list_deinit(FORMAT_DATA->per_stream);
    664                 free(libtrace->format_data);
    665                 libtrace->format_data = NULL;
    666663                return -1;
    667664        }
  • lib/format_linux_int.c

    re99c493 rf2066fa  
    6969{
    7070        int ret = linuxcommon_start_input_stream(libtrace, FORMAT_DATA_FIRST);
    71         if (ret != 0) {
    72                 libtrace_list_deinit(FORMAT_DATA->per_stream);
    73                 free(libtrace->format_data);
    74                 libtrace->format_data = NULL;
    75         }
    7671        return ret;
    7772}
  • lib/format_linux_ring.c

    r9d89626 rf2066fa  
    280280{
    281281        int ret = linuxring_start_input_stream(libtrace, FORMAT_DATA_FIRST);
    282         if (ret != 0) {
    283                 libtrace_list_deinit(FORMAT_DATA->per_stream);
    284                 free(libtrace->format_data);
    285                 libtrace->format_data = NULL;
    286         }
    287282        return ret;
    288283}
  • lib/libtrace_int.h

    r4007dbb rf2066fa  
    332332        /** User defined per_msg function called when a message is ready */
    333333        fn_cb_msg per_msg;
    334         /** User defined reporter function entry point XXX not hooked up */
     334        /** User defined reporter function entry point */
    335335        fn_reporter reporter;
    336336        /** The hasher function */
  • lib/libtrace_parallel.h

    r4007dbb rf2066fa  
    537537 *
    538538 * @param libtrace The input trace to start
    539  * @param message The message to intercept
    540539 * @param handler the handler to be called when the message is received
    541540 * @return 0 if successful otherwise -1.
     
    627626/** Store data against a thread.
    628627 *
    629  * @param The parallel trace.
    630  * @param data The new value to save against the trace
     628 * @param thread The thread
     629 * @param data The new value to save against the thread
    631630 * @return The previously stored value
    632631 *
     
    713712 * as possible (real-time).
    714713 *
    715  * @param A parallel input trace
     714 * @param trace A parallel input trace
    716715 * @param tracetime If true packets are released with time intervals matching
    717716 * the original trace. Otherwise packets are read as fast as possible.
     
    895894         *
    896895         * @param key (Typically) The packets order, see trace_packet_get_order()
    897          * @param
    898896         */
    899897        RESULT_PACKET,
     
    10511049 */
    10521050DLLEXPORT bool trace_has_finished(libtrace_t * libtrace);
     1051
     1052
     1053/** Check if libtrace is directly reading from multiple queues
     1054 * from the format (such as a NICs hardware queues).
     1055 *
     1056 * When a parallel trace is running, or if checked after its completion
     1057 * this returns true if a trace was able to run natively parallel
     1058 * from the format. Otherwise false is returned, meaning libtrace is
     1059 * distibuting packets across multiple threads from a single source.
     1060 *
     1061 * Factors that may stop this happening despite the format supporting
     1062 * native parallel reads include: the choice of hasher function,
     1063 * the number of threads choosen (such as 1 or more than the trace supports)
     1064 * or another error when trying to start the parallel format.
     1065 *
     1066 * If this is called before the trace is started. I.e. before pstart
     1067 * this returns an indication that the trace has the possiblity to support
     1068 * native parallel reads. After trace pstart is called this should be
     1069 * checked again to confirm this has happened.
     1070 *
     1071 *
     1072 * @return true if the trace is parallel or false if the library is splitting
     1073 * the trace into multiple threads.
     1074 */
     1075DLLEXPORT bool trace_is_parallel(libtrace_t * libtrace);
    10531076
    10541077/** Returns either the sequence number or erf timestamp of a packet.
  • lib/trace_parallel.c

    reea427f rf2066fa  
    365365                pthread_t tid = pthread_self();
    366366                // Check if we are reporter or something else
    367                 if (pthread_equal(tid, libtrace->reporter_thread.tid))
     367                if (libtrace->hasher_thread.type == THREAD_REPORTER &&
     368                                pthread_equal(tid, libtrace->reporter_thread.tid))
    368369                        ret = &libtrace->reporter_thread;
    369                 else if (pthread_equal(tid, libtrace->hasher_thread.tid))
     370                else if (libtrace->hasher_thread.type == THREAD_HASHER &&
     371                         pthread_equal(tid, libtrace->hasher_thread.tid))
    370372                        ret = &libtrace->hasher_thread;
    371373                else
     
    589591
    590592        if (trace->format->pregister_thread) {
    591                 trace->format->pregister_thread(trace, t, !trace_has_dedicated_hasher(trace));
     593                trace->format->pregister_thread(trace, t, trace_is_parallel(trace));
    592594        }
    593595
     
    13641366                libtrace->global_blob = global_blob;
    13651367
    1366         if (libtrace->perpkt_thread_count > 1 &&
    1367             trace_supports_parallel(libtrace) &&
    1368             !trace_has_dedicated_hasher(libtrace)) {
     1368        if (trace_is_parallel(libtrace)) {
    13691369                err = libtrace->format->pstart_input(libtrace);
    13701370        } else {
     
    15791579}
    15801580
     1581DLLEXPORT bool trace_is_parallel(libtrace_t * libtrace) {
     1582        if (libtrace->state == STATE_NEW)
     1583                return trace_supports_parallel(libtrace);
     1584        return libtrace->pread == trace_pread_packet_wrapper;
     1585}
     1586
    15811587DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
    15821588                           fn_cb_msg per_msg, fn_reporter reporter) {
     
    16261632        verify_configuration(libtrace);
    16271633
     1634        ret = -1;
    16281635        /* Try start the format - we prefer parallel over single threaded, as
    16291636         * these formats should support messages better */
     
    16321639                ret = libtrace->format->pstart_input(libtrace);
    16331640                libtrace->pread = trace_pread_packet_wrapper;
    1634         } else {
     1641        }
     1642        if (ret != 0) {
    16351643                if (libtrace->format->start_input) {
    16361644                        ret = libtrace->format->start_input(libtrace);
     
    17701778        libtrace->perpkt_thread_states[THREAD_FINISHED] = 0;
    17711779cleanup_started:
    1772         if (trace_supports_parallel(libtrace) &&
    1773             !trace_has_dedicated_hasher(libtrace)
    1774             && libtrace->perpkt_thread_count > 1) {
     1780        if (libtrace->pread == trace_pread_packet_wrapper) {
    17751781                if (libtrace->format->ppause_input)
    17761782                        libtrace->format->ppause_input(libtrace);
     
    19331939        // Save the statistics against the trace
    19341940        trace_get_statistics(libtrace, NULL);
    1935         if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace) && libtrace->perpkt_thread_count > 1) {
     1941        if (trace_is_parallel(libtrace)) {
    19361942                libtrace->started = false;
    19371943                if (libtrace->format->ppause_input)
     
    23552361        return 0;
    23562362}
    2357 
    2358 
    23592363
    23602364static bool config_bool_parse(char *value, size_t nvalue) {
Note: See TracChangeset for help on using the changeset viewer.