Changeset 6e41e73


Ignore:
Timestamp:
11/27/14 14:19:19 (7 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
d7fd648
Parents:
21f5f0f
Message:

Fixes bug where packets could be destroyed by unregistered threads
We now destroy the packet object cache thread caches before unresistering a thread

Also includes some whitespace fixes.

Location:
lib
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • lib/data-struct/object_cache.c

    r78911b5 r6e41e73  
    1 
    21#include "object_cache.h"
    32#include <assert.h>
     
    3433
    3534/**
    36  * @brief free_array assumes we DONT hold spin
     35 * @brief unregister_thread assumes we DONT hold spin
    3736 */
    3837static inline void unregister_thread(struct local_cache *lc) {
     
    7170
    7271/**
    73  * @brief free_array assumes we hold spin!!!
     72 * @brief register_thread assumes we DONT hold spin
    7473 */
    7574static inline void register_thread(libtrace_ocache_t *oc, struct local_cache *lc) {
     
    429428}
    430429
     430/**
     431 * @brief ocache_unregister_thread removes a thread from an ocache.
     432 * @param The ocache to remove this thread, this will free any packets in the TLS cache
     433 */
     434DLLEXPORT void libtrace_ocache_unregister_thread(libtrace_ocache_t *oc) {
     435        size_t i;
     436        struct local_cache *lc = find_cache(oc);
     437
     438        if (lc) {
     439                for (i = 0; i < t_mem_caches_used; ++i) {
     440                        if (&t_mem_caches[i] == lc) {
     441                                // Free the cache against the ocache
     442                                unregister_thread(&t_mem_caches[i]);
     443                                free(t_mem_caches[i].cache);
     444                                // And remove it from the thread itself
     445                                --t_mem_caches_used;
     446                                t_mem_caches[i] = t_mem_caches[t_mem_caches_used];
     447                                memset(&t_mem_caches[t_mem_caches_used], 0, sizeof(struct local_cache));
     448                        }
     449                }
     450        }
     451}
  • lib/data-struct/object_cache.h

    ra49a9eb r6e41e73  
    2121
    2222DLLEXPORT void libtrace_ocache_init(libtrace_ocache_t *oc, void *(*alloc)(void), void (*free)(void*),
    23                                                                           size_t thread_cache_size, size_t buffer_size, bool limit_size);
     23                                    size_t thread_cache_size, size_t buffer_size, bool limit_size);
    2424DLLEXPORT int libtrace_ocache_destroy(libtrace_ocache_t *oc);
    2525DLLEXPORT size_t libtrace_ocache_alloc(libtrace_ocache_t *oc, void *values[], size_t nb_buffers, size_t min_nb_buffers);
    2626DLLEXPORT size_t libtrace_ocache_free(libtrace_ocache_t *oc, void *values[], size_t nb_buffers, size_t min_nb_buffers);
    2727DLLEXPORT void libtrace_zero_ocache(libtrace_ocache_t *oc);
     28DLLEXPORT void libtrace_ocache_unregister_thread(libtrace_ocache_t *oc);
    2829#endif // LIBTRACE_OBJECT_CACHE_H
    29 
    30 
  • lib/trace_parallel.c

    r78911b5 r6e41e73  
    203203/**
    204204 * Changes a thread's state and broadcasts the condition variable. This
    205  * should always be done when the lock is held. 
     205 * should always be done when the lock is held.
    206206 *
    207207 * Additionally for perpkt threads the state counts are updated.
     
    237237/**
    238238 * Changes the overall traces state and signals the condition.
    239  * 
     239 *
    240240 * @param trace A pointer to the trace
    241241 * @param new_state The new state of the trace
     
    243243 *        false in the case the lock is currently held by this thread.
    244244 */
    245 static inline void libtrace_change_state(libtrace_t *trace, 
     245static inline void libtrace_change_state(libtrace_t *trace,
    246246        const enum trace_state new_state, const bool need_lock)
    247247{
     
    530530        }
    531531
    532        
     532
    533533        thread_change_state(trace, t, THREAD_FINISHED, true);
    534534
     
    538538        trace_send_message_to_reporter(trace, &message);
    539539
     540        // Release all ocache memory before unregistering with the format
     541        // because this might(it does in DPDK) unlink the formats mempool
     542        // causing destroy/finish packet to fail.
     543        libtrace_ocache_unregister_thread(&trace->packet_freelist);
    540544        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
    541545        if (trace->format->punregister_thread) {
     
    665669        message.additional.uint64 = 0;
    666670        trace_send_message_to_reporter(trace, &message);
     671        libtrace_ocache_unregister_thread(&trace->packet_freelist);
    667672        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
    668673        if (trace->format->punregister_thread) {
     
    810815                        break;
    811816        }
    812        
     817
    813818        return i;
    814819}
     
    12581263                        // Wait for timeout or a message
    12591264                        FD_ZERO(&rfds);
    1260                 FD_SET(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages), &rfds);
     1265                        FD_SET(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages), &rfds);
    12611266                        if (select(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages)+1, &rfds, NULL, NULL, &next) == 1) {
    12621267                                libtrace_message_t msg;
     
    13661371                                                libtrace->snaplen);
    13671372                        }
    1368                        
     1373
    13691374                        ++t->accepted_packets;
    13701375                        // TODO look into this better
     
    14551460                return -1;
    14561461        }
    1457        
     1462
    14581463        // NOTE: Until the trace is started we wont have a libtrace_lock initialised
    14591464        if (libtrace->state != STATE_NEW) {
     
    14661471                        return -1;
    14671472                }
    1468                
     1473
    14691474                // Update the per_pkt function, or reuse the old one
    14701475                if (per_pkt)
     
    14771482                assert(!libtrace->perpkt_thread_states[THREAD_RUNNING]);
    14781483                assert(libtrace->per_pkt);
    1479                
     1484
    14801485                if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
    14811486                        fprintf(stderr, "Restarting trace pstart_input()\n");
     
    14871492                        }
    14881493                }
    1489                
     1494
    14901495                if (err == 0) {
    14911496                        libtrace->started = true;
     
    16331638        if (libtrace->config.tick_interval > 0) {
    16341639                libtrace->keepalive_thread.type = THREAD_KEEPALIVE;
    1635                 libtrace->keepalive_thread.state = THREAD_RUNNING;
     1640                libtrace->keepalive_thread.state = THREAD_RUNNING;
    16361641                libtrace_message_queue_init(&libtrace->keepalive_thread.messages, sizeof(libtrace_message_t));
    16371642                ASSERT_RET(pthread_create(&libtrace->keepalive_thread.tid, NULL, keepalive_entry, (void *) libtrace), == 0);
     
    16611666/**
    16621667 * Pauses a trace, this should only be called by the main thread
    1663  * 1. Set started = false 
     1668 * 1. Set started = false
    16641669 * 2. All perpkt threads are paused waiting on a condition var
    16651670 * 3. Then call ppause on the underlying format if found
     
    16741679        int i;
    16751680        assert(libtrace);
    1676        
     1681
    16771682        t = get_thread_table(libtrace);
    16781683        // Check state from within the lock if we are going to change it
     
    18101815        // Now send a message asking the threads to stop
    18111816        // This will be retrieved before trying to read another packet
    1812        
     1817
    18131818        message.code = MESSAGE_DO_STOP;
    18141819        trace_send_message_to_perpkts(libtrace, &message);
    18151820        if (trace_has_dedicated_hasher(libtrace))
    18161821                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
    1817        
     1822
    18181823        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
    18191824                trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
     
    19331938                assert(libtrace->reporter_thread.state == THREAD_FINISHED);
    19341939        }
    1935        
     1940
    19361941        // Wait for the tick (keepalive) thread if it has been started
    19371942        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
     
    19411946                pthread_join(libtrace->keepalive_thread.tid, NULL);
    19421947        }
    1943        
     1948
    19441949        libtrace_change_state(libtrace, STATE_JOINED, true);
    19451950        print_memory_stats();
     
    21532158        assert(uc);
    21542159        if (strncmp(key, "packet_cache_size", nkey) == 0
    2155                 || strncmp(key, "pcs", nkey) == 0) {
     2160            || strncmp(key, "pcs", nkey) == 0) {
    21562161                uc->packet_cache_size = strtoll(value, NULL, 10);
    21572162        } else if (strncmp(key, "packet_thread_cache_size", nkey) == 0
    2158                            || strncmp(key, "ptcs", nkey) == 0) {
     2163                   || strncmp(key, "ptcs", nkey) == 0) {
    21592164                uc->packet_thread_cache_size = strtoll(value, NULL, 10);
    21602165        } else if (strncmp(key, "fixed_packet_count", nkey) == 0
    2161                   || strncmp(key, "fpc", nkey) == 0) {
     2166                   || strncmp(key, "fpc", nkey) == 0) {
    21622167                uc->fixed_packet_count = config_bool_parse(value, nvalue);
    21632168        } else if (strncmp(key, "burst_size", nkey) == 0
Note: See TracChangeset for help on using the changeset viewer.