Changeset 69ae5a9


Ignore:
Timestamp:
02/25/15 17:09:02 (6 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
2adc1d0
Parents:
2e22196d
Message:

Refactor duplicated code into a seperate function
Fixes a bug where packets could be lost when pausing a delayed trace.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/trace_parallel.c

    r4338f97 r69ae5a9  
    394394 * Sends a packet to the user, expects either a valid packet or a TICK packet.
    395395 *
    396  * Note READ_MESSAGE will only be returned if tracetime is true.
    397  *
    398  * @brief dispatch_packet
    399  * @param trace
    400  * @param t
     396 * @param trace The trace
     397 * @param t The current thread
    401398 * @param packet A pointer to the packet storage, which may be set to null upon
    402399 *               return, or a packet to be finished.
    403400 * @return 0 is successful, otherwise if playing back in tracetime
    404401 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
     402 *
     403 * @note READ_MESSAGE will only be returned if tracetime is true.
    405404 */
    406405static inline int dispatch_packet(libtrace_t *trace,
     
    428427
    429428/**
     429 * Sends a batch of packets to the user, expects either a valid packet or a
     430 * TICK packet.
     431 *
     432 * @param trace The trace
     433 * @param t The current thread
     434 * @param packets [in,out] An array of packets, these may be null upon return
     435 * @param nb_packets The total number of packets in the list
     436 * @param empty [in,out] A pointer to an integer storing the first empty slot,
     437 * upon return this is updated
     438 * @param offset [in,out] The offset into the array, upon return this is updated
     439 * @return 0 is successful, otherwise if playing back in tracetime
     440 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
     441 *
     442 * @note READ_MESSAGE will only be returned if tracetime is true.
     443 */
     444static inline int dispatch_packets(libtrace_t *trace,
     445                                  libtrace_thread_t *t,
     446                                  libtrace_packet_t *packets[],
     447                                  int nb_packets, int *empty, int *offset,
     448                                  bool tracetime) {
     449        for (;*offset < nb_packets; ++*offset) {
     450                int ret;
     451                ret = dispatch_packet(trace, t, &packets[*offset], tracetime);
     452                if (ret == 0) {
     453                        /* Move full slots to front as we go */
     454                        if (packets[*offset]) {
     455                                if (*empty != *offset) {
     456                                        packets[*empty] = packets[*offset];
     457                                        packets[*offset] = NULL;
     458                                }
     459                                ++*empty;
     460                        }
     461                } else {
     462                        /* Break early */
     463                        assert(ret == READ_MESSAGE);
     464                        return READ_MESSAGE;
     465                }
     466        }
     467
     468        return 0;
     469}
     470
     471/**
    430472 * Pauses a per packet thread, messages will not be processed when the thread
    431473 * is paused.
     
    441483static int trace_perpkt_thread_pause(libtrace_t *trace, libtrace_thread_t *t,
    442484                                     libtrace_packet_t *packets[],
    443                                      int *nb_packets, int *empty, int *offset) {
     485                                     int nb_packets, int *empty, int *offset) {
    444486        libtrace_message_t message = {0};
    445487        libtrace_packet_t * packet = NULL;
     
    454496        /* First send those packets already read, as fast as possible
    455497         * This should never fail or check for messages etc. */
    456         for (;*offset < *nb_packets; ++*offset) {
    457                 ASSERT_RET(dispatch_packet(trace, t, &packets[*offset], false), == 0);
    458                 /* Move full slots to front as we go */
    459                 if (packets[*offset]) {
    460                         if (*empty != *offset) {
    461                                 packets[*empty] = packets[*offset];
    462                                 packets[*offset] = NULL;
    463                         }
    464                         ++*empty;
    465                 }
    466         }
     498        ASSERT_RET(dispatch_packets(trace, t, packets, nb_packets, empty,
     499                                    offset, false), == 0);
    467500
    468501        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
     
    545578
    546579        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
    547         // Send a message to say we've started
    548 
    549         // Let the per_packet function know we have started
     580
     581        /* Let the per_packet function know we have started */
    550582        message.code = MESSAGE_STARTING;
    551583        message.sender = t;
     
    561593                        switch (message.code) {
    562594                                case MESSAGE_DO_PAUSE: // This is internal
    563                                         ret = trace_perpkt_thread_pause(trace, t, packets, &nb_packets, &empty, &offset);
     595                                        ret = trace_perpkt_thread_pause(trace, t,
     596                                              packets, nb_packets, &empty, &offset);
    564597                                        if (ret == READ_EOF) {
    565598                                                fprintf(stderr, "PAUSE stop eof!!\n");
     
    610643                                store_first_packet(trace, packets[0], t);
    611644                        }
    612                         for (;offset < nb_packets; ++offset) {
    613                                 int ret;
    614                                 ret = dispatch_packet(trace, t, &packets[offset], trace->tracetime);
    615                                 if (ret == 0) {
    616                                         /* Move full slots to front as we go */
    617                                         if (packets[offset]) {
    618                                                 if (empty != offset) {
    619                                                         packets[empty] = packets[offset];
    620                                                         packets[offset] = NULL;
    621                                                 }
    622                                                 ++empty;
    623                                         }
    624                                 } else {
    625                                         assert(ret == READ_MESSAGE);
    626                                         /* Loop around and process the message, note */
    627                                         continue;
    628                                 }
    629                         }
     645                        dispatch_packets(trace, t, packets, nb_packets, &empty,
     646                                         &offset, trace->tracetime);
    630647                } else {
    631648                        switch (nb_packets) {
     
    672689                }
    673690        }
    674 
    675691
    676692        thread_change_state(trace, t, THREAD_FINISHED, true);
     
    865881        /* Read nb_packets */
    866882        for (i = 0; i < nb_packets; ++i) {
     883                if (libtrace_halt) {
     884                        break;
     885                }
    867886                packets[i]->error = trace_read_packet(libtrace, packets[i]);
    868887
Note: See TracChangeset for help on using the changeset viewer.