Changeset 858ce90


Ignore:
Timestamp:
02/05/15 19:39:31 (6 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
12ae766
Parents:
368a1ae
Message:

Refactoring the packet loop

Removes the pread type selection function and replaces it with a
function pointer created when the format is started.

Tidies the per packet loop, removes duplicate code and rewrites
tracetime-delaying so that a message can be processed while waiting
for the delay to return within a batch of packets.

In this case if a pause message is encountered we will first
notify the trace that we are pausing. Then send any remaining
packets from the batch without delay, and then properly pause. The resume
message is sent to the per packet thread before normal packet
flow continues.

This still is WIP and contains alot of debugging code. The error
path still needs to be better implemented

Location:
lib
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • lib/libtrace_int.h

    r368a1ae r858ce90  
    308308        fn_hasher hasher; // If valid using a separate thread
    309309        void *hasher_data;
    310        
     310        /** The pread_packet choosen path for the configuration */
     311        int (*pread)(libtrace_t *, libtrace_thread_t *, libtrace_packet_t **, size_t);
     312
    311313        libtrace_thread_t hasher_thread;
    312314        libtrace_thread_t reporter_thread;
  • lib/trace.c

    r368a1ae r858ce90  
    282282        libtrace->dropped_packets = UINT64_MAX;
    283283        libtrace->received_packets = UINT64_MAX;
     284        libtrace->pread = NULL;
    284285        ZERO_USER_CONFIG(libtrace->config);
    285286
     
    396397        libtrace->perpkt_threads = NULL;
    397398        libtrace->tracetime = 0;
     399        libtrace->pread = NULL;
    398400        ZERO_USER_CONFIG(libtrace->config);
    399401       
  • lib/trace_parallel.c

    r368a1ae r858ce90  
    101101#include <unistd.h>
    102102
    103 
    104 static size_t trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packets[], size_t nb_packets);
    105 
     103static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t);
    106104extern int libtrace_parallel;
    107105
     
    226224
    227225        if (trace->config.debug_state)
    228                 fprintf(stderr, "Thread %d state changed from %d to %d\n", (int) t->tid,
    229                         prev_state, t->state);
     226                fprintf(stderr, "Thread %d state changed from %d to %d\n",
     227                        (int) t->tid, prev_state, t->state);
    230228
    231229        pthread_cond_broadcast(&trace->perpkt_cond);
     
    388386}
    389387
    390 
    391 
    392 /**
    393  * Dispatches packets to their correct place and applies any translations
    394  * as needed.
    395  *
     388/**
     389 * Sends a packet to the user, expects either a valid packet or a TICK packet.
     390 *
     391 * Note READ_MESSAGE will only be returned if tracetime is true.
     392 *
     393 * @brief dispatch_packet
    396394 * @param trace
    397395 * @param t
    398  * @param packet (in, out) this will be set to NULL if the user doesn't return the packet for reuse
    399  * @return -1 if an error or EOF has occured and the trace should end, otherwise a postive number (or 0)
    400  * representing the number of packets returned, these will be at the beginning of the array.
    401  */
    402 static inline int dispatch_packets(libtrace_t *trace, libtrace_thread_t *t, libtrace_packet_t **packets,
    403                                    size_t nb_packets) {
    404         libtrace_message_t message;
    405         size_t i, empty = 0;
    406         for (i = 0; i < nb_packets; ++i) {
    407                 if (packets[i]->error > 0) {
    408                         packets[i] = (*trace->per_pkt)(trace, packets[i], NULL, t);
    409                         trace_fin_packet(packets[i]);
    410                 } else if (packets[i]->error == READ_TICK) {
    411                         message.code = MESSAGE_TICK;
    412                         message.additional.uint64 = trace_packet_get_order(packets[i]);
    413                         message.sender = t;
    414                         (*trace->per_pkt)(trace, NULL, &message, t);
    415                 } else if (packets[i]->error != READ_MESSAGE) {
    416                         // An error this should be the last packet we read
    417                         size_t z;
    418                         // We could have an eof or error and a message such as pause
    419                         for (z = i + 1 ; z < nb_packets; ++z) {
    420                                 fprintf(stderr, "i=%d nb_packets=%d err=%d, seq=%d\n", (int) z, (int) nb_packets, packets[z]->error, (int) packets[z]->order);
    421                                 assert (packets[z]->error <= 0);
    422                         }
    423                         return -1;
    424                 }
    425                 if (packets[i]) {
    426                         // Move full slots to front
    427                         if (empty != i) {
    428                                 packets[empty] = packets[i];
    429                                 packets[i] = NULL;
    430                         }
    431                         ++empty;
    432                         // Finish packets while still in CPU cache
    433                 }
    434         }
    435         return empty;
    436 }
    437 
    438 static inline int dispatch_packet(libtrace_t *trace, libtrace_thread_t *t, libtrace_packet_t **packet) {
    439         libtrace_message_t message;
     396 * @param packet A pointer to the packet storage, which may be set to null upon
     397 *               return, or a packet to be finished.
     398 * @return 0 is successful, otherwise if playing back in tracetime
     399 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
     400 */
     401static inline int dispatch_packet(libtrace_t *trace,
     402                                                 libtrace_thread_t *t,
     403                                                 libtrace_packet_t **packet,
     404                                                 bool tracetime) {
    440405        if ((*packet)->error > 0) {
     406                if (tracetime) {
     407                        if (delay_tracetime(trace, packet[0], t) == READ_MESSAGE)
     408                                return READ_MESSAGE;
     409                }
    441410                *packet = (*trace->per_pkt)(trace, *packet, NULL, t);
    442411                trace_fin_packet(*packet);
    443         } else if ((*packet)->error == READ_TICK) {
     412        } else {
     413                libtrace_message_t message;
     414                assert((*packet)->error == READ_TICK);
    444415                message.code = MESSAGE_TICK;
    445416                message.additional.uint64 = trace_packet_get_order(*packet);
    446417                message.sender = t;
    447418                (*trace->per_pkt)(trace, NULL, &message, t);
    448         } else if ((*packet)->error != READ_MESSAGE) {
    449                 return -1;
    450419        }
    451420        return 0;
     421}
     422
     423/**
     424 * Pauses a per packet thread, messages will not be processed when the thread
     425 * is paused.
     426 *
     427 * This process involves reading packets if a hasher thread is used. As such
     428 * this function can fail to pause due to errors when reading in which case
     429 * the thread should be stopped instead.
     430 *
     431 *
     432 * @brief trace_perpkt_thread_pause
     433 * @return READ_ERROR(-1) or READ_EOF(0) or 1 if successfull
     434 */
     435static int trace_perpkt_thread_pause(libtrace_t *trace, libtrace_thread_t *t,
     436                                     libtrace_packet_t *packets[],
     437                                     int *nb_packets, int *empty, int *offset) {
     438        libtrace_message_t message = {0};
     439        libtrace_packet_t * packet = NULL;
     440
     441        /* Let the user thread know we are going to pause */
     442        message.code = MESSAGE_PAUSING;
     443        message.sender = t;
     444        (*trace->per_pkt)(trace, NULL, &message, t);
     445
     446        /* Send through any remaining packets (or messages) without delay */
     447
     448        /* First send those packets already read, as fast as possible
     449         * This should never fail or check for messages etc. */
     450        for (;*offset < *nb_packets; ++*offset) {
     451                ASSERT_RET(dispatch_packet(trace, t, &packets[*offset], false), == 0);
     452                /* Move full slots to front as we go */
     453                if (packets[*offset]) {
     454                        if (*empty != *offset) {
     455                                packets[*empty] = packets[*offset];
     456                                packets[*offset] = NULL;
     457                        }
     458                        ++*empty;
     459                }
     460        }
     461
     462        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
     463        /* If a hasher thread is running, empty input queues so we don't lose data */
     464        if (trace_has_dedicated_hasher(trace)) {
     465                fprintf(stderr, "Trace is using a hasher thread emptying queues\n");
     466                // The hasher has stopped by this point, so the queue shouldn't be filling
     467                while(!libtrace_ringbuffer_is_empty(&t->rbuffer) || t->format_data) {
     468                        int ret = trace->pread(trace, t, &packet, 1);
     469                        if (ret == 1) {
     470                                if (packet->error > 0) {
     471                                        store_first_packet(trace, packet, t);
     472                                }
     473                                ASSERT_RET(dispatch_packet(trace, t, &packet, 1), == 0);
     474                                if (packet == NULL)
     475                                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
     476                        } else if (ret != READ_MESSAGE) {
     477                                /* Ignore messages we pick these up next loop */
     478                                assert (ret == READ_EOF || ret == READ_ERROR);
     479                                /* Verify no packets are remaining */
     480                                /* TODO refactor this sanity check out!! */
     481                                while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
     482                                        ASSERT_RET(trace->pread(trace, t, &packet, 1), <= 0);
     483                                        // No packets after this should have any data in them
     484                                        assert(packet->error <= 0);
     485                                }
     486                                fprintf(stderr, "PREAD_FAILED %d\n", ret);
     487                                libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
     488                                return -1;
     489                        }
     490                }
     491        }
     492        libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
     493
     494        /* Now we do the actual pause, this returns when we resumed */
     495        trace_thread_pause(trace, t);
     496        message.code = MESSAGE_RESUMING;
     497        (*trace->per_pkt)(trace, NULL, &message, t);
     498        return 1;
    452499}
    453500
     
    457504static void* perpkt_threads_entry(void *data) {
    458505        libtrace_t *trace = (libtrace_t *)data;
    459         libtrace_thread_t * t;
     506        libtrace_thread_t *t;
    460507        libtrace_message_t message = {0};
    461508        libtrace_packet_t *packets[trace->config.burst_size];
    462         size_t nb_packets;
    463509        size_t i;
    464         int ret;
     510        //int ret;
     511        /* The current reading position into the packets */
     512        int offset = 0;
     513        /* The number of packets last read */
     514        int nb_packets = 0;
     515        /* The offset to the first NULL packet upto offset */
     516        int empty = 0;
    465517
    466518        /* Wait until trace_pstart has been completed */
     
    499551
    500552                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
     553                        int ret;
    501554                        switch (message.code) {
    502555                                case MESSAGE_DO_PAUSE: // This is internal
    503                                         // Send message to say we are pausing, TODO consider sender
    504                                         message.code = MESSAGE_PAUSING;
    505                                         message.sender = t;
    506                                         (*trace->per_pkt)(trace, NULL, &message, t);
    507                                         // If a hasher thread is running empty input queues so we don't lose data
    508                                         if (trace_has_dedicated_hasher(trace)) {
    509                                                 fprintf(stderr, "Trace is using a hasher thread emptying queues\n");
    510                                                 // The hasher has stopped by this point, so the queue shouldn't be filling
    511                                                 while(!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
    512                                                         ASSERT_RET(trace_pread_packet(trace, t, packets, 1), == 1);
    513                                                         if (dispatch_packets(trace, t, packets, 1) == -1) {
    514                                                                 // EOF or error, either way we'll stop
    515                                                                 while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
    516                                                                         ASSERT_RET(trace_pread_packet(trace, t, packets, 1), == 1);
    517                                                                         // No packets after this should have any data in them
    518                                                                         assert(packets[0]->error <= 0);
    519                                                                 }
    520                                                                 goto stop;
    521                                                         }
    522                                                 }
     556                                        ret = trace_perpkt_thread_pause(trace, t, packets, &nb_packets, &empty, &offset);
     557                                        if (ret == READ_EOF) {
     558                                                fprintf(stderr, "PAUSE stop eof!!\n");
     559                                                goto eof;
     560                                        } else if (ret == READ_ERROR) {
     561                                                fprintf(stderr, "PAUSE stop error!!\n");
     562                                                goto error;
    523563                                        }
    524                                         // Now we do the actual pause, this returns when we are done
    525                                         trace_thread_pause(trace, t);
    526                                         message.code = MESSAGE_RESUMING;
    527                                         (*trace->per_pkt)(trace, NULL, &message, t);
    528                                         // Check for new messages as soon as we return
     564                                        assert(ret == 1);
    529565                                        continue;
    530566                                case MESSAGE_DO_STOP: // This is internal
    531                                         goto stop;
     567                                        fprintf(stderr, "DO_STOP stop!!\n");
     568                                        goto eof;
    532569                        }
    533570                        (*trace->per_pkt)(trace, NULL, &message, t);
     571                        /* Continue and the empty messages out before packets */
    534572                        continue;
    535573                }
    536574
    537                 if (trace->perpkt_thread_count == 1) {
    538                         assert(packets[0]);
    539                         packets[0]->error = trace_read_packet(trace, packets[0]);
    540                         if (dispatch_packet(trace, t, &packets[0]) != 0)
    541                                 break;
    542                         if (!packets[0]) {
    543                                 libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packets[0], 1, 1);
     575
     576                /* Do we need to read a new set of packets MOST LIKELY we do */
     577                if (offset == nb_packets) {
     578                        /* Refill the packet buffer */
     579                        if (empty != nb_packets) {
     580                                // Refill the empty packets
     581                                libtrace_ocache_alloc(&trace->packet_freelist,
     582                                                      (void **) &packets[empty],
     583                                                      nb_packets - empty,
     584                                                      nb_packets - empty);
     585                        }
     586                        if (!trace->pread) {
     587                                assert(packets[0]);
     588                                nb_packets = trace_read_packet(trace, packets[0]);
     589                                packets[0]->error = nb_packets;
     590                                if (nb_packets > 0)
     591                                        nb_packets = 1;
     592                        } else {
     593                                nb_packets = trace->pread(trace, t, packets, trace->config.burst_size);
     594                        }
     595                        offset = 0;
     596                        empty = 0;
     597                }
     598
     599                /* Handle error/message cases */
     600                if (nb_packets > 0) {
     601                        /* Store the first packet */
     602                        if (packets[0]->error > 0) {
     603                                store_first_packet(trace, packets[0], t);
     604                        }
     605                        for (;offset < nb_packets; ++offset) {
     606                                int ret;
     607                                ret = dispatch_packet(trace, t, &packets[offset], trace->tracetime);
     608                                if (ret == 0) {
     609                                        /* Move full slots to front as we go */
     610                                        if (packets[offset]) {
     611                                                if (empty != offset) {
     612                                                        packets[empty] = packets[offset];
     613                                                        packets[offset] = NULL;
     614                                                }
     615                                                ++empty;
     616                                        }
     617                                } else {
     618                                        assert(ret == READ_MESSAGE);
     619                                        /* Loop around and process the message, note */
     620                                        continue;
     621                                }
    544622                        }
    545623                } else {
    546                         nb_packets = trace_pread_packet(trace, t, packets, trace->config.burst_size);
    547                         // Loop through the packets we just read and refill
    548                         ret = dispatch_packets(trace, t, packets, nb_packets);
    549                         if (ret == -1)
    550                                 break;
    551                         else if (ret != nb_packets) {
    552                                 // Refill the empty packets
    553                                 //printf("Refilling packets ret=%d nb_packets=%zd\n", ret, nb_packets);
    554                                 libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packets[ret], nb_packets - ret, nb_packets - ret);
     624                        switch (nb_packets) {
     625                        case READ_EOF:
     626                                fprintf(stderr, "EOF stop %d!!\n", nb_packets);
     627                                goto eof;
     628                        case READ_ERROR:
     629                                fprintf(stderr, "ERROR stop %d!!\n", nb_packets);
     630                                goto error;
     631                        case READ_MESSAGE:
     632                                nb_packets = 0;
     633                                continue;
     634                        default:
     635                                fprintf(stderr, "Unexpected error %d!!\n", nb_packets);
     636                                goto error;
    555637                        }
    556638                }
    557         }
    558 
    559 
    560 stop:
     639
     640        }
     641
     642error:
     643        fprintf(stderr, "An error occured in trace\n");
     644eof:
     645        fprintf(stderr, "An eof occured in trace\n");
    561646        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
    562647
     
    751836}
    752837
    753 /**
    754  * @brief Move NULLs to the end of an array.
    755  * @param values
    756  * @param len
    757  * @return The location the first NULL, aka the number of non NULL elements
    758  */
    759 static inline size_t move_nulls_back(void *arr[], size_t len) {
    760         size_t fr=0, en = len-1;
    761         // Shift all non NULL elements to the front of the array, and NULLs to the
    762         // end, traverses every element at most once
    763         for (;fr < en; ++fr) {
    764                 if (arr[fr] == NULL) {
    765                         for (;en > fr; --en) {
    766                                 if(arr[en]) {
    767                                         arr[fr] = arr[en];
    768                                         arr[en] = NULL;
    769                                         break;
    770                                 }
    771                         }
    772                 }
    773         }
    774         // This is the index of the first NULL
    775         en = MIN(fr, en);
    776         // Or the end of the array if this special case
    777         if (arr[en])
    778                 en++;
    779         return en;
    780 }
    781 
    782 /** returns the number of packets successfully allocated in the final array
    783  these will all be at the front of the array */
    784 inline static size_t fill_array_with_empty_packets(libtrace_t *libtrace, libtrace_packet_t *packets[], size_t nb_packets) {
    785         size_t nb;
    786         nb = move_nulls_back((void **) packets, nb_packets);
    787         mem_hits.read.recycled += nb;
    788         nb += libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &packets[nb], nb_packets - nb, nb_packets - nb);
    789         assert(nb_packets == nb);
    790         return nb;
    791 }
    792 
    793 
    794 inline static size_t empty_array_of_packets(libtrace_t *libtrace, libtrace_packet_t *packets[], size_t nb_packets) {
    795         size_t nb;
    796         nb = move_nulls_back((void **) packets, nb_packets);
    797         mem_hits.write.recycled += nb_packets - nb;
    798         nb += nb_packets - libtrace_ocache_free(&libtrace->packet_freelist, (void **)packets, nb, nb);
    799         memset(packets, 0, nb); // XXX make better, maybe do this in ocache??
    800         return nb;
    801 }
    802 
    803838/* Our simplest case when a thread becomes ready it can obtain an exclusive
    804839 * lock to read packets from the underlying trace.
    805840 */
    806 inline static size_t trace_pread_packet_first_in_first_served(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packets[], size_t nb_packets)
    807 {
     841static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace,
     842                                                    libtrace_thread_t *t,
     843                                                    libtrace_packet_t *packets[],
     844                                                    size_t nb_packets) {
    808845        size_t i = 0;
    809846        //bool tick_hit = false;
     
    813850        for (i = 0; i < nb_packets; ++i) {
    814851                packets[i]->error = trace_read_packet(libtrace, packets[i]);
     852
    815853                if (packets[i]->error <= 0) {
    816                         ++i;
    817                         break;
     854                        /* We'll catch this next time if we have already got packets */
     855                        if ( i==0 ) {
     856                                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
     857                                return packets[i]->error;
     858                        } else {
     859                                break;
     860                        }
    818861                }
    819862                /*
     
    843886 * 2. Move that into the packet provided (packet)
    844887 */
    845 inline static size_t trace_pread_packet_hasher_thread(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packets, size_t nb_packets)
    846 {
     888inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace,
     889                                                      libtrace_thread_t *t,
     890                                                      libtrace_packet_t *packets[],
     891                                                      size_t nb_packets) {
    847892        size_t i;
     893
     894        /* We store the last error message here */
     895        if (t->format_data) {
     896                fprintf(stderr, "Hit me, ohh yeah got error %d\n",
     897                        ((libtrace_packet_t *)t->format_data)->error);
     898                return ((libtrace_packet_t *)t->format_data)->error;
     899        }
    848900
    849901        // Always grab at least one
     
    852904        packets[0] = libtrace_ringbuffer_read(&t->rbuffer);
    853905
    854         if (packets[0]->error < 0)
    855                 return 1;
     906        if (packets[0]->error <= 0 && packets[0]->error != READ_TICK) {
     907                fprintf(stderr, "Hit me, ohh yeah returning error %d\n", packets[0]->error);
     908                return packets[0]->error;
     909        }
    856910
    857911        for (i = 1; i < nb_packets; i++) {
     
    862916                        break;
    863917                }
    864                 // These are typically urgent
    865                 if (packets[i]->error < 0)
     918
     919                /* We will return an error or EOF the next time around */
     920                if (packets[i]->error <= 0 && packets[0]->error != READ_TICK) {
     921                        /* The message case will be checked automatically -
     922                           However other cases like EOF and error will only be
     923                           sent once*/
     924                        if (packets[i]->error != READ_MESSAGE) {
     925                                assert(t->format_data == NULL);
     926                                t->format_data = packets[i];
     927                                fprintf(stderr, "Hit me, ohh yeah set error %d\n",
     928                                        ((libtrace_packet_t *)t->format_data)->error);
     929                        }
    866930                        break;
     931                }
    867932        }
    868933
     
    12231288
    12241289/**
    1225  * Delays a packets playback so the playback will be in trace time
    1226  */
    1227 static inline void delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
     1290 * Delays a packets playback so the playback will be in trace time.
     1291 * This may break early if a message becomes available.
     1292 *
     1293 * Requires the first packet for this thread to be received.
     1294 * @param libtrace  The trace
     1295 * @param packet    The packet to delay
     1296 * @param t         The current thread
     1297 * @return Either READ_MESSAGE(-2) or 0 is successful
     1298 */
     1299static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
    12281300        struct timeval curr_tv, pkt_tv;
    1229         uint64_t next_release = t->tracetime_offset_usec; // Time at which to release the packet
     1301        uint64_t next_release = t->tracetime_offset_usec;
    12301302        uint64_t curr_usec;
    1231         /* Tracetime we might delay releasing this packet */
     1303
    12321304        if (!t->tracetime_offset_usec) {
    1233                 libtrace_packet_t * first_pkt;
     1305                libtrace_packet_t *first_pkt;
    12341306                struct timeval *sys_tv;
    12351307                int64_t initial_offset;
     
    12381310                pkt_tv = trace_get_timeval(first_pkt);
    12391311                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
     1312                /* In the unlikely case offset is 0, change it to 1 */
    12401313                if (stable)
    1241                         // 0->1 because 0 is used to mean unset
    12421314                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
    12431315                next_release = initial_offset;
     
    12491321        curr_usec = tv_to_usec(&curr_tv);
    12501322        if (next_release > curr_usec) {
     1323                int ret, mesg_fd = libtrace_message_queue_get_fd(&t->messages);
     1324                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
     1325                fd_set rfds;
     1326                FD_ZERO(&rfds);
     1327                FD_SET(mesg_fd, &rfds);
    12511328                // We need to wait
    1252                 struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
     1329
    12531330                //printf("WAITING for %d.%d next=%"PRIu64" curr=%"PRIu64" seconds packettime %f\n", delay_tv.tv_sec, delay_tv.tv_usec, next_release, curr_usec, trace_get_seconds(packet));
    1254                 select(0, NULL, NULL, NULL, &delay_tv);
    1255         }
     1331                ret = select(mesg_fd+1, &rfds, NULL, NULL, &delay_tv);
     1332                if (ret == 0) {
     1333                        return 0;
     1334                } else if (ret > 0) {
     1335                        return READ_MESSAGE;
     1336                } else {
     1337                        fprintf(stderr, "I thnik we broke select\n");
     1338                }
     1339        }
     1340        return 0;
    12561341}
    12571342
     
    13561441}
    13571442
    1358 /**
    1359  * Selects the correct source for packets, either a parallel source
    1360  * or internal splitting
    1361  *
    1362  * @param libtrace
    1363  * @param t
    1364  * @param packets An array pre-filled with empty finilised packets
    1365  * @param nb_packets The number of packets in the array
    1366  *
    1367  * @return the number of packets read, null packets indicate messages. Check packet->error before
    1368  * assuming a packet is valid.
    1369  */
    1370 static size_t trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t,
    1371                                  libtrace_packet_t *packets[], size_t nb_packets)
    1372 {
    1373         size_t ret;
    1374         size_t i;
    1375         assert(nb_packets);
    1376 
    1377         if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
    1378                 ret = trace_pread_packet_wrapper(libtrace, t, packets, nb_packets);
    1379                 /* Put the error into the first packet */
    1380                 if ((int) ret <= 0) {
    1381                         packets[0]->error = ret;
    1382                         ret = 1;
    1383                 }
    1384         } else if (trace_has_dedicated_hasher(libtrace)) {
    1385                 ret = trace_pread_packet_hasher_thread(libtrace, t, packets, nb_packets);
    1386         } else if (!trace_has_dedicated_hasher(libtrace)) {
    1387                 /* We don't care about which core a packet goes to */
    1388                 ret = trace_pread_packet_first_in_first_served(libtrace, t, packets, nb_packets);
    1389         } /* else {
    1390                 ret = trace_pread_packet_hash_locked(libtrace, packet);
    1391         }*/
    1392 
    1393         // Formats can also optionally do this internally to ensure the first
    1394         // packet is always reported correctly
    1395         assert(ret);
    1396         assert(ret <= nb_packets);
    1397         if (packets[0]->error > 0) {
    1398                 store_first_packet(libtrace, packets[0], t);
    1399                 if (libtrace->tracetime)
    1400                         delay_tracetime(libtrace, packets[0], t);
    1401         }
    1402 
    1403         return ret;
    1404 }
    1405 
    14061443/* Restarts a parallel trace, this is called from trace_pstart.
    14071444 * The libtrace lock is held upon calling this function.
     
    15951632                printf("This format has direct support for p's\n");
    15961633                ret = libtrace->format->pstart_input(libtrace);
     1634                libtrace->pread = trace_pread_packet_wrapper;
    15971635        } else {
    15981636                if (libtrace->format->start_input) {
    15991637                        ret = libtrace->format->start_input(libtrace);
    16001638                }
     1639                if (libtrace->perpkt_thread_count > 1)
     1640                        libtrace->pread = trace_pread_packet_first_in_first_served;
     1641                else
     1642                        libtrace->pread = NULL;
    16011643        }
    16021644
     
    16231665                        goto cleanup_started;
    16241666                }
     1667                libtrace->pread = trace_pread_packet_hasher_thread;
    16251668        } else {
    16261669                libtrace->hasher_thread.type = THREAD_EMPTY;
Note: See TracChangeset for help on using the changeset viewer.