Changeset fac8c46


Ignore:
Timestamp:
05/20/14 03:25:13 (7 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
82facc5
Parents:
a5662447
Message:

Tidies up the pausing so that it now works as expected and a trace can easily be paused and restarted.
Ensures that packets will not be lost if pause is called on a file, any queued packets will be read (a message is sent allowing the user to drop these packets if they are unwanted).
Differentiates packets from other results in the queues to the reducer/reporter and makes a copy of the packets in result queues when pausing

  • this is needed to ensure that bad memory isn't referenced if a zero-copy trace is paused by closing sockets/associated data like in the case of ring:.

Fixed up the re-starting of traces which hadn't been finished to account for different configurations.
Adds a 'state' to libtrace to handle the state of parallel traces, rather than hacking around the existing 'started' boolean. Also provides two levels of checks for consistency if the trace is using existing that are checking started.

Various other bug fixes and tidy ups.

Files:
12 edited

Legend:

Unmodified
Added
Removed
  • lib/data-struct/deque.c

    read9478 rfac8c46  
    164164        q->size = q->element_size = 0;
    165165}
     166
     167DLLEXPORT void libtrace_deque_apply_function(libtrace_queue_t *q, deque_data_fn fn)
     168{
     169        list_node_t *n;
     170        assert(q->element_size == sizeof(libtrace_result_t));
     171        assert(pthread_mutex_lock(&q->lock) == 0);
     172        n = q->head;
     173        for (n = q->head; n != NULL; n = n->next) {
     174                (*fn)(&n->data);
     175        }
     176        assert(pthread_mutex_unlock(&q->lock) == 0);
     177}
  • lib/data-struct/deque.h

    read9478 rfac8c46  
    66
    77typedef struct list_node list_node_t;
     8typedef void (*deque_data_fn)(void *data);
    89typedef struct libtrace_queue {
    910        list_node_t * head;
     
    2526DLLEXPORT void libtrace_zero_deque(libtrace_queue_t *q);
    2627
     28// Apply a given function to every data item, while keeping the entire
     29// structure locked from external modifications
     30DLLEXPORT void libtrace_deque_apply_function(libtrace_queue_t *q, deque_data_fn fn);
     31
    2732#endif
  • lib/data-struct/message_queue.c

    read9478 rfac8c46  
    1818inline void libtrace_message_queue_init(libtrace_message_queue_t *mq, size_t message_len)
    1919{
     20        assert(message_len);
    2021        assert(pipe(mq->pipefd) != -1);
    2122        mq->message_count = 0;
     
    4344{
    4445        int ret;
     46        assert(mq->message_len);
    4547        assert(write(mq->pipefd[1], message, mq->message_len) == (int) mq->message_len);
    4648        // Update after we've written
  • lib/data-struct/vector.c

    read9478 rfac8c46  
    113113        v->size = 0;
    114114        v->element_size = 0;
    115         v->elements = NULL;     
     115        v->elements = NULL;
    116116}
    117117
     
    121121        assert(pthread_mutex_unlock(&v->lock) == 0);
    122122}
     123
     124
     125DLLEXPORT void libtrace_vector_apply_function(libtrace_vector_t *v, vector_data_fn fn)
     126{
     127        size_t cur;
     128        assert(pthread_mutex_lock(&v->lock) == 0);
     129        for (cur = 0; cur < v->size; cur++) {
     130                (*fn)(&v->elements[cur*v->element_size]);
     131        }
     132        assert(pthread_mutex_unlock(&v->lock) == 0);
     133}
  • lib/data-struct/vector.h

    rabda273 rfac8c46  
    66#define LIBTRACE_VECTOR_H
    77
     8typedef void (*vector_data_fn)(void *data);
    89typedef struct libtrace_vector {
    910        size_t max_size;
     
    2324DLLEXPORT int libtrace_vector_remove_front(libtrace_vector_t *v);
    2425DLLEXPORT void libtrace_vector_empty(libtrace_vector_t *v);
     26
     27// For now this is a special case and this doesn't really belong
     28// here, but to do this properly a full lock is required as
     29// multiple items are changed
     30DLLEXPORT void libtrace_vector_apply_function(libtrace_vector_t *v, vector_data_fn fn);
     31
    2532#endif
  • lib/format_linux.c

    r17a3dff rfac8c46  
    12021202{
    12031203        int fd = FORMAT(libtrace->format_data)->per_thread[get_thread_table_num(libtrace)].fd;
    1204         printf("Thread number is #%d point %p\n", get_thread_table_num(libtrace), FORMAT(libtrace->format_data)->per_thread);
     1204        //printf("Thread number is #%d point %p\n", get_thread_table_num(libtrace), FORMAT(libtrace->format_data)->per_thread);
    12051205        return linuxnative_read_packet_fd(libtrace, packet, fd, 1);
    12061206}
  • lib/libtrace.h.in

    r17a3dff rfac8c46  
    200200        uint64_t key;
    201201        void * value;
     202        int is_packet;
    202203} libtrace_result_t;
    203204
     
    31373138
    31383139DLLEXPORT void trace_publish_result(libtrace_t *libtrace, uint64_t key, void * value);
     3140DLLEXPORT void trace_publish_packet(libtrace_t *libtrace, libtrace_packet_t *packet);
    31393141typedef struct libtrace_vector libtrace_vector_t;
    31403142DLLEXPORT int trace_get_results(libtrace_t *libtrace, libtrace_vector_t * results);
     
    31453147DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace, libtrace_message_t * message);
    31463148DLLEXPORT int trace_send_message_to_reducer(libtrace_t * libtrace, libtrace_message_t * message);
     3149DLLEXPORT int trace_send_message_to_perpkts(libtrace_t * libtrace, libtrace_message_t * message);
    31473150DLLEXPORT int trace_finished(libtrace_t * libtrace);
    31483151DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet);
     
    32173220enum libtrace_messages {
    32183221        MESSAGE_STARTED,
    3219         MESSAGE_PAUSE,
    3220         MESSAGE_STOP,
     3222        MESSAGE_DO_PAUSE,
     3223        MESSAGE_PAUSING,
     3224        MESSAGE_PAUSED,
     3225        MESSAGE_DO_STOP,
    32213226        MESSAGE_STOPPED,
    32223227        MESSAGE_FIRST_PACKET,
  • lib/libtrace_int.h

    r17a3dff rfac8c46  
    242242};
    243243
     244#define TRACE_STATES \
     245        X(STATE_NEW) \
     246        X(STATE_RUNNING) \
     247        X(STATE_PAUSING) \
     248        X(STATE_PAUSED) \
     249        X(STATE_FINSHED) \
     250        X(STATE_DESTROYED) \
     251        X(STATE_ERROR) // Currently unused
     252
     253#define X(a) a,
     254enum trace_state {
     255        TRACE_STATES
     256};
     257#undef X
     258
     259#define X(a) case a: return #a;
     260static inline char *get_trace_state_name(enum trace_state ts){
     261        switch(ts) {
     262                TRACE_STATES
     263                default:
     264                        return "UNKNOWN";
     265        }
     266}
     267#undef X
    244268
    245269/** A libtrace input trace
     
    273297        /** Synchronise writes/reads across this format object and attached threads etc */
    274298        pthread_mutex_t libtrace_lock;
    275        
     299        /** State */
     300        enum trace_state state;
    276301        /** Use to control pausing threads and finishing threads etc always used with libtrace_lock */
    277302        pthread_cond_t perpkt_cond;
     
    280305        /** A count of perpkt threads that are pausing */
    281306        int perpkts_pausing;
    282        
     307
    283308        /** For the sliding window hasher implementation */
    284309        pthread_rwlock_t window_lock;
  • lib/trace.c

    r17a3dff rfac8c46  
    263263        // libtrace->libtrace_lock
    264264        // libtrace->perpkt_cond;
     265        libtrace->state = STATE_NEW;
    265266        libtrace->perpkts_pausing = 0;
    266267        libtrace->perpkt_queue_full = false;
     
    282283        libtrace->perpkt_thread_count = 0;
    283284        libtrace->perpkt_threads = NULL;
     285        libtrace->tracetime = 0;
    284286
    285287        /* Parse the URI to determine what sort of trace we are dealing with */
     
    381383        // libtrace->libtrace_lock
    382384        // libtrace->perpkt_cond;
     385        libtrace->state = STATE_NEW; // TODO MAYBE DEAD
    383386        libtrace->perpkts_pausing = 0;
    384387        libtrace->perpkt_queue_full = false;
     
    400403        libtrace->perpkt_thread_count = 0;
    401404        libtrace->perpkt_threads = NULL;
     405        libtrace->tracetime = 0;
    402406       
    403407        for(tmp=formats_list;tmp;tmp=tmp->next) {
     
    783787                if (packet->buf_control != TRACE_CTRL_PACKET)
    784788                {
    785                         //packet->buf_control = 0; // Invalid value this should be fixed
    786789                        packet->buffer = NULL;
    787790                }
  • lib/trace_parallel.c

    r5ce14a5 rfac8c46  
    107107} contention_stats[1024];
    108108
     109inline int trace_has_dedicated_hasher(libtrace_t * libtrace);
    109110
    110111/**
     
    188189}
    189190
     191/** Used below in trace_make_results_packets_safe*/
     192static void do_copy_result_packet(void *data)
     193{
     194        libtrace_result_t *res = (libtrace_result_t *)data;
     195        if (res->is_packet) {
     196                // Duplicate the packet in standard malloc'd memory and free the
     197                // original
     198                libtrace_packet_t *oldpkt, *dup;
     199                oldpkt = (libtrace_packet_t *) res->value;
     200                dup = trace_copy_packet(oldpkt);
     201                res->value = (void *)dup;
     202                trace_destroy_packet(oldpkt);
     203                fprintf(stderr, "Made a packet safe!!\n");
     204        }
     205}
     206
     207/**
     208 * Make a safe replacement copy of any result packets that are owned
     209 * by the format in the result queue. Used when pausing traces.
     210 */
     211static void trace_make_results_packets_safe(libtrace_t *trace) {
     212        libtrace_thread_t *t = get_thread_descriptor(trace);
     213        if (trace->reducer_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED))
     214                libtrace_deque_apply_function(&t->deque, &do_copy_result_packet);
     215        else
     216                libtrace_vector_apply_function(&t->vector, &do_copy_result_packet);
     217}
     218
    190219/**
    191220 * Holds threads in a paused state, until released by broadcasting
     
    194223static void trace_thread_pause(libtrace_t *trace) {
    195224        printf("Pausing thread #%d\n", get_thread_table_num(trace));
     225        trace_make_results_packets_safe(trace);
    196226        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
    197227        trace->perpkts_pausing++;
    198228        pthread_cond_broadcast(&trace->perpkt_cond);
    199         while (!trace->started) {
     229        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
    200230                assert(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock) == 0);
    201231        }
     
    212242        libtrace_t *trace = (libtrace_t *)data;
    213243        libtrace_thread_t * t;
    214         libtrace_message_t message;
     244        libtrace_message_t message = {0};
    215245        libtrace_packet_t *packet = NULL;
    216246
     
    237267                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
    238268                        switch (message.code) {
    239                                 case MESSAGE_PAUSE:
     269                                case MESSAGE_DO_PAUSE: // This is internal
     270                                        // Send message to say we are pausing, TODO consider sender
     271                                        message.code = MESSAGE_PAUSING;
     272                                        (*trace->per_pkt)(trace, NULL, &message, t);
     273                                        // If a hasher thread is running empty input queues so we don't loose data
     274                                        if (trace_has_dedicated_hasher(trace)) {
     275                                                fprintf(stderr, "Trace is using a hasher thread emptying queues\n");
     276                                                // The hasher has stopped by this point, so the queue shouldn't be filling
     277                                                while(!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
     278                                                        psize = trace_pread_packet(trace, &packet);
     279                                                        if (psize > 0) {
     280                                                                packet = (*trace->per_pkt)(trace, packet, NULL, t);
     281                                                        } else {
     282                                                                fprintf(stderr, "Psize=%d empty=%d this is probably EOF or message waiting, but if this continues we have a bug!!!\n", psize, libtrace_ringbuffer_is_empty(&t->rbuffer));
     283                                                        }
     284                                                }
     285                                        }
     286                                        // Send a paused message as a final chance to memory copy any packets
     287                                        message.code = MESSAGE_PAUSED;
     288                                        (*trace->per_pkt)(trace, NULL, &message, t);
     289                                        // Now we do the actual pause, this returns when we are done
    240290                                        trace_thread_pause(trace);
    241                                         break;
    242                                 case MESSAGE_STOP:
     291                                        // Check for new messages as soon as we return
     292                                        continue;
     293                                case MESSAGE_DO_STOP: // This is internal
    243294                                        goto stop;
    244295                        }
     
    296347
    297348/** True if trace has dedicated hasher thread otherwise false */
    298 inline int trace_has_dedicated_hasher(libtrace_t * libtrace);
    299349inline int trace_has_dedicated_hasher(libtrace_t * libtrace)
    300350{
     
    312362        int i;
    313363        libtrace_packet_t * packet;
     364        libtrace_message_t message = {0};
    314365
    315366        assert(trace_has_dedicated_hasher(trace));
     
    331382                        break;
    332383
     384                // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only)
     385                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
     386                        switch(message.code) {
     387                                case MESSAGE_DO_PAUSE:
     388                                        fprintf(stderr, "Pausing hasher thread\n");
     389                                        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
     390                                        trace->perpkts_pausing++;
     391                                        pthread_cond_broadcast(&trace->perpkt_cond);
     392                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
     393                                                assert(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock) == 0);
     394                                        }
     395                                        // trace->perpkts_pausing--; // Don't do this the pausing thread will do this for us
     396                                        pthread_cond_broadcast(&trace->perpkt_cond);
     397                                        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
     398                                        fprintf(stdout, "Mapper resuming\n");
     399                                        break;
     400                                case MESSAGE_DO_STOP:
     401                                        // Stop called after pause
     402                                        assert(trace->started == false);
     403                                        assert(trace->state == STATE_FINSHED);
     404                                default:
     405                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
     406                        }
     407                        pkt_skipped = 1;
     408                        continue;
     409                }
     410
    333411                if ((packet->error = trace_read_packet(trace, packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
    334412                        break; /* We are EOF or error'd either way we stop  */
     
    361439                        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
    362440                        // Unlock early otherwise we could deadlock
    363                         libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, NULL);
     441                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
    364442                } else {
    365443                        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
     
    372450
    373451        // Notify only after we've defiantly set the state to finished
    374         libtrace_message_t message;
    375452        message.code = MESSAGE_PERPKT_ENDED;
    376453        message.additional = NULL;
     
    437514
    438515        if (*packet) {
    439                 return 1;
     516                return (*packet)->error;
    440517        } else {
    441                 printf("Got a NULL packet the trace is over\n");
    442                 return -1; // We are done for some reason
     518                // This is how we do a notify, we send a message before hand to note that the trace is over etc.
     519                // And this will notify the perpkt thread to read that message, this is easiest
     520                // since cases like pause can also be dealt with this way without actually
     521                // having to be the end of the stream.
     522                fprintf(stderr, "Got a NULL packet from hasher therefore message waiting\n");
     523                return -2;
    443524        }
    444525}
     
    771852                }
    772853                assert(pthread_spin_unlock(&libtrace->first_packets.lock) == 0);
    773                 libtrace_message_t mesg;
     854                libtrace_message_t mesg = {0};
    774855                mesg.code = MESSAGE_FIRST_PACKET;
    775856                mesg.additional = NULL;
     
    9371018                trace_fin_packet(*packet);
    9381019
    939         if (libtrace->format->pread_packet) {
     1020        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
    9401021                if (!*packet)
    9411022                        *packet = trace_create_packet();
    9421023                ret = trace_pread_packet_wrapper(libtrace, *packet);
    943         } else  if (!libtrace->hasher) {
    944                 /* We don't care about which core a packet goes to */
    945                 ret =  trace_pread_packet_first_in_first_served(libtrace, packet);
    9461024        } else if (trace_has_dedicated_hasher(libtrace)) {
    9471025                ret = trace_pread_packet_hasher_thread(libtrace, packet);
    948         } else if (libtrace->reducer_flags & PERPKT_USE_SLIDING_WINDOW) {
     1026        } else if (!trace_has_dedicated_hasher(libtrace)) {
     1027                /* We don't care about which core a packet goes to */
     1028                ret = trace_pread_packet_first_in_first_served(libtrace, packet);
     1029        } /* else if (libtrace->reducer_flags & PERPKT_USE_SLIDING_WINDOW) {
    9491030                ret = trace_pread_packet_sliding_window(libtrace, packet);
    9501031        } else {
    9511032                ret = trace_pread_packet_hash_locked(libtrace, packet);
    952         }
     1033        }*/
    9531034
    9541035        // Formats can also optionally do this internally to ensure the first
     
    9781059/* Start an input trace in a parallel fashion.
    9791060 *
    980  * @param libtrace      the input trace to start
    981  * @param global_blob some global data you can share with the new thread
     1061 * @param libtrace the input trace to start
     1062 * @param global_blob some global data you can share with the new perpkt threads
    9821063 * @returns 0 on success
    9831064 */
     
    9891070        assert(libtrace);
    9901071        if (trace_is_err(libtrace))
    991                 return -1;;
    992         if (libtrace->perpkts_pausing != 0) {
    993                 printf("Restarting trace\n");
    994                 libtrace->format->pstart_input(libtrace);
    995                 // TODO empty any queues out here //
     1072                return -1;
     1073        if (libtrace->state == STATE_PAUSED) {
     1074                assert (libtrace->perpkts_pausing != 0);
     1075                fprintf(stderr, "Restarting trace\n");
     1076                int err;
     1077                if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
     1078                        printf("This format has direct support for p's\n");
     1079                        err = libtrace->format->pstart_input(libtrace);
     1080                } else {
     1081                        if (libtrace->format->start_input) {
     1082                                err = libtrace->format->start_input(libtrace);
     1083                        }
     1084                }
    9961085                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
    9971086                libtrace->started = true;
     1087                libtrace->state = STATE_RUNNING;
    9981088                assert(pthread_cond_broadcast(&libtrace->perpkt_cond) == 0);
    9991089                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     
    10011091        }
    10021092
     1093        assert(libtrace->state == STATE_NEW);
    10031094        libtrace_parallel = 1;
    10041095
     
    10301121
    10311122        libtrace->started=true; // Before we start the threads otherwise we could have issues
     1123        libtrace->state = STATE_RUNNING;
    10321124        /* Disable signals - Pthread signal handling */
    10331125
     
    10431135                t->type = THREAD_HASHER;
    10441136                t->state = THREAD_RUNNING;
     1137                libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
    10451138                assert(pthread_create(&t->tid, NULL, hasher_start, (void *) libtrace) == 0);
    10461139        } else {
    10471140                libtrace->hasher_thread.type = THREAD_EMPTY;
    10481141        }
    1049         libtrace_ringbuffer_init(&libtrace->packet_freelist, libtrace->packet_freelist_size, LIBTRACE_RINGBUFFER_POLLING);
     1142        libtrace_ringbuffer_init(&libtrace->packet_freelist, libtrace->packet_freelist_size, LIBTRACE_RINGBUFFER_BLOCKING);
    10501143        libtrace_slidingwindow_init(&libtrace->sliding_window, libtrace->packet_freelist_size, 0);
    10511144        assert(sem_init(&libtrace->sem, 0, libtrace->packet_freelist_size) == 0);
     
    10791172                t->perpkt_num = i;
    10801173                if (libtrace->hasher)
    1081                         libtrace_ringbuffer_init(&t->rbuffer, libtrace->perpkt_buffer_size, LIBTRACE_RINGBUFFER_POLLING);
     1174                        libtrace_ringbuffer_init(&t->rbuffer, libtrace->perpkt_buffer_size, LIBTRACE_RINGBUFFER_BLOCKING);
    10821175                // Depending on the mode vector or deque might be chosen
    10831176                libtrace_vector_init(&t->vector, sizeof(libtrace_result_t));
     
    10931186        int threads_started = 0;
    10941187        /* Setup the trace and start our threads */
    1095         if (libtrace->perpkt_thread_count > 1 && libtrace->format->pstart_input) {
     1188        if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
    10961189                printf("This format has direct support for p's\n");
    10971190                threads_started = libtrace->format->pstart_input(libtrace);
     
    11281221 * 4. Return with perpkts_pausing set to perpkt_count (Used when restarting so we reuse the threads)
    11291222 *
    1130  * Once done you should be a able to modify the trace setup and call pstart again
     1223 * Once done you should be able to modify the trace setup and call pstart again
    11311224 * TODO handle changing thread numbers
    11321225 */
     
    11361229        int i;
    11371230        assert(libtrace);
    1138         if (!libtrace->started) {
     1231        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
     1232                fprintf(stderr, "pause failed started=%d state=%s (%d)\n", libtrace->started, get_trace_state_name(libtrace->state), libtrace->state);
    11391233                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
    11401234                return -1;
     
    11431237        t = get_thread_table(libtrace);
    11441238
    1145         // Set paused
     1239        // Set pausing
    11461240        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
    1147         libtrace->started = false;
     1241        libtrace->state = STATE_PAUSING;
    11481242        pthread_cond_broadcast(&libtrace->perpkt_cond);
    11491243        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    11501244
    1151         printf("Sending messages \n");
     1245        // Special case handle the hasher thread case
     1246        if (trace_has_dedicated_hasher(libtrace)) {
     1247                fprintf(stderr, "Hasher thread running we deal with this special!\n");
     1248                libtrace_message_t message = {0};
     1249                message.code = MESSAGE_DO_PAUSE;
     1250                message.additional = NULL;
     1251                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
     1252                // Wait for it to pause
     1253                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
     1254                while (1 != libtrace->perpkts_pausing) {
     1255                        assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
     1256                }
     1257                libtrace->perpkts_pausing--; // Do this on the hasher's behalf
     1258                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     1259        }
     1260
     1261        fprintf(stderr, "Sending messages \n");
    11521262        // Stop threads, skip this one if its a perpkt
    11531263        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
    11541264                if (&libtrace->perpkt_threads[i] != t) {
    1155                         libtrace_message_t message;
    1156                         message.code = MESSAGE_PAUSE;
     1265                        libtrace_message_t message = {0};
     1266                        message.code = MESSAGE_DO_PAUSE;
    11571267                        message.additional = NULL;
    11581268                        trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
     1269                        if(trace_has_dedicated_hasher(libtrace)) {
     1270                                // The hasher has stopped and other threads have messages waiting therefore
     1271                                // If the queues are empty the other threads would have no data
     1272                                // So send some NULL packets to simply ask the threads to check there message queues
     1273                                // We are the only writer since hasher has paused
     1274                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, NULL);
     1275                        }
     1276                } else {
     1277                        fprintf(stderr, "Mapper threads should not be used to pause a trace this could cause any number of problems!!\n");
    11591278                }
    11601279        }
     
    11721291        }
    11731292
    1174         printf("Threads are pausing\n");
    1175 
    1176         // Do a early pause to kick threads out - XXX testing for int
    1177         if (libtrace->format->pause_input)
    1178                         libtrace->format->pause_input(libtrace);
     1293        fprintf(stderr, "Asking threads to pause\n");
    11791294
    11801295        // Wait for all threads to pause
     
    11851300        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    11861301
    1187         printf("Threads have paused\n");
    1188 
    1189         if (trace_supports_parallel(libtrace)) {
     1302        fprintf(stderr, "Threads have paused\n");
     1303
     1304        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
     1305                libtrace->started = false;
    11901306                if (libtrace->format->ppause_input)
    11911307                        libtrace->format->ppause_input(libtrace);
    11921308                // TODO What happens if we don't have pause input??
    11931309        } else {
    1194                 printf("Trace is not parallel so we are doing a normal pause %s\n", libtrace->uridata);
    1195                 // This doesn't really work because this could be called by any thread
    1196                 // Maybe we should grab the lock here??
    1197                 if (libtrace->format->pause_input)
    1198                         libtrace->format->pause_input(libtrace);
    1199                 // TODO What happens if we don't have pause input??
    1200         }
    1201 
     1310                int err;
     1311                fprintf(stderr, "Trace is not parallel so we are doing a normal pause %s\n", libtrace->uridata);
     1312                err = trace_pause(libtrace);
     1313                // We should handle this a bit better
     1314                if (err)
     1315                        return err;
     1316        }
     1317
     1318        // Only set as paused after the pause has been called on the trace
     1319        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
     1320        libtrace->state = STATE_PAUSED;
     1321        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    12021322        return 0;
    12031323}
     
    12131333{
    12141334        int i;
     1335        libtrace_message_t message = {0};
     1336        assert(libtrace);
    12151337        if (!libtrace->started) {
    12161338                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_pstop()");
     
    12191341
    12201342        // Ensure all threads have paused and the underlying trace format has
    1221         // been closed
     1343        // been closed and all packets associated are cleaned up
    12221344        trace_ppause(libtrace);
    12231345
    12241346        // Now send a message asking the threads to stop
    12251347        // This will be retrieved before trying to read another packet
     1348       
     1349        message.code = MESSAGE_DO_STOP;
     1350        message.additional = NULL;
     1351        trace_send_message_to_perpkts(libtrace, &message);
     1352        if (trace_has_dedicated_hasher(libtrace))
     1353                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
     1354       
    12261355        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
    1227                 libtrace_message_t message;
    1228                 message.code = MESSAGE_STOP;
    1229                 message.additional = NULL;
    12301356                trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
    12311357        }
     
    12331359        // Now release the threads and let them stop
    12341360        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
    1235         libtrace->started = true;
     1361        libtrace->state = STATE_FINSHED;
    12361362        assert(pthread_cond_broadcast(&libtrace->perpkt_cond) == 0);
    12371363        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     
    12561382        if (hasher) {
    12571383                trace->hasher = hasher;
    1258                 trace->hasher_data = hasher;
     1384                trace->hasher_data = data;
    12591385        } else {
    12601386                trace->hasher = NULL;
     
    12791405                                        return 0;
    12801406                                case HASHER_BIDIRECTIONAL:
    1281                                         trace->hasher = toeplitz_hash_packet;
     1407                                        trace->hasher = &toeplitz_hash_packet;
    12821408                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
    12831409                                        toeplitz_init_config(trace->hasher_data, 1);
    12841410                                        return 0;
    12851411                                case HASHER_UNIDIRECTIONAL:
    1286                                         trace->hasher = toeplitz_hash_packet;
     1412                                        trace->hasher = &toeplitz_hash_packet;
    12871413                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
    1288                                         toeplitz_init_config(trace->hasher_data, 1);
     1414                                        toeplitz_init_config(trace->hasher_data, 0);
    12891415                                        return 0;
    12901416                                case HASHER_HARDWARE:
     
    13261452        // XXX signal it to stop
    13271453        if (trace_has_dedicated_hasher(libtrace)) {
    1328                 printf("Waiting to join with the hasher\n");
     1454                fprintf(stderr, "Waiting to join with the hasher\n");
    13291455                pthread_join(libtrace->hasher_thread.tid, NULL);
    1330                 printf("Joined with with the hasher\n");
     1456                fprintf(stderr, "Joined with with the hasher\n");
    13311457                libtrace->hasher_thread.state = THREAD_FINISHED;
    13321458        }
     
    14161542}
    14171543
     1544DLLEXPORT int trace_send_message_to_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
     1545{
     1546        int i;
     1547        message->sender = get_thread_descriptor(libtrace);
     1548        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
     1549                libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
     1550        }
     1551        //printf("Sending message code=%d to reducer\n", message->code);
     1552        return 0;
     1553}
     1554
    14181555DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key) {
    14191556        result->key = key;
     
    15251662DLLEXPORT void trace_publish_result(libtrace_t *libtrace, uint64_t key, void * value) {
    15261663        libtrace_result_t res;
     1664        res.is_packet = 0;
    15271665        // Who am I???
    15281666        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
     
    15531691                //      sched_yield();
    15541692
     1693                if (libtrace_vector_get_size(&t->vector) >= 800) {
     1694                        trace_post_reduce(libtrace);
     1695                }
     1696                libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
     1697        }
     1698}
     1699
     1700DLLEXPORT void trace_publish_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
     1701        libtrace_result_t res;
     1702        // Who am I???
     1703        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
     1704        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
     1705        // Now put it into my table
     1706        static __thread int count = 0;
     1707
     1708        res.is_packet = 1;
     1709        libtrace_result_set_key_value(&res, trace_packet_get_order(packet), packet);
     1710        /*
     1711        if (count == 1)
     1712                printf("My vector size is %d\n", libtrace_vector_get_size(&t->vector));
     1713        count = (count+1) %1000;
     1714        libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
     1715        */
     1716        /*if (count == 1)
     1717                printf("My vector size is %d\n", libtrace_deque_get_size(&t->deque));
     1718        count = (count+1)%1000;*/
     1719        if (libtrace->reducer_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
    15551720                if (libtrace_deque_get_size(&t->deque) >= 800) {
     1721                        trace_post_reduce(libtrace);
     1722                }
     1723                //while (libtrace_deque_get_size(&t->deque) >= 1000)
     1724                //      sched_yield();
     1725                libtrace_deque_push_back(&t->deque, &res); // Automatically locking for us :)
     1726        } else {
     1727                //while (libtrace_vector_get_size(&t->vector) >= 1000)
     1728                //      sched_yield();
     1729
     1730                if (libtrace_vector_get_size(&t->vector) >= 800) {
    15561731                        trace_post_reduce(libtrace);
    15571732                }
  • tools/traceanon/traceanon_parallel.c

    r17a3dff rfac8c46  
    1313#include <data-struct/vector.h>
    1414#include <data-struct/message_queue.h>
     15#include <signal.h>
    1516
    1617bool enc_source = false;
     
    1819enum enc_type_t enc_type = ENC_NONE;
    1920char *key = NULL;
     21
     22
     23struct libtrace_t *trace = NULL;
     24
     25static void cleanup_signal(int signal)
     26{
     27        static int s = 0;
     28        (void)signal;
     29        // trace_interrupt();
     30        // trace_pstop isn't really signal safe because its got lots of locks in it
     31        // trace_pstop(trace);
     32        if (s == 0) {
     33                if (trace_ppause(trace) == -1)
     34                        trace_perror(trace, "Pause failed");
     35        }
     36        else {
     37                if (trace_pstart(trace, NULL, NULL, NULL) == -1)
     38                        trace_perror(trace, "Start failed");
     39        }
     40        s = !s;
     41}
     42
     43
    2044
    2145static void usage(char *argv0)
     
    166190                //libtrace_packet_t * packet_copy = trace_copy_packet(packet);
    167191                //libtrace_packet_t * packet_copy = trace_result_packet(trace, pkt);
    168                 trace_publish_result(trace, trace_packet_get_order(pkt), pkt);
     192                //trace_publish_result(trace, trace_packet_get_order(pkt), pkt);
     193                trace_publish_packet(trace, pkt);
    169194                //return ;
    170195        }
     
    181206int main(int argc, char *argv[])
    182207{
    183         struct libtrace_t *trace = 0;
     208        //struct libtrace_t *trace = 0;
    184209        struct libtrace_packet_t *packet/* = trace_create_packet()*/;
    185210        struct libtrace_out_t *writer = 0;
     211        struct sigaction sigact;
    186212        char *output = 0;
    187213        int level = -1;
     
    371397                return 1;
    372398        }
     399
     400        sigact.sa_handler = cleanup_signal;
     401        sigemptyset(&sigact.sa_mask);
     402        sigact.sa_flags = SA_RESTART;
     403
     404        sigaction(SIGINT, &sigact, NULL);
     405        sigaction(SIGTERM, &sigact, NULL);
    373406       
    374407        // Read in the resulting packets and then free them when done
  • tools/tracestats/tracestats_parallel.c

    r5ce14a5 rfac8c46  
    5959#include <pthread.h>
    6060
    61 struct libtrace_t *trace;
     61struct libtrace_t *trace = NULL;
    6262
    6363static void cleanup_signal(int signal)
    6464{
     65        static int s = 0;
    6566        (void)signal;
    66         //trace_interrupt();
     67        // trace_interrupt();
    6768        // trace_pstop isn't really signal safe because its got lots of locks in it
    68         trace_pstop(trace);
     69        // trace_pstop(trace);
     70        if (s == 0) {
     71                if (trace_ppause(trace) == -1)
     72                        trace_perror(trace, "Pause failed");
     73        }
     74        else {
     75                if (trace_pstart(trace, NULL, NULL, NULL) == -1)
     76                        trace_perror(trace, "Start failed");
     77        }
     78        s = !s;
    6979}
    7080
     
    121131                        case MESSAGE_STOPPED:
    122132                                trace_publish_result(trace, 0, results); // Only ever using a single key 0
    123                                 fprintf(stderr, "Thread published resuslts WOWW \n");
     133                                fprintf(stderr, "Thread published resuslts WOWW\n");
    124134                                break;
    125135                        case MESSAGE_STARTED:
    126136                                results = calloc(1, sizeof(statistics_t) * (filter_count + 1));
    127137                                break;
    128                         case MESSAGE_PAUSE:
     138                        case MESSAGE_DO_PAUSE:
    129139                                fprintf(stderr, "GOT Asked to pause ahh\n");
     140                                break;
     141                        case MESSAGE_PAUSING:
     142                                fprintf(stderr, "Thread is pausing\n");
     143                                break;
     144                        case MESSAGE_PAUSED:
     145                                fprintf(stderr, "Thread has paused\n");
    130146                                break;
    131147                }
     
    189205}
    190206
    191 static uint64_t rand_hash(libtrace_packet_t * pkt) {
     207static uint64_t rand_hash(libtrace_packet_t * pkt, void *data) {
    192208        return rand();
    193209}
    194210
    195 static uint64_t bad_hash(libtrace_packet_t * pkt) {
     211static uint64_t bad_hash(libtrace_packet_t * pkt, void *data) {
    196212        return 0;
    197213}
     
    212228        int option = 2;
    213229        //option = 10000;
    214         //trace_parallel_config(trace, TRACE_OPTION_USE_DEDICATED_HASHER, &option);
     230        trace_parallel_config(trace, TRACE_OPTION_USE_DEDICATED_HASHER, &option);
     231        trace_set_hasher(trace, HASHER_CUSTOM, &rand_hash, NULL);
    215232        //trace_parallel_config(trace, TRACE_OPTION_USE_SLIDING_WINDOW_BUFFER, &option);
    216233        option = 2;
Note: See TracChangeset for help on using the changeset viewer.