Ignore:
Timestamp:
11/27/14 14:19:19 (7 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
d7fd648
Parents:
21f5f0f
Message:

Fixes bug where packets could be destroyed by unregistered threads
We now destroy the packet object cache thread caches before unresistering a thread

Also includes some whitespace fixes.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/trace_parallel.c

    r78911b5 r6e41e73  
    203203/**
    204204 * Changes a thread's state and broadcasts the condition variable. This
    205  * should always be done when the lock is held. 
     205 * should always be done when the lock is held.
    206206 *
    207207 * Additionally for perpkt threads the state counts are updated.
     
    237237/**
    238238 * Changes the overall traces state and signals the condition.
    239  * 
     239 *
    240240 * @param trace A pointer to the trace
    241241 * @param new_state The new state of the trace
     
    243243 *        false in the case the lock is currently held by this thread.
    244244 */
    245 static inline void libtrace_change_state(libtrace_t *trace, 
     245static inline void libtrace_change_state(libtrace_t *trace,
    246246        const enum trace_state new_state, const bool need_lock)
    247247{
     
    530530        }
    531531
    532        
     532
    533533        thread_change_state(trace, t, THREAD_FINISHED, true);
    534534
     
    538538        trace_send_message_to_reporter(trace, &message);
    539539
     540        // Release all ocache memory before unregistering with the format
     541        // because this might(it does in DPDK) unlink the formats mempool
     542        // causing destroy/finish packet to fail.
     543        libtrace_ocache_unregister_thread(&trace->packet_freelist);
    540544        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
    541545        if (trace->format->punregister_thread) {
     
    665669        message.additional.uint64 = 0;
    666670        trace_send_message_to_reporter(trace, &message);
     671        libtrace_ocache_unregister_thread(&trace->packet_freelist);
    667672        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
    668673        if (trace->format->punregister_thread) {
     
    810815                        break;
    811816        }
    812        
     817
    813818        return i;
    814819}
     
    12581263                        // Wait for timeout or a message
    12591264                        FD_ZERO(&rfds);
    1260                 FD_SET(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages), &rfds);
     1265                        FD_SET(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages), &rfds);
    12611266                        if (select(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages)+1, &rfds, NULL, NULL, &next) == 1) {
    12621267                                libtrace_message_t msg;
     
    13661371                                                libtrace->snaplen);
    13671372                        }
    1368                        
     1373
    13691374                        ++t->accepted_packets;
    13701375                        // TODO look into this better
     
    14551460                return -1;
    14561461        }
    1457        
     1462
    14581463        // NOTE: Until the trace is started we wont have a libtrace_lock initialised
    14591464        if (libtrace->state != STATE_NEW) {
     
    14661471                        return -1;
    14671472                }
    1468                
     1473
    14691474                // Update the per_pkt function, or reuse the old one
    14701475                if (per_pkt)
     
    14771482                assert(!libtrace->perpkt_thread_states[THREAD_RUNNING]);
    14781483                assert(libtrace->per_pkt);
    1479                
     1484
    14801485                if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
    14811486                        fprintf(stderr, "Restarting trace pstart_input()\n");
     
    14871492                        }
    14881493                }
    1489                
     1494
    14901495                if (err == 0) {
    14911496                        libtrace->started = true;
     
    16331638        if (libtrace->config.tick_interval > 0) {
    16341639                libtrace->keepalive_thread.type = THREAD_KEEPALIVE;
    1635                 libtrace->keepalive_thread.state = THREAD_RUNNING;
     1640                libtrace->keepalive_thread.state = THREAD_RUNNING;
    16361641                libtrace_message_queue_init(&libtrace->keepalive_thread.messages, sizeof(libtrace_message_t));
    16371642                ASSERT_RET(pthread_create(&libtrace->keepalive_thread.tid, NULL, keepalive_entry, (void *) libtrace), == 0);
     
    16611666/**
    16621667 * Pauses a trace, this should only be called by the main thread
    1663  * 1. Set started = false 
     1668 * 1. Set started = false
    16641669 * 2. All perpkt threads are paused waiting on a condition var
    16651670 * 3. Then call ppause on the underlying format if found
     
    16741679        int i;
    16751680        assert(libtrace);
    1676        
     1681
    16771682        t = get_thread_table(libtrace);
    16781683        // Check state from within the lock if we are going to change it
     
    18101815        // Now send a message asking the threads to stop
    18111816        // This will be retrieved before trying to read another packet
    1812        
     1817
    18131818        message.code = MESSAGE_DO_STOP;
    18141819        trace_send_message_to_perpkts(libtrace, &message);
    18151820        if (trace_has_dedicated_hasher(libtrace))
    18161821                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
    1817        
     1822
    18181823        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
    18191824                trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
     
    19331938                assert(libtrace->reporter_thread.state == THREAD_FINISHED);
    19341939        }
    1935        
     1940
    19361941        // Wait for the tick (keepalive) thread if it has been started
    19371942        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
     
    19411946                pthread_join(libtrace->keepalive_thread.tid, NULL);
    19421947        }
    1943        
     1948
    19441949        libtrace_change_state(libtrace, STATE_JOINED, true);
    19451950        print_memory_stats();
     
    21532158        assert(uc);
    21542159        if (strncmp(key, "packet_cache_size", nkey) == 0
    2155                 || strncmp(key, "pcs", nkey) == 0) {
     2160            || strncmp(key, "pcs", nkey) == 0) {
    21562161                uc->packet_cache_size = strtoll(value, NULL, 10);
    21572162        } else if (strncmp(key, "packet_thread_cache_size", nkey) == 0
    2158                            || strncmp(key, "ptcs", nkey) == 0) {
     2163                   || strncmp(key, "ptcs", nkey) == 0) {
    21592164                uc->packet_thread_cache_size = strtoll(value, NULL, 10);
    21602165        } else if (strncmp(key, "fixed_packet_count", nkey) == 0
    2161                   || strncmp(key, "fpc", nkey) == 0) {
     2166                   || strncmp(key, "fpc", nkey) == 0) {
    21622167                uc->fixed_packet_count = config_bool_parse(value, nvalue);
    21632168        } else if (strncmp(key, "burst_size", nkey) == 0
Note: See TracChangeset for help on using the changeset viewer.