Changeset 04bf7c5


Ignore:
Timestamp:
01/20/15 09:44:16 (6 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
75e088f, fed9152
Parents:
b73407f
Message:

Remove unused sliding window code.
Refactored pstart and added some proper error handling.

Location:
lib
Files:
7 edited

Legend:

Unmodified
Added
Removed
  • lib/data-struct/object_cache.c

    r6e41e73 r04bf7c5  
    175175  *             reads will block (free never should).Otherwise packets can be freely
    176176  *     allocated upon requested and are free'd if there is not enough space for them.
    177   * @return Returns The number of packets outstanding, or extra object recevied
    178   *             Ideally this should be zero (0) otherwise some form of memory leak
    179   *             is likely present.
     177  * @return If successful returns 0 otherwise -1.
    180178  */
    181 DLLEXPORT void libtrace_ocache_init(libtrace_ocache_t *oc, void *(*alloc)(void), void (*free)(void *),
    182                                                                           size_t thread_cache_size, size_t buffer_size, bool limit_size) {
     179DLLEXPORT int libtrace_ocache_init(libtrace_ocache_t *oc, void *(*alloc)(void),
     180                                    void (*free)(void *),
     181                                    size_t thread_cache_size,
     182                                    size_t buffer_size, bool limit_size) {
    183183
    184184        assert(buffer_size);
    185185        assert(alloc);
    186186        assert(free);
    187         libtrace_ringbuffer_init(&oc->rb, buffer_size, LIBTRACE_RINGBUFFER_BLOCKING);
     187        if (libtrace_ringbuffer_init(&oc->rb, buffer_size, LIBTRACE_RINGBUFFER_BLOCKING) != 0) {
     188                return -1;
     189        }
    188190        oc->alloc = alloc;
    189191        oc->free = free;
     
    193195        oc->max_nb_thread_list = 0x10;
    194196        oc->thread_list = calloc(0x10, sizeof(void*));
     197        if (oc->thread_list == NULL) {
     198                libtrace_ringbuffer_destroy(&oc->rb);
     199                return -1;
     200        }
    195201        pthread_spin_init(&oc->spin, 0);
    196202        if (limit_size)
     
    198204        else
    199205                oc->max_allocations = 0;
     206        return 0;
    200207}
    201208
  • lib/data-struct/object_cache.h

    r6e41e73 r04bf7c5  
    2020} libtrace_ocache_t;
    2121
    22 DLLEXPORT void libtrace_ocache_init(libtrace_ocache_t *oc, void *(*alloc)(void), void (*free)(void*),
     22DLLEXPORT int libtrace_ocache_init(libtrace_ocache_t *oc, void *(*alloc)(void), void (*free)(void*),
    2323                                    size_t thread_cache_size, size_t buffer_size, bool limit_size);
    2424DLLEXPORT int libtrace_ocache_destroy(libtrace_ocache_t *oc);
  • lib/data-struct/ring_buffer.c

    ra49a9eb r04bf7c5  
    4747 *                              becomes available. LIBTRACE_RINGBUFFER_BLOCKING or LIBTRACE_RINGBUFFER_POLLING.
    4848 *                              NOTE: this mainly applies to the blocking functions
    49  */
    50 DLLEXPORT void libtrace_ringbuffer_init(libtrace_ringbuffer_t * rb, size_t size, int mode) {
     49 * @return If successful returns 0 otherwise -1 upon failure.
     50 */
     51DLLEXPORT int libtrace_ringbuffer_init(libtrace_ringbuffer_t * rb, size_t size, int mode) {
    5152        size = size + 1;
    52         assert (size > 1);
    53         rb->size = size; // Only this -1 actually usable :)
     53        if (!(size > 1))
     54                return -1;
     55        rb->size = size;
    5456        rb->start = 0;
    5557        rb->end = 0;
    5658        rb->elements = calloc(rb->size, sizeof(void*));
    57         assert(rb->elements);
     59        if (!rb->elements)
     60                return -1;
    5861        rb->mode = mode;
    5962        if (mode == LIBTRACE_RINGBUFFER_BLOCKING) {
    60                 /* The signaling part - i.e. release when data's ready to read */
     63                /* The signaling part - i.e. release when data is ready to read */
    6164                pthread_cond_init(&rb->full_cond, NULL);
    6265                pthread_cond_init(&rb->empty_cond, NULL);
     
    7578        ASSERT_RET(pthread_mutex_init(&rb->rlock, NULL), == 0);
    7679#endif
     80        return 0;
    7781}
    7882
  • lib/data-struct/ring_buffer.h

    ra49a9eb r04bf7c5  
    3232} libtrace_ringbuffer_t;
    3333
    34 DLLEXPORT void libtrace_ringbuffer_init(libtrace_ringbuffer_t * rb, size_t size, int mode);
     34DLLEXPORT int libtrace_ringbuffer_init(libtrace_ringbuffer_t * rb, size_t size, int mode);
    3535DLLEXPORT void libtrace_zero_ringbuffer(libtrace_ringbuffer_t * rb);
    3636DLLEXPORT void libtrace_ringbuffer_destroy(libtrace_ringbuffer_t * rb);
  • lib/libtrace_int.h

    rd7fd648 r04bf7c5  
    292292        int perpkt_thread_states[THREAD_STATE_MAX];
    293293
    294         /** For the sliding window hasher implementation */
    295         pthread_rwlock_t window_lock;
    296294        /** Set to indicate a perpkt's queue is full as such the writing perpkt cannot proceed */
    297295        bool perpkt_queue_full;
     
    315313        int perpkt_thread_count;
    316314        libtrace_thread_t * perpkt_threads; // All our perpkt threads
    317         libtrace_slidingwindow_t sliding_window;
    318         sem_t sem;
    319315        // Used to keep track of the first packet seen on each thread
    320316        struct first_packets first_packets;
     
    881877        struct libtrace_info_t info;
    882878
    883         /** Starts or unpauses an input trace in parallel mode - note that
     879        /**
     880         * Starts or unpauses an input trace in parallel mode - note that
    884881         * this function is often the one that opens the file or device for
    885882         * reading.
    886883         *
    887884         * @param libtrace      The input trace to be started or unpaused
    888          * @return If successful the number of threads started, 0 indicates
    889          *                 no threads started and this should be done automatically.
    890          *                 Otherwise in event of an error -1 is returned.
     885         * @return 0 upon success.
     886         *         Otherwise in event of an error -1 is returned.
    891887         *
    892888         */
  • lib/trace.c

    rb73407f r04bf7c5  
    261261       
    262262        /* Parallel inits */
    263         // libtrace->libtrace_lock
    264         // libtrace->perpkt_cond;
     263        ASSERT_RET(pthread_mutex_init(&libtrace->libtrace_lock, NULL), == 0);
     264        ASSERT_RET(pthread_cond_init(&libtrace->perpkt_cond, NULL), == 0);
    265265        libtrace->state = STATE_NEW;
    266266        libtrace->perpkt_queue_full = false;
     
    273273        libtrace_zero_thread(&libtrace->reporter_thread);
    274274        libtrace_zero_thread(&libtrace->keepalive_thread);
    275         libtrace_zero_slidingwindow(&libtrace->sliding_window);
    276275        libtrace->reporter_thread.type = THREAD_EMPTY;
    277276        libtrace->perpkt_thread_count = 0;
     
    381380       
    382381        /* Parallel inits */
    383         // libtrace->libtrace_lock
    384         // libtrace->perpkt_cond;
     382        ASSERT_RET(pthread_mutex_init(&libtrace->libtrace_lock, NULL), == 0);
     383        ASSERT_RET(pthread_cond_init(&libtrace->perpkt_cond, NULL), == 0);
    385384        libtrace->state = STATE_NEW; // TODO MAYBE DEAD
    386385        libtrace->perpkt_queue_full = false;
     
    393392        libtrace_zero_thread(&libtrace->reporter_thread);
    394393        libtrace_zero_thread(&libtrace->keepalive_thread);
    395         libtrace_zero_slidingwindow(&libtrace->sliding_window);
    396394        libtrace->reporter_thread.type = THREAD_EMPTY;
    397395        libtrace->perpkt_thread_count = 0;
     
    634632 */
    635633DLLEXPORT void trace_destroy(libtrace_t *libtrace) {
    636     int i;
     634        int i;
    637635        assert(libtrace);
    638636
    639         /* destroy any packet that are still around */
     637        ASSERT_RET(pthread_mutex_destroy(&libtrace->libtrace_lock), == 0);
     638        ASSERT_RET(pthread_cond_destroy(&libtrace->perpkt_cond), == 0);
     639
     640        /* destroy any packets that are still around */
    640641        if (libtrace->state != STATE_NEW && libtrace->first_packets.packets) {
    641642                for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
     
    687688DLLEXPORT void trace_destroy_dead(libtrace_t *libtrace) {
    688689        assert(libtrace);
     690
     691        ASSERT_RET(pthread_mutex_destroy(&libtrace->libtrace_lock), == 0);
     692        ASSERT_RET(pthread_cond_destroy(&libtrace->perpkt_cond), == 0);
    689693
    690694        /* Don't call pause_input or fin_input, because we should never have
  • lib/trace_parallel.c

    rd51fd26 r04bf7c5  
    170170
    171171/**
    172  * True if the trace has dedicated hasher thread otherwise false,
    173  * to be used after the trace is running
     172 * True if the trace has dedicated hasher thread otherwise false.
     173 * This can be used once the hasher thread has been started.
    174174 */
    175175static inline int trace_has_dedicated_hasher(libtrace_t * libtrace)
    176176{
    177         assert(libtrace->state != STATE_NEW);
    178177        return libtrace->hasher_thread.type == THREAD_HASHER;
    179178}
     
    230229                        prev_state, t->state);
    231230
     231        pthread_cond_broadcast(&trace->perpkt_cond);
    232232        if (need_lock)
    233233                pthread_mutex_unlock(&trace->libtrace_lock);
    234         pthread_cond_broadcast(&trace->perpkt_cond);
    235234}
    236235
     
    257256                        get_trace_state_name(trace->state));
    258257
     258        pthread_cond_broadcast(&trace->perpkt_cond);
    259259        if (need_lock)
    260260                pthread_mutex_unlock(&trace->libtrace_lock);
    261         pthread_cond_broadcast(&trace->perpkt_cond);
    262261}
    263262
     
    293292
    294293void libtrace_zero_thread(libtrace_thread_t * t) {
     294        t->accepted_packets = 0;
     295        t->recorded_first = false;
     296        t->tracetime_offset_usec = 0;
     297        t->user_data = 0;
     298        t->format_data = 0;
     299        libtrace_zero_ringbuffer(&t->rbuffer);
    295300        t->trace = NULL;
    296301        t->ret = NULL;
    297302        t->type = THREAD_EMPTY;
    298         libtrace_zero_ringbuffer(&t->rbuffer);
    299         t->recorded_first = false;
    300303        t->perpkt_num = -1;
    301         t->accepted_packets = 0;
    302304}
    303305
     
    414416                        size_t z;
    415417                        // We could have an eof or error and a message such as pause
    416                         for (z = i ; z < nb_packets; ++z) {
    417                                 fprintf(stderr, "i=%d nb_packet=%d err=%d\n", (int) z, (int) nb_packets, packets[z]->error);
     418                        for (z = i + 1 ; z < nb_packets; ++z) {
     419                                fprintf(stderr, "i=%d nb_packets=%d err=%d, seq=%d\n", (int) z, (int) nb_packets, packets[z]->error, (int) packets[z]->order);
    418420                                assert (packets[z]->error <= 0);
    419421                        }
     
    461463        int ret;
    462464
    463         /* Fill it with empty packets */
    464         memset(&packets, 0, sizeof(void*) * trace->config.burst_size);
    465         libtrace_ocache_alloc(&trace->packet_freelist, (void **) packets, trace->config.burst_size, trace->config.burst_size);
    466 
    467         // Force this thread to wait until trace_pstart has been completed
     465        /* Wait until trace_pstart has been completed */
    468466        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
    469467        t = get_thread_table(trace);
    470468        assert(t);
     469        if (trace->state == STATE_ERROR) {
     470                thread_change_state(trace, t, THREAD_FINISHED, false);
     471                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
     472                pthread_exit(NULL);
     473        }
    471474        //printf("Yay Started perpkt thread #%d\n", (int) get_thread_table_num(trace));
    472475        if (trace->format->pregister_thread) {
     
    474477        }
    475478        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
     479
     480        /* Fill our buffer with empty packets */
     481        memset(&packets, 0, sizeof(void*) * trace->config.burst_size);
     482        libtrace_ocache_alloc(&trace->packet_freelist, (void **) packets,
     483                              trace->config.burst_size,
     484                              trace->config.burst_size);
    476485
    477486        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
     
    541550                        else if (ret != nb_packets) {
    542551                                // Refill the empty packets
    543                                 printf("Refilling packets ret=%d nb_packets=%zd\n", ret, nb_packets);
     552                                //printf("Refilling packets ret=%d nb_packets=%zd\n", ret, nb_packets);
    544553                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packets[ret], nb_packets - ret, nb_packets - ret);
    545554                        }
     
    607616        t = &trace->hasher_thread;
    608617        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
     618        if (trace->state == STATE_ERROR) {
     619                thread_change_state(trace, t, THREAD_FINISHED, false);
     620                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
     621                pthread_exit(NULL);
     622        }
     623
    609624        printf("Hasher Thread started\n");
    610625        if (trace->format->pregister_thread) {
     
    10041019
    10051020/**
    1006  * This case is much like the dedicated hasher, except that we will become
    1007  * hasher if we don't have a packet waiting.
    1008  *
    1009  * TODO: You can lose the tail of a trace if the final thread
    1010  * fills its own queue and therefore breaks early and doesn't empty the sliding window.
    1011  *
    1012  * TODO: Can block on zero copy formats such as ring: and dpdk: if the
    1013  * queue sizes in total are larger than the ring size.
    1014  *
    1015  * 1. We read a packet from our buffer
    1016  * 2. Move that into the packet provided (packet)
    1017  */
    1018 inline static int trace_pread_packet_sliding_window(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
    1019 {
    1020         int ret, i, thread/*, psize*/;
    1021 
    1022         if (t->state == THREAD_FINISHING)
    1023                 return trace_handle_finishing_perpkt(libtrace, packet, t);
    1024 
    1025         while (1) {
    1026                 // Check if we have packets ready
    1027                 if(try_waiting_queue(libtrace, t, packet, &ret))
    1028                         return ret;
    1029 
    1030                 // We limit the number of packets we get to the size of the sliding window
    1031                 // such that it is impossible for any given thread to fail to store a packet
    1032                 ASSERT_RET(sem_wait(&libtrace->sem), == 0);
    1033                 /*~~~~Single threaded read of a packet~~~~*/
    1034                 ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
    1035 
    1036                 /* Re-check our queue things we might have data waiting */
    1037                 if(try_waiting_queue(libtrace, t, packet, &ret)) {
    1038                         ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    1039                         ASSERT_RET(sem_post(&libtrace->sem), == 0);
    1040                         return ret;
    1041                 }
    1042 
    1043                 // TODO put on *proper* condition variable
    1044                 if (libtrace->perpkt_queue_full) {
    1045                         ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    1046                         ASSERT_RET(sem_post(&libtrace->sem), == 0);
    1047                         contention_stats[t->perpkt_num].wait_for_fill_complete_hits++;
    1048                         continue;
    1049                 }
    1050 
    1051                 if (!*packet)
    1052                         libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) packet, 1, 1);
    1053                 assert(*packet);
    1054 
    1055                 if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
    1056                         ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    1057                         ASSERT_RET(sem_post(&libtrace->sem), == 0);
    1058                         // Finish this thread ensuring that any data written later by another thread is retrieved also
    1059                         if (libtrace_halt)
    1060                                 return 0;
    1061                         else
    1062                                 return trace_finish_perpkt(libtrace, packet, t);
    1063                 }
    1064                 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    1065 
    1066                 /* ~~~~Multiple threads can run the hasher~~~~ */
    1067                 trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
    1068 
    1069                 /* Yes this is correct opposite read lock for a write operation */
    1070                 ASSERT_RET(pthread_rwlock_rdlock(&libtrace->window_lock), == 0);
    1071                 if (!libtrace_slidingwindow_try_write(&libtrace->sliding_window, trace_packet_get_order(*packet), *packet))
    1072                         assert(!"Semaphore should stop us from ever overfilling the sliding window");
    1073                 ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
    1074                 *packet = NULL;
    1075 
    1076                 // Always try read any data from the sliding window
    1077                 while (libtrace_slidingwindow_read_ready(&libtrace->sliding_window)) {
    1078                         ASSERT_RET(pthread_rwlock_wrlock(&libtrace->window_lock), == 0);
    1079                         if (libtrace->perpkt_queue_full) {
    1080                                 // I might be the holdup in which case if I can read my queue I should do that and return
    1081                                 if(try_waiting_queue(libtrace, t, packet, &ret)) {
    1082                                         ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
    1083                                         return ret;
    1084                                 }
    1085                                 ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
    1086                                 continue;
    1087                         }
    1088                         // Read greedily as many as we can
    1089                         while (libtrace_slidingwindow_try_read(&libtrace->sliding_window, (void **) packet, NULL)) {
    1090                                 thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
    1091                                 if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
    1092                                         while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
    1093                                                 if (t->perpkt_num == thread)
    1094                                                 {
    1095                                                         // TODO think about this case more because we have to stop early if this were to happen on the last read
    1096                                                         // before EOF/error we might not have emptied the sliding window
    1097                                                         printf("!~!~!~!~!~!~In this Code~!~!~!~!\n");
    1098                                                         // Its our queue we must have a packet to read out
    1099                                                         if(try_waiting_queue(libtrace, t, packet, &ret)) {
    1100                                                                 // We must be able to write this now 100% without fail
    1101                                                                 libtrace_ringbuffer_write(&libtrace->perpkt_threads[thread].rbuffer, *packet);
    1102                                                                 ASSERT_RET(sem_post(&libtrace->sem), == 0);
    1103                                                                 ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
    1104                                                                 return ret;
    1105                                                         } else {
    1106                                                                 assert(!"Our queue is full but I cannot read from it??");
    1107                                                         }
    1108                                                 }
    1109                                                 // Not us we have to give the other threads a chance to write there packets then
    1110                                                 libtrace->perpkt_queue_full = true;
    1111                                                 ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
    1112                                                 for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read there packets
    1113                                                         ASSERT_RET(sem_post(&libtrace->sem), == 0);
    1114 
    1115                                                 contention_stats[t->perpkt_num].full_queue_hits++;
    1116                                                 ASSERT_RET(pthread_rwlock_wrlock(&libtrace->window_lock), == 0);
    1117                                                 // Grab these back
    1118                                                 for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read there packets
    1119                                                         ASSERT_RET(sem_wait(&libtrace->sem), == 0);
    1120                                                 libtrace->perpkt_queue_full = false;
    1121                                         }
    1122                                         ASSERT_RET(sem_post(&libtrace->sem), == 0);
    1123                                         *packet = NULL;
    1124                                 } else {
    1125                                         // Cannot write to a queue if no ones waiting (I think this is unreachable)
    1126                                         // in the general case (unless the user ends early without proper clean up).
    1127                                         assert (!"unreachable code??");
    1128                                 }
    1129                         }
    1130                         ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
    1131                 }
    1132                 // Now we go back to checking our queue anyways
    1133         }
    1134 }
    1135 
    1136 
    1137 /**
    11381021 * For the first packet of each queue we keep a copy and note the system
    11391022 * time it was received at.
     
    12291112        libtrace_thread_t *t = &trace->reporter_thread;
    12301113        libtrace_vector_t results;
     1114
     1115        fprintf(stderr, "Reporter thread starting\n");
     1116
     1117        /* Wait until all threads are started */
     1118        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
     1119        if (trace->state == STATE_ERROR) {
     1120                thread_change_state(trace, t, THREAD_FINISHED, false);
     1121                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
     1122                pthread_exit(NULL);
     1123        }
     1124        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
     1125
    12311126        libtrace_vector_init(&results, sizeof(libtrace_result_t));
    1232         fprintf(stderr, "Reporter thread starting\n");
    12331127
    12341128        message.code = MESSAGE_STARTING;
     
    12871181        uint64_t next_release;
    12881182        fprintf(stderr, "keepalive thread is starting\n");
     1183
     1184        /* Wait until all threads are started */
     1185        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
     1186        if (trace->state == STATE_ERROR) {
     1187                thread_change_state(trace, &trace->keepalive_thread, THREAD_FINISHED, false);
     1188                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
     1189                pthread_exit(NULL);
     1190        }
     1191        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    12891192
    12901193        gettimeofday(&prev, NULL);
     
    14581361 * assuming a packet is valid.
    14591362 */
    1460 static size_t trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packets[], size_t nb_packets)
     1363static size_t trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t,
     1364                                 libtrace_packet_t *packets[], size_t nb_packets)
    14611365{
    14621366        size_t ret;
     
    14931397}
    14941398
    1495 /* Starts perpkt threads
    1496  * @return threads_started
    1497  */
    1498 static inline int trace_start_perpkt_threads (libtrace_t *libtrace) {
    1499         int i;
    1500         char name[16];
    1501         for (i = 0; i < libtrace->perpkt_thread_count; i++) {
    1502                 libtrace_thread_t *t = &libtrace->perpkt_threads[i];
    1503                 ASSERT_RET(pthread_create(&t->tid, NULL, perpkt_threads_entry, (void *) libtrace), == 0);
    1504                 snprintf(name, 16, "perpkt-%d", i);
    1505                 pthread_setname_np(t->tid, name);
    1506         }
    1507         return libtrace->perpkt_thread_count;
    1508 }
    1509 
    1510 /* Start an input trace in a parallel fashion, or restart a paused trace.
    1511  *
    1512  * NOTE: libtrace lock is held for the majority of this function
    1513  *
    1514  * @param libtrace the input trace to start
    1515  * @param global_blob some global data you can share with the new perpkt threads
    1516  * @returns 0 on success
    1517  */
    1518 DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_reporter reporter)
    1519 {
    1520         int i;
    1521         char name[16];
    1522         sigset_t sig_before, sig_block_all;
    1523         assert(libtrace);
    1524         if (trace_is_err(libtrace)) {
     1399/* Restarts a parallel trace, this is called from trace_pstart.
     1400 * The libtrace lock is held upon calling this function.
     1401 * Typically with a parallel trace the threads are not
     1402 * killed rather.
     1403 */
     1404static int trace_prestart(libtrace_t * libtrace, void *global_blob,
     1405                          fn_per_pkt per_pkt, fn_reporter reporter) {
     1406        int err = 0;
     1407        if (libtrace->state != STATE_PAUSED) {
     1408                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
     1409                        "trace(%s) is not currently paused",
     1410                              libtrace->uridata);
    15251411                return -1;
    15261412        }
    15271413
    1528         // NOTE: Until the trace is started we wont have a libtrace_lock initialised
    1529         if (libtrace->state != STATE_NEW) {
    1530                 int err = 0;
    1531                 ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
    1532                 if (libtrace->state != STATE_PAUSED) {
    1533                         trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
    1534                                 "The trace(%s) has already been started and is not paused!!", libtrace->uridata);
    1535                         ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    1536                         return -1;
    1537                 }
    1538 
    1539                 // Update the per_pkt function, or reuse the old one
    1540                 if (per_pkt)
    1541                         libtrace->per_pkt = per_pkt;
    1542 
    1543                 if (reporter)
    1544                         libtrace->reporter = reporter;
    1545 
    1546                 assert(libtrace_parallel);
    1547                 assert(!libtrace->perpkt_thread_states[THREAD_RUNNING]);
    1548                 assert(libtrace->per_pkt);
    1549 
    1550                 if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
    1551                         fprintf(stderr, "Restarting trace pstart_input()\n");
    1552                         err = libtrace->format->pstart_input(libtrace);
    1553                 } else {
    1554                         if (libtrace->format->start_input) {
    1555                                 fprintf(stderr, "Restarting trace start_input()\n");
    1556                                 err = libtrace->format->start_input(libtrace);
    1557                         }
    1558                 }
    1559 
    1560                 if (err == 0) {
    1561                         libtrace->started = true;
    1562                         libtrace_change_state(libtrace, STATE_RUNNING, false);
    1563                 }
    1564                 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    1565                 return err;
    1566         }
    1567 
    1568         assert(libtrace->state == STATE_NEW);
    1569         libtrace_parallel = 1;
    1570 
    1571         // Store the user defined things against the trace
    1572         libtrace->global_blob = global_blob;
    1573         libtrace->per_pkt = per_pkt;
    1574         libtrace->reporter = reporter;
    1575 
    1576         ASSERT_RET(pthread_mutex_init(&libtrace->libtrace_lock, NULL), == 0);
    1577         ASSERT_RET(pthread_cond_init(&libtrace->perpkt_cond, NULL), == 0);
    1578         ASSERT_RET(pthread_rwlock_init(&libtrace->window_lock, NULL), == 0);
    1579         // Grab the lock
    1580         ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
    1581 
    1582         // Set default buffer sizes
     1414        /* Update functions if requested */
     1415        if (per_pkt)
     1416                libtrace->per_pkt = per_pkt;
     1417        if (reporter)
     1418                libtrace->reporter = reporter;
     1419        if(global_blob)
     1420                libtrace->global_blob = global_blob;
     1421
     1422        assert(libtrace_parallel);
     1423        assert(!libtrace->perpkt_thread_states[THREAD_RUNNING]);
     1424        assert(libtrace->per_pkt);
     1425
     1426        if (libtrace->perpkt_thread_count > 1 &&
     1427            trace_supports_parallel(libtrace) &&
     1428            !trace_has_dedicated_hasher(libtrace)) {
     1429                fprintf(stderr, "Restarting trace pstart_input()\n");
     1430                err = libtrace->format->pstart_input(libtrace);
     1431        } else {
     1432                if (libtrace->format->start_input) {
     1433                        fprintf(stderr, "Restarting trace start_input()\n");
     1434                        err = libtrace->format->start_input(libtrace);
     1435                }
     1436        }
     1437
     1438        if (err == 0) {
     1439                libtrace->started = true;
     1440                libtrace_change_state(libtrace, STATE_RUNNING, false);
     1441        }
     1442        return err;
     1443}
     1444
     1445/**
     1446 * Verifies the configuration and sets default values for any values not
     1447 * specified by the user.
     1448 * @return
     1449 */
     1450static void verify_configuration(libtrace_t *libtrace) {
     1451
    15831452        if (libtrace->config.hasher_queue_size <= 0)
    15841453                libtrace->config.hasher_queue_size = 1000;
     
    16071476                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
    16081477
    1609         libtrace->started = true; // Before we start the threads otherwise we could have issues
    1610         libtrace_change_state(libtrace, STATE_RUNNING, false);
    1611         /* Disable signals - Pthread signal handling */
    1612 
     1478        if (libtrace->combiner.initialise == NULL && libtrace->combiner.publish == NULL)
     1479                libtrace->combiner = combiner_unordered;
     1480}
     1481
     1482/**
     1483 * Starts a libtrace_thread, including allocating memory for messaging.
     1484 * Threads are expected to wait until the libtrace look is released.
     1485 * Hence why we don't init structures until later.
     1486 *
     1487 * @param trace The trace the thread is associated with
     1488 * @param t The thread that is filled when the thread is started
     1489 * @param type The type of thread
     1490 * @param start_routine The entry location of the thread
     1491 * @param perpkt_num The perpkt thread number (should be set -1 if not perpkt)
     1492 * @param name For debugging purposes set the threads name (Optional)
     1493 *
     1494 * @return 0 on success or -1 upon error in which case the libtrace error is set.
     1495 *         In this situation the thread structure is zeroed.
     1496 */
     1497static int trace_start_thread(libtrace_t *trace,
     1498                       libtrace_thread_t *t,
     1499                       enum thread_types type,
     1500                       void *(*start_routine) (void *),
     1501                       int perpkt_num,
     1502                       const char *name) {
     1503        int ret;
     1504        assert(t->type == THREAD_EMPTY);
     1505        t->trace = trace;
     1506        t->ret = NULL;
     1507        t->user_data = NULL;
     1508        t->type = type;
     1509        t->state = THREAD_RUNNING;
     1510        ret = pthread_create(&t->tid, NULL, start_routine, (void *) trace);
     1511        if (ret != 0) {
     1512                libtrace_zero_thread(t);
     1513                trace_set_err(trace, ret, "Failed to create a thread");
     1514                return -1;
     1515        }
     1516        libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
     1517        if (trace_has_dedicated_hasher(trace) && type == THREAD_PERPKT) {
     1518                libtrace_ringbuffer_init(&t->rbuffer,
     1519                                         trace->config.hasher_queue_size,
     1520                                         trace->config.hasher_polling?
     1521                                                 LIBTRACE_RINGBUFFER_POLLING:
     1522                                                 LIBTRACE_RINGBUFFER_BLOCKING);
     1523        }
     1524        if(name)
     1525                pthread_setname_np(t->tid, name);
     1526        t->perpkt_num = perpkt_num;
     1527        return 0;
     1528}
     1529
     1530/* Start an input trace in the parallel libtrace framework.
     1531 * This can also be used to restart an existing parallel.
     1532 *
     1533 * NOTE: libtrace lock is held for the majority of this function
     1534 *
     1535 * @param libtrace the input trace to start
     1536 * @param global_blob some global data you can share with the new perpkt threads
     1537 * @returns 0 on success, otherwise -1 to indicate an error has occured
     1538 */
     1539DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
     1540                           fn_per_pkt per_pkt, fn_reporter reporter) {
     1541        int i;
     1542        int ret = -1;
     1543        char name[16];
     1544        sigset_t sig_before, sig_block_all;
     1545        assert(libtrace);
     1546
     1547        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
     1548        if (trace_is_err(libtrace)) {
     1549                goto cleanup_none;
     1550        }
     1551
     1552        if (libtrace->state == STATE_PAUSED) {
     1553                ret = trace_prestart(libtrace, global_blob, per_pkt, reporter);
     1554                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
     1555                return ret;
     1556        }
     1557
     1558        if (libtrace->state != STATE_NEW) {
     1559                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "trace_pstart "
     1560                              "should be called on a NEW or PAUSED trace but "
     1561                              "instead was called from %s",
     1562                              get_trace_state_name(libtrace->state));
     1563                goto cleanup_none;
     1564        }
     1565
     1566        /* Store the user defined things against the trace */
     1567        libtrace->global_blob = global_blob;
     1568        libtrace->per_pkt = per_pkt;
     1569        libtrace->reporter = reporter;
     1570        /* And zero other fields */
     1571        for (i = 0; i < THREAD_STATE_MAX; ++i) {
     1572                libtrace->perpkt_thread_states[i] = 0;
     1573        }
     1574        libtrace->first_packets.first = 0;
     1575        libtrace->first_packets.count = 0;
     1576        libtrace->first_packets.packets = NULL;
     1577        libtrace->perpkt_threads = NULL;
     1578        /* Set a global which says we are using a parallel trace. This is
     1579         * for backwards compatability due to changes when destroying packets */
     1580        libtrace_parallel = 1;
     1581
     1582        verify_configuration(libtrace);
     1583
     1584        /* Try start the format */
     1585        if (libtrace->perpkt_thread_count > 1 &&
     1586            trace_supports_parallel(libtrace) &&
     1587            !trace_has_dedicated_hasher(libtrace)) {
     1588                printf("This format has direct support for p's\n");
     1589                ret = libtrace->format->pstart_input(libtrace);
     1590        } else {
     1591                if (libtrace->format->start_input) {
     1592                        ret = libtrace->format->start_input(libtrace);
     1593                }
     1594        }
     1595
     1596        if (ret != 0) {
     1597                goto cleanup_none;
     1598        }
     1599
     1600        /* --- Start all the threads we need --- */
     1601        /* Disable signals because it is inherited by the threads we start */
    16131602        sigemptyset(&sig_block_all);
    1614 
    16151603        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before), == 0);
    16161604
    1617         // If we are using a hasher start it
    1618         // If single threaded we don't need a hasher
    1619         if (libtrace->perpkt_thread_count > 1 && libtrace->hasher && libtrace->hasher_type != HASHER_HARDWARE) {
    1620                 libtrace_thread_t *t = &libtrace->hasher_thread;
    1621                 t->trace = libtrace;
    1622                 t->ret = NULL;
    1623                 t->type = THREAD_HASHER;
    1624                 t->state = THREAD_RUNNING;
    1625                 libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
    1626                 ASSERT_RET(pthread_create(&t->tid, NULL, hasher_entry, (void *) libtrace), == 0);
    1627                 snprintf(name, sizeof(name), "hasher-thread");
    1628                 pthread_setname_np(t->tid, name);
     1605        /* If we need a hasher thread start it
     1606         * Special Case: If single threaded we don't need a hasher
     1607         */
     1608        if (libtrace->perpkt_thread_count > 1 && libtrace->hasher
     1609            && libtrace->hasher_type != HASHER_HARDWARE) {
     1610                ret = trace_start_thread(libtrace, &libtrace->hasher_thread,
     1611                                   THREAD_HASHER, hasher_entry, -1,
     1612                                   "hasher-thread");
     1613                if (ret != 0) {
     1614                        trace_set_err(libtrace, errno, "trace_pstart "
     1615                                      "failed to start a hasher thread.");
     1616                        goto cleanup_started;
     1617                }
    16291618        } else {
    16301619                libtrace->hasher_thread.type = THREAD_EMPTY;
    16311620        }
    16321621
    1633         libtrace_ocache_init(&libtrace->packet_freelist,
    1634                                                  (void* (*)()) trace_create_packet,
    1635                                                  (void (*)(void *))trace_destroy_packet,
    1636                                                  libtrace->config.packet_thread_cache_size,
    1637                                                  libtrace->config.packet_cache_size * 4,
    1638                                                  libtrace->config.fixed_packet_count);
    1639         // Unused slidingwindow code
    1640         //libtrace_slidingwindow_init(&libtrace->sliding_window, libtrace->packet_freelist_size, 0);
    1641         //ASSERT_RET(sem_init(&libtrace->sem, 0, libtrace->packet_freelist_size), == 0);
    1642 
    1643         // This will be applied to every new thread that starts, i.e. they will block all signals
    1644         // Lets start a fixed number of reading threads
    1645 
    1646         /* Ready some storages */
    1647         libtrace->first_packets.first = 0;
    1648         libtrace->first_packets.count = 0;
     1622        /* Start up our perpkt threads */
     1623        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t),
     1624                                          libtrace->perpkt_thread_count);
     1625        if (!libtrace->perpkt_threads) {
     1626                trace_set_err(libtrace, errno, "trace_pstart "
     1627                              "failed to allocate memory.");
     1628                goto cleanup_threads;
     1629        }
     1630        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
     1631                snprintf(name, sizeof(name), "perpkt-%d", i);
     1632                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
     1633                ret = trace_start_thread(libtrace, &libtrace->perpkt_threads[i],
     1634                                   THREAD_PERPKT, perpkt_threads_entry, i,
     1635                                   name);
     1636                if (ret != 0) {
     1637                        trace_set_err(libtrace, errno, "trace_pstart "
     1638                                      "failed to start a perpkt thread.");
     1639                        goto cleanup_threads;
     1640                }
     1641        }
     1642
     1643        /* Start the reporter thread */
     1644        if (reporter) {
     1645                if (libtrace->combiner.initialise)
     1646                        libtrace->combiner.initialise(libtrace, &libtrace->combiner);
     1647                ret = trace_start_thread(libtrace, &libtrace->reporter_thread,
     1648                                   THREAD_REPORTER, reporter_entry, -1,
     1649                                   "reporter_thread");
     1650                if (ret != 0) {
     1651                        trace_set_err(libtrace, errno, "trace_pstart "
     1652                                      "failed to start reporter thread.");
     1653                        goto cleanup_threads;
     1654                }
     1655        }
     1656
     1657        /* Start the keepalive thread */
     1658        if (libtrace->config.tick_interval > 0) {
     1659                ret = trace_start_thread(libtrace, &libtrace->keepalive_thread,
     1660                                   THREAD_KEEPALIVE, keepalive_entry, -1,
     1661                                   "keepalive_thread");
     1662                if (ret != 0) {
     1663                        trace_set_err(libtrace, errno, "trace_pstart "
     1664                                      "failed to start keepalive thread.");
     1665                        goto cleanup_threads;
     1666                }
     1667        }
     1668
     1669        /* Init other data structures */
     1670        libtrace->perpkt_thread_states[THREAD_RUNNING] = libtrace->perpkt_thread_count;
    16491671        ASSERT_RET(pthread_spin_init(&libtrace->first_packets.lock, 0), == 0);
    1650         libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count, sizeof(struct  __packet_storage_magic_type));
    1651 
    1652 
    1653         /* Ready all of our perpkt threads - they are started later */
    1654         libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t), libtrace->perpkt_thread_count);
    1655         for (i = 0; i < libtrace->perpkt_thread_count; i++) {
    1656                 libtrace_thread_t *t = &libtrace->perpkt_threads[i];
    1657                 t->trace = libtrace;
    1658                 t->ret = NULL;
    1659                 t->type = THREAD_PERPKT;
    1660                 t->state = THREAD_RUNNING;
    1661                 t->user_data = NULL;
    1662                 // t->tid DONE on create
    1663                 t->perpkt_num = i;
    1664                 if (libtrace->hasher)
    1665                         libtrace_ringbuffer_init(&t->rbuffer, libtrace->config.hasher_queue_size,
    1666                                                  libtrace->config.hasher_polling?LIBTRACE_RINGBUFFER_POLLING:0);
    1667                 libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
    1668                 t->recorded_first = false;
    1669                 t->tracetime_offset_usec = 0;;
    1670         }
    1671 
    1672         int threads_started = 0;
    1673         /* Setup the trace and start our threads */
    1674         if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
    1675                 printf("This format has direct support for p's\n");
    1676                 threads_started = libtrace->format->pstart_input(libtrace);
     1672        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count,
     1673                                                 sizeof(struct  __packet_storage_magic_type));
     1674        if (libtrace->first_packets.packets == NULL) {
     1675                trace_set_err(libtrace, errno, "trace_pstart "
     1676                              "failed to allocate memory.");
     1677                goto cleanup_threads;
     1678        }
     1679
     1680        /*trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
     1681                      "failed to allocate ocache.");
     1682        goto cleanup_threads;*/
     1683
     1684        if (libtrace_ocache_init(&libtrace->packet_freelist,
     1685                             (void* (*)()) trace_create_packet,
     1686                             (void (*)(void *))trace_destroy_packet,
     1687                             libtrace->config.packet_thread_cache_size,
     1688                             libtrace->config.packet_cache_size * 4,
     1689                             libtrace->config.fixed_packet_count) != 0) {
     1690                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
     1691                              "failed to allocate ocache.");
     1692                goto cleanup_threads;
     1693        }
     1694
     1695        /* Threads don't start */
     1696        libtrace->started = true;
     1697        libtrace_change_state(libtrace, STATE_RUNNING, false);
     1698
     1699        ret = 0;
     1700        goto success;
     1701cleanup_threads:
     1702        if (libtrace->first_packets.packets) {
     1703                free(libtrace->first_packets.packets);
     1704                libtrace->first_packets.packets = NULL;
     1705        }
     1706        libtrace_change_state(libtrace, STATE_ERROR, false);
     1707        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
     1708        if (libtrace->hasher_thread.type == THREAD_HASHER) {
     1709                pthread_join(libtrace->hasher_thread.tid, NULL);
     1710                libtrace_zero_thread(&libtrace->hasher_thread);
     1711        }
     1712
     1713        if (libtrace->perpkt_threads) {
     1714                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
     1715                        if (libtrace->perpkt_threads[i].type == THREAD_PERPKT) {
     1716                                pthread_join(libtrace->perpkt_threads[i].tid, NULL);
     1717                                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
     1718                        } else break;
     1719                }
     1720                free(libtrace->perpkt_threads);
     1721                libtrace->perpkt_threads = NULL;
     1722        }
     1723
     1724        if (libtrace->reporter_thread.type == THREAD_REPORTER) {
     1725                pthread_join(libtrace->reporter_thread.tid, NULL);
     1726                libtrace_zero_thread(&libtrace->reporter_thread);
     1727        }
     1728
     1729        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
     1730                pthread_join(libtrace->keepalive_thread.tid, NULL);
     1731                libtrace_zero_thread(&libtrace->keepalive_thread);
     1732        }
     1733        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
     1734        libtrace_change_state(libtrace, STATE_NEW, false);
     1735        assert(libtrace->perpkt_thread_states[THREAD_RUNNING] == 0);
     1736        libtrace->perpkt_thread_states[THREAD_FINISHED] = 0;
     1737cleanup_started:
     1738        if (trace_supports_parallel(libtrace) &&
     1739            !trace_has_dedicated_hasher(libtrace)
     1740            && libtrace->perpkt_thread_count > 1) {
     1741                if (libtrace->format->ppause_input)
     1742                        libtrace->format->ppause_input(libtrace);
    16771743        } else {
    1678                 if (libtrace->format->start_input) {
    1679                         threads_started=libtrace->format->start_input(libtrace);
    1680                 }
    1681         }
    1682         if (threads_started == 0)
    1683                 threads_started = trace_start_perpkt_threads(libtrace);
    1684 
    1685         // No combiner set, use a default to reduce the chance of this breaking
    1686         if (libtrace->combiner.initialise == NULL && libtrace->combiner.publish == NULL)
    1687                 libtrace->combiner = combiner_unordered;
    1688 
    1689         if (libtrace->combiner.initialise)
    1690                 libtrace->combiner.initialise(libtrace, &libtrace->combiner);
    1691 
    1692         libtrace->reporter_thread.type = THREAD_REPORTER;
    1693         libtrace->reporter_thread.state = THREAD_RUNNING;
    1694         libtrace_message_queue_init(&libtrace->reporter_thread.messages, sizeof(libtrace_message_t));
    1695         if (reporter) {
    1696                 // Got a real reporter
    1697                 ASSERT_RET(pthread_create(&libtrace->reporter_thread.tid, NULL, reporter_entry, (void *) libtrace), == 0);
    1698         } else {
    1699                 // Main thread is reporter
    1700                 libtrace->reporter_thread.tid = pthread_self();
    1701         }
    1702 
    1703         if (libtrace->config.tick_interval > 0) {
    1704                 libtrace->keepalive_thread.type = THREAD_KEEPALIVE;
    1705                 libtrace->keepalive_thread.state = THREAD_RUNNING;
    1706                 libtrace_message_queue_init(&libtrace->keepalive_thread.messages, sizeof(libtrace_message_t));
    1707                 ASSERT_RET(pthread_create(&libtrace->keepalive_thread.tid, NULL, keepalive_entry, (void *) libtrace), == 0);
    1708         }
    1709 
    1710         for (i = 0; i < THREAD_STATE_MAX; ++i) {
    1711                 libtrace->perpkt_thread_states[i] = 0;
    1712         }
    1713         libtrace->perpkt_thread_states[THREAD_RUNNING] = threads_started;
    1714 
    1715         // Revert back - Allow signals again
     1744                if (libtrace->format->pause_input)
     1745                        libtrace->format->pause_input(libtrace);
     1746        }
     1747        ret = -1;
     1748success:
    17161749        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_before, NULL), == 0);
     1750cleanup_none:
    17171751        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    1718 
    1719         if (threads_started < 0)
    1720                 // Error
    1721                 return threads_started;
    1722 
    1723         // TODO fix these leaks etc
    1724         if (libtrace->perpkt_thread_count != threads_started)
    1725                 fprintf(stderr, "Warning started threads not equal requested s=%d r=%d", threads_started, libtrace->perpkt_thread_count);
    1726 
    1727 
    1728         return 0;
     1752        return ret;
    17291753}
    17301754
Note: See TracChangeset for help on using the changeset viewer.