Changeset 3296252


Ignore:
Timestamp:
06/26/14 00:36:08 (6 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
c99b1e5
Parents:
59ef093
Message:

Fixes pausing the trace in the edge case that some threads have already ended.
Tidy up the state system, for both threads and the the overall state.
General tidies to the code.

Location:
lib
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • lib/libtrace_int.h

    r9594cf9 r3296252  
    184184        THREAD_FINISHING,
    185185        THREAD_FINISHED,
    186         THREAD_PAUSED
     186        THREAD_PAUSED,
     187        THREAD_STATE_MAX
    187188};
    188189
     
    301302        /** Use to control pausing threads and finishing threads etc always used with libtrace_lock */
    302303        pthread_cond_t perpkt_cond;
    303         /** Set to the number of perpkt threads that are finishing (or have finished), or to -1 once all have been joined, 0 implies all are running */
    304         int perpkts_finishing;
    305         /** A count of perpkt threads that are pausing */
    306         int perpkts_pausing;
     304        /* Keep track of counts of threads in any given state */
     305        int perpkt_thread_states[THREAD_STATE_MAX];
    307306
    308307        /** For the sliding window hasher implementation */
  • lib/trace.c

    r9857d1c r3296252  
    264264        // libtrace->perpkt_cond;
    265265        libtrace->state = STATE_NEW;
    266         libtrace->perpkts_pausing = 0;
    267266        libtrace->perpkt_queue_full = false;
    268         libtrace->perpkts_finishing = -1;
    269267        libtrace->reducer_flags = 0;
    270268        libtrace->global_blob = NULL;
     
    385383        // libtrace->perpkt_cond;
    386384        libtrace->state = STATE_NEW; // TODO MAYBE DEAD
    387         libtrace->perpkts_pausing = 0;
    388385        libtrace->perpkt_queue_full = false;
    389         libtrace->perpkts_finishing = -1;
    390386        libtrace->reducer_flags = 0;
    391387        libtrace->global_blob = NULL;
  • lib/trace_parallel.c

    r59ef093 r3296252  
    100100#include <unistd.h>
    101101
     102
     103#define VERBOSE_DEBBUGING 0
     104
    102105extern int libtrace_parallel;
    103106
     
    107110} contention_stats[1024];
    108111
    109 inline int trace_has_dedicated_hasher(libtrace_t * libtrace);
     112/**
     113 * True if the trace has dedicated hasher thread otherwise false,
     114 * to be used after the trace is running
     115 */
     116static inline int trace_has_dedicated_hasher(libtrace_t * libtrace)
     117{
     118        assert(libtrace->state != STATE_NEW);
     119        return libtrace->hasher_thread.type == THREAD_HASHER;
     120}
     121
     122/**
     123 * Changes a thread's state and broadcasts the condition variable. This
     124 * should always be done when the lock is held.
     125 *
     126 * Additionally for perpkt threads the state counts are updated.
     127 *
     128 * @param trace A pointer to the trace
     129 * @param t A pointer to the thread to modify
     130 * @param new_state The new state of the thread
     131 * @param need_lock Set to true if libtrace_lock is not held, otherwise
     132 *        false in the case the lock is currently held by this thread.
     133 */
     134static inline void thread_change_state(libtrace_t *trace, libtrace_thread_t *t,
     135        const enum thread_states new_state, const bool need_lock)
     136{
     137        enum thread_states prev_state;
     138        if (need_lock)
     139                pthread_mutex_lock(&trace->libtrace_lock);
     140        prev_state = t->state;
     141        t->state = new_state;
     142        if (t->type == THREAD_PERPKT) {
     143                --trace->perpkt_thread_states[prev_state];
     144                ++trace->perpkt_thread_states[new_state];
     145        }
     146
     147#if VERBOSE_DEBBUGING
     148        fprintf(stderr, "Thread %d State changed from %d to %d\n", t->tid,
     149                t->state, prev_state);
     150#endif
     151        if (need_lock)
     152                pthread_mutex_unlock(&trace->libtrace_lock);
     153        pthread_cond_broadcast(&trace->perpkt_cond);
     154}
     155
     156/**
     157 * Changes the overall traces state and signals the condition.
     158 *
     159 * @param trace A pointer to the trace
     160 * @param new_state The new state of the trace
     161 * @param need_lock Set to true if libtrace_lock is not held, otherwise
     162 *        false in the case the lock is currently held by this thread.
     163 */
     164static inline void libtrace_change_state(libtrace_t *trace,
     165        const enum trace_state new_state, const bool need_lock)
     166{
     167        enum trace_state prev_state;
     168        if (need_lock)
     169                pthread_mutex_lock(&trace->libtrace_lock);
     170        prev_state = trace->state;
     171        trace->state = new_state;
     172#if VERBOSE_DEBBUGING
     173        fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
     174                trace->uridata, get_trace_state_name(trace->state),
     175                get_trace_state_name(prev_state));
     176#endif
     177        if (need_lock)
     178                pthread_mutex_unlock(&trace->libtrace_lock);
     179        pthread_cond_broadcast(&trace->perpkt_cond);
     180}
    110181
    111182/**
     
    120191        else
    121192                return false;
    122         //return trace->format->pstart_input;
    123193}
    124194
     
    221291 * the condition mutex.
    222292 */
    223 static void trace_thread_pause(libtrace_t *trace) {
    224         //printf("Pausing thread #%d\n", get_thread_table_num(trace));
     293static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
    225294        trace_make_results_packets_safe(trace);
    226295        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
    227         trace->perpkts_pausing++;
    228         pthread_cond_broadcast(&trace->perpkt_cond);
     296        thread_change_state(trace, t, THREAD_PAUSED, false);
    229297        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
    230298                assert(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock) == 0);
    231299        }
    232         trace->perpkts_pausing--;
    233         pthread_cond_broadcast(&trace->perpkt_cond);
     300        thread_change_state(trace, t, THREAD_RUNNING, false);
    234301        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
    235         //printf("Releasing thread #%d\n", get_thread_table_num(trace));
    236302}
    237303
     
    245311        libtrace_packet_t *packet = NULL;
    246312
     313
     314        // Force this thread to wait until trace_pstart has been completed
    247315        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
    248316        t = get_thread_table(trace);
     
    287355                                        (*trace->per_pkt)(trace, NULL, &message, t);
    288356                                        // Now we do the actual pause, this returns when we are done
    289                                         trace_thread_pause(trace);
     357                                        trace_thread_pause(trace, t);
    290358                                        // Check for new messages as soon as we return
    291359                                        continue;
     
    337405                trace_destroy_packet(packet);
    338406
    339         // And we're at the end free the memories
    340         assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
    341         t->state = THREAD_FINISHED;
    342         assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
     407       
     408        thread_change_state(trace, t, THREAD_FINISHED, true);
    343409
    344410        // Notify only after we've defiantly set the state to finished
     
    349415        pthread_exit(NULL);
    350416};
    351 
    352 /** True if trace has dedicated hasher thread otherwise false */
    353 inline int trace_has_dedicated_hasher(libtrace_t * libtrace)
    354 {
    355         return libtrace->hasher_thread.type == THREAD_HASHER;
    356 }
    357417
    358418/**
     
    390450                        switch(message.code) {
    391451                                case MESSAGE_DO_PAUSE:
    392                                         fprintf(stderr, "Pausing hasher thread\n");
    393452                                        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
    394                                         trace->perpkts_pausing++;
     453                                        thread_change_state(trace, t, THREAD_PAUSED, false);
    395454                                        pthread_cond_broadcast(&trace->perpkt_cond);
    396455                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
    397456                                                assert(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock) == 0);
    398457                                        }
    399                                         // trace->perpkts_pausing--; // Don't do this the pausing thread will do this for us
     458                                        thread_change_state(trace, t, THREAD_RUNNING, false);
    400459                                        pthread_cond_broadcast(&trace->perpkt_cond);
    401460                                        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
    402                                         fprintf(stdout, "Mapper resuming\n");
    403461                                        break;
    404462                                case MESSAGE_DO_STOP:
     
    448506                }
    449507        }
    450         // We dont need to free packet
    451 
    452         // And we're at the end free the memories
    453         t->state = THREAD_FINISHED;
     508
     509        // We don't need to free the packet
     510        thread_change_state(trace, t, THREAD_FINISHED, true);
    454511
    455512        // Notify only after we've defiantly set the state to finished
     
    479536}
    480537
    481 /* Our simplest case when a thread becomes ready it can obtain a exclusive
     538/* Our simplest case when a thread becomes ready it can obtain an exclusive
    482539 * lock to read a packet from the underlying trace.
    483540 */
     
    568625
    569626                // Check before
    570                 if (libtrace->perpkts_finishing == libtrace->perpkt_thread_count) {
     627                if (libtrace->perpkt_thread_states[THREAD_FINISHING] == libtrace->perpkt_thread_count) {
    571628                        complete = true;
    572629                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     
    577634
    578635                // Check after
    579                 if (libtrace->perpkts_finishing == libtrace->perpkt_thread_count) {
     636                if (libtrace->perpkt_thread_states[THREAD_FINISHING] == libtrace->perpkt_thread_count) {
    580637                        complete = true;
    581638                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     
    602659inline static int trace_finish_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
    603660{
    604         assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
    605         t->state = THREAD_FINISHING;
    606         libtrace->perpkts_finishing++;
    607         pthread_cond_broadcast(&libtrace->perpkt_cond);
    608         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     661        thread_change_state(libtrace, t, THREAD_FINISHING, true);
    609662        return trace_handle_finishing_perpkt(libtrace, packet, t);
    610663}
     
    690743/**
    691744 * This case is much like the dedicated hasher, except that we will become
    692  * hasher if we don't have a a packet waiting.
    693  *
    694  * TODO: You can loose the tail of a trace if the final thread
     745 * hasher if we don't have a packet waiting.
     746 *
     747 * TODO: You can lose the tail of a trace if the final thread
    695748 * fills its own queue and therefore breaks early and doesn't empty the sliding window.
    696749 *
     
    945998        }
    946999done:
    947         fprintf(stderr, "keepalive thread is finishing\n");
    948         trace->keepalive_thread.state = THREAD_FINISHED;
     1000
     1001        thread_change_state(trace, &trace->keepalive_thread, THREAD_FINISHED, true);
    9491002        return NULL;
    9501003}
     
    10961149}
    10971150
    1098 /* Start an input trace in a parallel fashion.
     1151/* Start an input trace in a parallel fashion, or restart a paused trace.
     1152 *
     1153 * NOTE: libtrace lock is held for the majority of this function
    10991154 *
    11001155 * @param libtrace the input trace to start
     
    11061161        int i;
    11071162        sigset_t sig_before, sig_block_all;
    1108 
    11091163        assert(libtrace);
    1110         if (trace_is_err(libtrace))
     1164        if (trace_is_err(libtrace)) {
    11111165                return -1;
    1112         if (libtrace->state == STATE_PAUSED) {
    1113                 assert (libtrace->perpkts_pausing != 0);
    1114                 fprintf(stderr, "Restarting trace\n");
    1115                 UNUSED int err;
     1166        }
     1167        // NOTE: Until the trace is started we wont have a libtrace_lock initialised
     1168        if (libtrace->state != STATE_NEW) {
     1169                int err = 0;
     1170                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
     1171                if (libtrace->state != STATE_PAUSED) {
     1172                        trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
     1173                                "The trace(%s) has already been started and is not paused!!", libtrace->uridata);
     1174                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     1175                        return -1;
     1176                }
     1177               
     1178                // Update the per_pkt function, or reuse the old one
     1179                if (per_pkt)
     1180                        libtrace->per_pkt = per_pkt;
     1181
     1182                assert(libtrace_parallel);
     1183                assert(!libtrace->perpkt_thread_states[THREAD_RUNNING]);
     1184                assert(libtrace->per_pkt);
     1185               
    11161186                if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
    1117                         printf("This format has direct support for p's\n");
     1187                        fprintf(stderr, "Restarting trace pstart_input()\n");
    11181188                        err = libtrace->format->pstart_input(libtrace);
    11191189                } else {
    11201190                        if (libtrace->format->start_input) {
     1191                                fprintf(stderr, "Restarting trace start_input()\n");
    11211192                                err = libtrace->format->start_input(libtrace);
    11221193                        }
    11231194                }
    1124                 assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
    1125                 libtrace->started = true;
    1126                 libtrace->state = STATE_RUNNING;
    1127                 assert(pthread_cond_broadcast(&libtrace->perpkt_cond) == 0);
     1195               
     1196                if (err == 0) {
     1197                        libtrace->started = true;
     1198                        libtrace_change_state(libtrace, STATE_RUNNING, false);
     1199                }
    11281200                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    1129                 return 0;
     1201                return err;
    11301202        }
    11311203
     
    11371209        libtrace->per_pkt = per_pkt;
    11381210        libtrace->reducer = reducer;
    1139         libtrace->perpkts_finishing = 0;
    11401211
    11411212        assert(pthread_mutex_init(&libtrace->libtrace_lock, NULL) == 0);
    11421213        assert(pthread_cond_init(&libtrace->perpkt_cond, NULL) == 0);
    11431214        assert(pthread_rwlock_init(&libtrace->window_lock, NULL) == 0);
     1215        // Grab the lock
    11441216        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
    11451217
     
    11641236
    11651237        libtrace->started = true; // Before we start the threads otherwise we could have issues
    1166         libtrace->state = STATE_RUNNING;
     1238        libtrace_change_state(libtrace, STATE_RUNNING, false);
    11671239        /* Disable signals - Pthread signal handling */
    11681240
     
    12041276
    12051277
    1206         /* Start all of our perpkt threads */
     1278        /* Ready all of our perpkt threads - they are started later */
    12071279        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t), libtrace->perpkt_thread_count);
    12081280        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
     
    12481320        }
    12491321
     1322        for (i = 0; i < THREAD_STATE_MAX; ++i) {
     1323                libtrace->perpkt_thread_states[i] = 0;
     1324        }
     1325        libtrace->perpkt_thread_states[THREAD_RUNNING] = threads_started;
     1326
    12501327        // Revert back - Allow signals again
    12511328        assert(pthread_sigmask(SIG_SETMASK, &sig_before, NULL) == 0);
     
    12581335        // TODO fix these leaks etc
    12591336        if (libtrace->perpkt_thread_count != threads_started)
    1260                 printf("Warning started threads not equal requested s=%d r=%d", threads_started, libtrace->perpkt_thread_count);
     1337                fprintf(stderr, "Warning started threads not equal requested s=%d r=%d", threads_started, libtrace->perpkt_thread_count);
    12611338
    12621339
     
    12691346 * 2. All perpkt threads are paused waiting on a condition var
    12701347 * 3. Then call ppause on the underlying format if found
    1271  * 4. Return with perpkts_pausing set to perpkt_count (Used when restarting so we reuse the threads)
    1272  * 
     1348 * 4. The traces state is paused
     1349 *
    12731350 * Once done you should be able to modify the trace setup and call pstart again
    12741351 * TODO handle changing thread numbers
     
    12791356        int i;
    12801357        assert(libtrace);
     1358       
     1359        t = get_thread_table(libtrace);
     1360        // Check state from within the lock if we are going to change it
     1361        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
    12811362        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
    12821363                fprintf(stderr, "pause failed started=%d state=%s (%d)\n", libtrace->started, get_trace_state_name(libtrace->state), libtrace->state);
    12831364                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
     1365                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    12841366                return -1;
    12851367        }
    12861368
    1287         t = get_thread_table(libtrace);
    1288 
    1289         // Set pausing
    1290         assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
    1291         libtrace->state = STATE_PAUSING;
    1292         pthread_cond_broadcast(&libtrace->perpkt_cond);
     1369        libtrace_change_state(libtrace, STATE_PAUSING, false);
    12931370        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    12941371
     
    13011378                // Wait for it to pause
    13021379                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
    1303                 while (1 != libtrace->perpkts_pausing) {
     1380                while (libtrace->hasher_thread.state == THREAD_RUNNING) {
    13041381                        assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
    13051382                }
    1306                 libtrace->perpkts_pausing--; // Do this on the hasher's behalf
    13071383                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    13081384        }
    13091385
    13101386        fprintf(stderr, "Sending messages \n");
    1311         // Stop threads, skip this one if its a perpkt
     1387        // Stop threads, skip this one if it's a perpkt
    13121388        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
    13131389                if (&libtrace->perpkt_threads[i] != t) {
     
    13321408
    13331409        if (t) {
    1334                 // A perpkt is doing the pausing interesting fake a extra thread paused
    1335                 // We rely on the user to not return before starting the trace again
    1336                 assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
    1337                 libtrace->perpkts_pausing++;
    1338                 assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     1410                // A perpkt is doing the pausing, interesting, fake an extra thread paused
     1411                // We rely on the user to *not* return before starting the trace again
     1412                thread_change_state(libtrace, t, THREAD_PAUSED, true);
    13391413        }
    13401414
     
    13431417        // Wait for all threads to pause
    13441418        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
    1345         while (libtrace->perpkt_thread_count != libtrace->perpkts_pausing) {
     1419        while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
    13461420                assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
    13471421        }
     
    13651439
    13661440        // Only set as paused after the pause has been called on the trace
    1367         assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
    1368         libtrace->state = STATE_PAUSED;
    1369         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     1441        libtrace_change_state(libtrace, STATE_PAUSED, true);
    13701442        return 0;
    13711443}
     
    13761448 * 1. Calls ppause
    13771449 * 2. Sends a message asking for threads to finish
    1378  *
     1450 * 3. Releases threads which will pause
    13791451 */
    13801452DLLEXPORT int trace_pstop(libtrace_t *libtrace)
    13811453{
    1382         int i;
     1454        int i, err;
    13831455        libtrace_message_t message = {0};
    13841456        assert(libtrace);
    1385         if (!libtrace->started) {
    1386                 trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_pstop()");
    1387                 return -1;
    1388         }
    13891457
    13901458        // Ensure all threads have paused and the underlying trace format has
    13911459        // been closed and all packets associated are cleaned up
    1392         trace_ppause(libtrace);
     1460        // Pause will do any state checks for us
     1461        err = trace_ppause(libtrace);
     1462        if (err)
     1463                return err;
    13931464
    13941465        // Now send a message asking the threads to stop
     
    14051476
    14061477        // Now release the threads and let them stop
    1407         assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
    1408         libtrace->state = STATE_FINSHED;
    1409         assert(pthread_cond_broadcast(&libtrace->perpkt_cond) == 0);
    1410         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     1478        libtrace_change_state(libtrace, STATE_FINSHED, true);
    14111479        return 0;
    14121480}
     
    14871555                // the producer (or any other threads) don't block.
    14881556                libtrace_packet_t * packet;
    1489                 // Mark that we are no longer accepting packets
    1490                 assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
    1491                 libtrace->perpkt_threads[i].state = THREAD_FINISHED; // Important we are finished before we empty the buffer
    1492                 assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     1557                assert(libtrace->perpkt_threads[i].state == THREAD_FINISHED);
    14931558                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
    14941559                        if (packet) // This could be NULL iff the perpkt finishes early
     
    14971562
    14981563        /* Now the hasher */
    1499         // XXX signal it to stop
     1564        // XXX signal it to stop if it hasn't already we should never be in this situation!!
    15001565        if (trace_has_dedicated_hasher(libtrace)) {
    15011566                fprintf(stderr, "Waiting to join with the hasher\n");
    15021567                pthread_join(libtrace->hasher_thread.tid, NULL);
    15031568                fprintf(stderr, "Joined with the hasher\n");
    1504                 libtrace->hasher_thread.state = THREAD_FINISHED;
     1569                assert(libtrace->hasher_thread.state == THREAD_FINISHED);
    15051570        }
    15061571
     
    15191584                // Cannot destroy vector yet, this happens with trace_destroy
    15201585        }
    1521         libtrace->state = STATE_FINSHED;
     1586        // TODO consider perpkt threads marking trace as finished before join is called
     1587        libtrace_change_state(libtrace, STATE_FINSHED, true);
    15221588       
    1523         // Wait for the tick (keepalive) thread if its been started
    1524         if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE && libtrace->keepalive_thread.state == THREAD_RUNNING) {
     1589        // Wait for the tick (keepalive) thread if it has been started
     1590        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
    15251591                libtrace_message_t msg = {0};
    15261592                msg.code = MESSAGE_DO_STOP;
     
    15301596                fprintf(stderr, "Joined with with the keepalive\n");
    15311597        }
    1532         // Lets mark this as done for now
    1533         libtrace->state = STATE_JOINED;
     1598       
     1599        libtrace_change_state(libtrace, STATE_JOINED, true);
    15341600}
    15351601
     
    18501916
    18511917DLLEXPORT int trace_finished(libtrace_t * libtrace) {
    1852         int i;
    1853         int b = 0;
    1854         // TODO I don't like using this so much
    1855         //assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
    1856         for (i = 0; i < libtrace->perpkt_thread_count; i++) {
    1857                 if (libtrace->perpkt_threads[i].state == THREAD_RUNNING)
    1858                         b++;
    1859         }
    1860         //assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    1861         return !b;
     1918        // TODO I don't like using this so much, we could use state!!!
     1919        return !(libtrace->perpkt_thread_states[THREAD_RUNNING] || libtrace->perpkt_thread_states[THREAD_FINISHING]);
    18621920}
    18631921
Note: See TracChangeset for help on using the changeset viewer.