Ignore:
Timestamp:
09/16/14 02:35:10 (7 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
d994324
Parents:
50b1bee
Message:

Refactor the combining step to allow user defined functions here.

Remove the old trace_get_results, now instead simply provide a reporter function which gets called as soon as results are ready.
The combiner function used determines the order of these results and when they are released etc.
The combiner function can be selected from those built-in or a custom version can be defined results are provided when ready.
Quickly hacked the parallel tests to work with this update, these are still a bit messy.

Also some fixes some compile warnings.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/trace_parallel.c

    rf0e8bd6 r2498008  
    9595#include "rt_protocol.h"
    9696#include "hash_toeplitz.h"
     97#include "combiners.h"
    9798
    9899#include <pthread.h>
     
    185186        assert(libtrace->state != STATE_NEW);
    186187        return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter;
     188}
     189
     190/**
     191 * When running the number of perpkt threads in use.
     192 * TODO what if the trace is not running yet, or has finished??
     193 *
     194 * @brief libtrace_perpkt_thread_nb
     195 * @param t The trace
     196 * @return
     197 */
     198DLLEXPORT int libtrace_get_perpkt_count(libtrace_t * t) {
     199        return t->perpkt_thread_count;
    187200}
    188201
     
    283296        t->type = THREAD_EMPTY;
    284297        libtrace_zero_ringbuffer(&t->rbuffer);
    285         libtrace_zero_vector(&t->vector);
    286         libtrace_zero_deque(&t->deque);
    287298        t->recorded_first = false;
    288299        t->perpkt_num = -1;
     
    328339}
    329340
    330 /** Used below in trace_make_results_packets_safe*/
     341/** Used below in trace_make_results_packets_safe */
    331342static void do_copy_result_packet(void *data)
    332343{
     
    344355
    345356/**
    346  * Make a safe replacement copy of any result packets that are owned
    347  * by the format in the result queue. Used when pausing traces.
    348  */
    349 static void trace_make_results_packets_safe(libtrace_t *trace) {
    350         libtrace_thread_t *t = get_thread_descriptor(trace);
    351         if (trace->reporter_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED))
    352                 libtrace_deque_apply_function(&t->deque, &do_copy_result_packet);
    353         else
    354                 libtrace_vector_apply_function(&t->vector, &do_copy_result_packet);
    355 }
    356 
    357 /**
    358357 * Holds threads in a paused state, until released by broadcasting
    359358 * the condition mutex.
    360359 */
    361360static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
    362         if (t->type == THREAD_PERPKT)
    363                 trace_make_results_packets_safe(trace);
    364361        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
    365362        thread_change_state(trace, t, THREAD_PAUSED, false);
     
    11741171        libtrace_t *trace = (libtrace_t *)data;
    11751172        libtrace_thread_t *t = &trace->reporter_thread;
    1176         size_t res_size;
    11771173        libtrace_vector_t results;
    11781174        libtrace_vector_init(&results, sizeof(libtrace_result_t));
    11791175        fprintf(stderr, "Reporter thread starting\n");
    1180         libtrace_result_t result;
    1181         size_t i;
    11821176
    11831177        message.code = MESSAGE_STARTING;
     
    11971191                        // Check for results
    11981192                        case MESSAGE_POST_REPORTER:
    1199                                 res_size = trace_get_results(trace, &results);
    1200                                 for (i = 0; i < res_size; i++) {
    1201                                         ASSERT_RET(libtrace_vector_get(&results, i, (void *) &result), == 1);
    1202                                         (*trace->reporter)(trace, &result, NULL);
    1203                                 }
     1193                                trace->combiner.read(trace, &trace->combiner);
    12041194                                break;
    12051195                        case MESSAGE_DO_PAUSE:
     1196                                assert(trace->combiner.pause);
     1197                                trace->combiner.pause(trace, &trace->combiner);
    12061198                                message.code = MESSAGE_PAUSING;
    12071199                                message.sender = t;
     
    12171209
    12181210        // Flush out whats left now all our threads have finished
    1219         res_size = trace_get_results(trace, &results);
    1220         for (i = 0; i < res_size; i++) {
    1221                 ASSERT_RET(libtrace_vector_get(&results, i, (void *) &result), == 1);
    1222                 (*trace->reporter)(trace, &result, NULL);
    1223         }
    1224         libtrace_vector_destroy(&results);
     1211        trace->combiner.read_final(trace, &trace->combiner);
    12251212
    12261213        // GOODBYE
     
    15911578                        libtrace_ringbuffer_init(&t->rbuffer, libtrace->config.hasher_queue_size,
    15921579                                                 libtrace->config.hasher_polling?LIBTRACE_RINGBUFFER_POLLING:0);
    1593                 // Depending on the mode vector or deque might be chosen
    1594                 libtrace_vector_init(&t->vector, sizeof(libtrace_result_t));
    1595                 libtrace_deque_init(&t->deque, sizeof(libtrace_result_t));
    15961580                libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
    15971581                t->recorded_first = false;
     
    16111595        if (threads_started == 0)
    16121596                threads_started = trace_start_perpkt_threads(libtrace);
     1597
     1598        // No combiner set, use a default to reduce the chance of this breaking
     1599        if (libtrace->combiner.initialise == NULL && libtrace->combiner.publish == NULL)
     1600                libtrace->combiner = combiner_unordered;
     1601
     1602        if (libtrace->combiner.initialise)
     1603                libtrace->combiner.initialise(libtrace, &libtrace->combiner);
    16131604
    16141605        libtrace->reporter_thread.type = THREAD_REPORTER;
     
    20582049
    20592050/**
    2060  * Publish to the reduce queue, return
     2051 * Publishes a result to the reduce queue
    20612052 * Should only be called by a perpkt thread, i.e. from a perpkt handler
    20622053 */
    20632054DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, void * value, int type) {
    20642055        libtrace_result_t res;
    2065         UNUSED static __thread int count = 0;
    20662056        res.type = type;
    2067 
    2068         libtrace_result_set_key_value(&res, key, value);
    2069 
    2070         /*
    2071         if (count == 1)
    2072                 printf("My vector size is %d\n", libtrace_vector_get_size(&t->vector));
    2073         count = (count+1) %1000;
    2074         libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
    2075         */
    2076         /*if (count == 1)
    2077                 printf("My vector size is %d\n", libtrace_deque_get_size(&t->deque));
    2078         count = (count+1)%1000;*/
    2079 
    2080         if (libtrace->reporter_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
    2081                 if (libtrace_deque_get_size(&t->deque) >= libtrace->config.reporter_thold) {
    2082                         trace_post_reporter(libtrace);
    2083                 }
    2084                 //while (libtrace_deque_get_size(&t->deque) >= 1000)
    2085                 //      sched_yield();
    2086                 libtrace_deque_push_back(&t->deque, &res); // Automatically locking for us :)
    2087         } else {
    2088                 //while (libtrace_vector_get_size(&t->vector) >= 1000)
    2089                 //      sched_yield();
    2090 
    2091                 if (libtrace_vector_get_size(&t->vector) >= libtrace->config.reporter_thold) {
    2092                         trace_post_reporter(libtrace);
    2093                 }
    2094                 libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
    2095         }
    2096 }
    2097 
    2098 static int compareres(const void* p1, const void* p2)
    2099 {
    2100         if (libtrace_result_get_key((libtrace_result_t *) p1) < libtrace_result_get_key((libtrace_result_t *) p2))
    2101                 return -1;
    2102         if (libtrace_result_get_key((libtrace_result_t *) p1) == libtrace_result_get_key((libtrace_result_t *) p2))
    2103                 return 0;
    2104         else
    2105                 return 1;
    2106 }
    2107 
    2108 DLLEXPORT int trace_get_results(libtrace_t *libtrace, libtrace_vector_t * results) {
    2109         int i;
    2110         int flags = libtrace->reporter_flags; // Hint these aren't a changing
    2111 
    2112         libtrace_vector_empty(results);
    2113 
    2114         /* Here we assume queues are in order ascending order and they want
    2115          * the smallest result first. If they are not in order the results
    2116          * may not be in order.
    2117          */
    2118         if (flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
    2119                 int live_count = 0;
    2120                 bool live[libtrace->perpkt_thread_count]; // Set if a trace is alive
    2121                 uint64_t key[libtrace->perpkt_thread_count]; // Cached keys
    2122                 uint64_t min_key = UINT64_MAX; // XXX use max int here stdlimit.h?
    2123                 int min_queue = -1;
    2124 
    2125                 /* Loop through check all are alive (have data) and find the smallest */
    2126                 for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
    2127                         libtrace_queue_t *v = &libtrace->perpkt_threads[i].deque;
    2128                         if (libtrace_deque_get_size(v) != 0) {
    2129                                 libtrace_result_t r;
    2130                                 libtrace_deque_peek_front(v, (void *) &r);
    2131                                 live_count++;
    2132                                 live[i] = 1;
    2133                                 key[i] = libtrace_result_get_key(&r);
    2134                                 if (i==0 || min_key > key[i]) {
    2135                                         min_key = key[i];
    2136                                         min_queue = i;
    2137                                 }
    2138                         } else {
    2139                                 live[i] = 0;
    2140                         }
    2141                 }
    2142 
    2143                 /* Now remove the smallest and loop - special case if all threads have joined we always flush whats left */
    2144                 while ((live_count == libtrace->perpkt_thread_count) || (live_count &&
    2145                                 ((flags & REDUCE_SEQUENTIAL && min_key == libtrace->expected_key) ||
    2146                                 trace_finished(libtrace)))) {
    2147                         /* Get the minimum queue and then do stuff */
    2148                         libtrace_result_t r;
    2149 
    2150                         assert (libtrace_deque_pop_front(&libtrace->perpkt_threads[min_queue].deque, (void *) &r) == 1);
    2151                         libtrace_vector_push_back(results, &r);
    2152 
    2153                         // We expect the key we read +1 now
    2154                         libtrace->expected_key = key[min_queue] + 1;
    2155 
    2156                         // Now update the one we just removed
    2157                         if (libtrace_deque_get_size(&libtrace->perpkt_threads[min_queue].deque) )
    2158                         {
    2159                                 libtrace_deque_peek_front(&libtrace->perpkt_threads[min_queue].deque, (void *) &r);
    2160                                 key[min_queue] = libtrace_result_get_key(&r);
    2161                                 if (key[min_queue] <= min_key) {
    2162                                         // We are still the smallest, might be out of order though :(
    2163                                         min_key = key[min_queue];
    2164                                 } else {
    2165                                         min_key = key[min_queue]; // Update our minimum
    2166                                         // Check all find the smallest again - all are alive
    2167                                         for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
    2168                                                 if (live[i] && min_key > key[i]) {
    2169                                                         min_key = key[i];
    2170                                                         min_queue = i;
    2171                                                 }
    2172                                         }
    2173                                 }
    2174                         } else {
    2175                                 live[min_queue] = 0;
    2176                                 live_count--;
    2177                                 min_key = UINT64_MAX; // Update our minimum
    2178                                 // Check all find the smallest again - all are alive
    2179                                 for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
    2180                                         // Still not 100% TODO (what if order is wrong or not increasing)
    2181                                         if (live[i] && min_key >= key[i]) {
    2182                                                 min_key = key[i];
    2183                                                 min_queue = i;
    2184                                         }
    2185                                 }
    2186                         }
    2187                 }
    2188         } else { // Queues are not in order - return all results in the queue
    2189                 for (i = 0; i < libtrace->perpkt_thread_count; i++) {
    2190                         libtrace_vector_append(results, &libtrace->perpkt_threads[i].vector);
    2191                 }
    2192                 if (flags & REDUCE_SORT) {
    2193                         qsort(results->elements, results->size, results->element_size, &compareres);
    2194                 }
    2195         }
    2196         return libtrace_vector_get_size(results);
     2057        res.key = key;
     2058        res.value = value;
     2059        assert(libtrace->combiner.publish);
     2060        libtrace->combiner.publish(libtrace, t->perpkt_num, &libtrace->combiner, &res);
     2061        return;
    21972062}
    21982063
     
    22372102                        return 1;
    22382103                case TRACE_OPTION_SEQUENTIAL:
     2104                        libtrace->combiner = combiner_ordered;
    22392105                        if (*((int *) value))
    22402106                                libtrace->reporter_flags |= REDUCE_SEQUENTIAL;
     
    22432109                        return 1;
    22442110                case TRACE_OPTION_ORDERED:
     2111                        libtrace->combiner = combiner_ordered;
    22452112                        if (*((int *) value))
    22462113                                libtrace->reporter_flags |= REDUCE_ORDERED;
Note: See TracChangeset for help on using the changeset viewer.