Changeset 348396b


Ignore:
Timestamp:
09/30/15 16:55:25 (6 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
259c314
Parents:
a151dda
Message:

Fix deadlock when reading parallel RT with a filter

The problem was that filtered packets were still consuming slots in the
buckets, but matching packets would only get released once parallel libtrace had
read a full batch (10 packets, by default). If the filter didn't match many
packets, the 200,000 packet slots would wrap around before a batch was
complete -- leaving us with nowhere to store new packets.

The fix was to allow the bucket structure to "reclaim" a slot if it is
released as soon as it is allocated, i.e. if the packet was filtered. This
means that only matching packets will consume slots and we can't run out
before we read a full batch.

Location:
lib
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • lib/data-struct/buckets.c

    rd391ce0 r348396b  
    150150        assert(bnode != NULL);
    151151
     152
    152153        /* Find the right slot */
    153154        if (id < bnode->startindex) {
     
    158159        assert(s < bnode->slots);
    159160        assert(bnode->released[s] != 0);
     161       
    160162
    161163        if (bnode->released[s] == 1) {
    162                 bnode->released[s] = 2;
     164                uint64_t previd = b->nextid - 1;
     165                if (b->nextid == 1)
     166                        previd = MAX_OUTSTANDING - 1;
     167
     168                if (bnode == b->node && id == previd) {
     169                        b->packets[id] = NULL;
     170                        b->nextid = previd;
     171                        bnode->released[s] = 0;
     172                } else {
     173                        bnode->released[s] = 2;
     174                }
    163175                bnode->activemembers -= 1;
    164176        }
    165 
    166177
    167178        while (libtrace_list_get_size(b->nodelist) > 1) {
     
    170181                front = *(libtrace_bucket_node_t **)lnode->data;
    171182
    172                 if (front->activemembers > 0)
     183                if (front->activemembers > 0) {
    173184                        break;
     185                }
    174186                if (front == b->node)
    175187                        break;
  • lib/trace.c

    rd391ce0 r348396b  
    855855                }
    856856
    857                 pthread_mutex_lock(&packet->trace->libtrace_lock);
    858                 if (packet->trace && packet->trace->last_packet == packet)
    859                         packet->trace->last_packet = NULL;
    860                 pthread_mutex_unlock(&packet->trace->libtrace_lock);
     857                if (packet->trace) {
     858                        pthread_mutex_lock(&packet->trace->libtrace_lock);
     859                        if (packet->trace->last_packet == packet)
     860                                packet->trace->last_packet = NULL;
     861                        pthread_mutex_unlock(&packet->trace->libtrace_lock);
     862                }
    861863
    862864                // No matter what we remove the header and link pointers
     
    873875                packet->hash = 0;
    874876                packet->order = 0;
     877                packet->srcbucket = NULL;
    875878        }
    876879}
     
    900903
    901904        if (libtrace->format->read_packet) {
     905                /* Finalise the packet, freeing any resources the format module
     906                 * may have allocated it and zeroing all data associated with it.
     907                 */
     908                if (packet->trace == libtrace)
     909                        trace_fin_packet(packet);
    902910                do {
    903911                        size_t ret;
    904912                        int filtret;
    905                         /* Finalise the packet, freeing any resources the format module
    906                          * may have allocated it and zeroing all data associated with it.
    907                          */
    908                         if (packet->trace == libtrace)
    909                                 trace_fin_packet(packet);
    910913                        /* Store the trace we are reading from into the packet opaque
    911914                         * structure */
     
    915918                                return ret;
    916919                        }
    917                         if (libtrace->filter) {
     920                        if (libtrace->filter) {
    918921                                /* If the filter doesn't match, read another
    919922                                 * packet
     
    924927                                        return ~0U;
    925928                                }
    926                                
     929
    927930                                if (filtret == 0) {
    928931                                        ++libtrace->filtered_packets;
    929                                         continue;
     932                                        trace_fin_packet(packet);
     933                                        continue;
    930934                                }
    931935                        }
     
    940944                        if (packet->trace == libtrace)
    941945                                libtrace->last_packet = packet;
     946
    942947                        return ret;
    943948                } while(1);
Note: See TracChangeset for help on using the changeset viewer.