Changeset 29bbef0 for lib/trace.c


Ignore:
Timestamp:
03/30/14 17:48:26 (7 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
f1015ad
Parents:
dad224b
Message:

My work from over summer, with a few things tidied up and updated to include recent commits/patches to bring this up to date. Still very much work in progress.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/trace.c

    rf7bcbfb r29bbef0  
    9898#include "format_helper.h"
    9999#include "rt_protocol.h"
     100
     101#include <pthread.h>
     102#include <signal.h>
    100103
    101104#define MAXOPTS 1024
     
    253256        libtrace->filtered_packets = 0;
    254257        libtrace->accepted_packets = 0;
     258       
     259        /* Parallel inits */
     260        // libtrace->libtrace_lock
     261        // libtrace->perpkt_cond;
     262        libtrace->perpkt_pausing = 0;
     263        libtrace->mapper_queue_full = false;
     264        libtrace->mappers_finishing = -1;
     265        libtrace->reducer_flags = 0;
     266        libtrace->joined = false;
     267        libtrace->global_blob = NULL;
     268        libtrace->per_pkt = NULL;
     269        libtrace->reducer = NULL;
     270        libtrace->hasher = NULL;
     271        libtrace->packet_freelist_size = 0;
     272        libtrace->mapper_buffer_size = 0;
     273        libtrace->expected_key = 0;
     274        libtrace_zero_ringbuffer(&libtrace->packet_freelist);
     275        libtrace_zero_thread(&libtrace->hasher_thread);
     276        libtrace_zero_thread(&libtrace->reducer_thread);
     277        libtrace_zero_slidingwindow(&libtrace->sliding_window);
     278        libtrace->reducer_thread.type = THREAD_EMPTY;
     279        libtrace->mapper_thread_count = 0;
     280        libtrace->mapper_threads = NULL;
    255281
    256282        /* Parse the URI to determine what sort of trace we are dealing with */
     
    348374        libtrace->io = NULL;
    349375        libtrace->filtered_packets = 0;
     376       
     377        /* Parallel inits */
     378        // libtrace->libtrace_lock
     379        // libtrace->perpkt_cond;
     380        libtrace->perpkt_pausing = 0;
     381        libtrace->mapper_queue_full = false;
     382        libtrace->mappers_finishing = -1;
     383        libtrace->reducer_flags = 0;
     384        libtrace->joined = false;
     385        libtrace->global_blob = NULL;
     386        libtrace->per_pkt = NULL;
     387        libtrace->reducer = NULL;
     388        libtrace->hasher = NULL;
     389        libtrace->expected_key = 0;
     390        libtrace->packet_freelist_size = 0;
     391        libtrace->mapper_buffer_size = 0;
     392        libtrace_zero_ringbuffer(&libtrace->packet_freelist);
     393        libtrace_zero_thread(&libtrace->hasher_thread);
     394        libtrace_zero_thread(&libtrace->reducer_thread);
     395        libtrace_zero_slidingwindow(&libtrace->sliding_window);
     396        libtrace->reducer_thread.type = THREAD_EMPTY;
     397        libtrace->mapper_thread_count = 0;
     398        libtrace->mapper_threads = NULL;
    350399       
    351400        for(tmp=formats_list;tmp;tmp=tmp->next) {
     
    583632 */
    584633DLLEXPORT void trace_destroy(libtrace_t *libtrace) {
     634    int i;
    585635        assert(libtrace);
    586636        if (libtrace->format) {
     
    590640                        libtrace->format->fin_input(libtrace);
    591641        }
    592         /* Need to free things! */
    593         if (libtrace->uridata)
     642        /* Need to free things! */
     643        if (libtrace->uridata)
    594644                free(libtrace->uridata);
     645       
     646        /* Empty any packet memory */
     647
     648        libtrace_packet_t * packet;
     649        while (libtrace_ringbuffer_try_read(&libtrace->packet_freelist,(void **) &packet))
     650                trace_destroy_packet(packet);
     651       
     652        libtrace_ringbuffer_destroy(&libtrace->packet_freelist);
     653       
     654        for (i = 0; i < libtrace->mapper_thread_count; ++i) {
     655                        assert (libtrace_vector_get_size(&libtrace->mapper_threads[i].vector) == 0);
     656                        libtrace_vector_destroy(&libtrace->mapper_threads[i].vector);
     657        }
     658        free(libtrace->mapper_threads);
     659        libtrace->mapper_threads = NULL;
     660        libtrace->mapper_thread_count = 0;
     661       
    595662        if (libtrace->event.packet) {
    596663                /* Don't use trace_destroy_packet here - there is almost
     
    661728        dest->type=packet->type;
    662729        dest->buf_control=TRACE_CTRL_PACKET;
     730        dest->order = packet->order;
    663731        /* Reset the cache - better to recalculate than try to convert
    664732         * the values over to the new packet */
     
    675743 */
    676744DLLEXPORT void trace_destroy_packet(libtrace_packet_t *packet) {
     745        /* Free any resources possibly associated with the packet */
     746        if (packet->trace && packet->trace->format->fin_packet) {
     747                packet->trace->format->fin_packet(packet);
     748        }
     749       
    677750        if (packet->buf_control == TRACE_CTRL_PACKET && packet->buffer) {
    678751                free(packet->buffer);
     
    685758}       
    686759
     760/**
     761 * Removes any possible data stored againt the trace and releases any data.
     762 * This will not destroy a reusable good malloc'd buffer (TRACE_CTRL_PACKET)
     763 * use trace_destroy_packet() for those diabolical purposes.
     764 */
     765void trace_fin_packet(libtrace_packet_t *packet);
     766void trace_fin_packet(libtrace_packet_t *packet) {
     767        if (packet)
     768        {
     769                if (packet->trace && packet->trace->format->fin_packet) {
     770                        packet->trace->format->fin_packet(packet);
     771                        //gettimeofday(&tv, NULL);
     772                        //printf ("%d.%06d DESTROYED #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
     773                }
     774
     775                // No matter what we remove the header and link pointers
     776                packet->trace = NULL;
     777                packet->header = NULL;
     778                packet->payload = NULL;
     779
     780                if (packet->buf_control != TRACE_CTRL_PACKET)
     781                {
     782                        //packet->buf_control = 0; // Invalid value this should be fixed
     783                        packet->buffer = NULL;
     784                }
     785
     786                packet->trace = NULL;
     787                packet->hash = 0;
     788                packet->order = 0;
     789                trace_clear_cache(packet);
     790        }
     791}
     792
    687793/* Read one packet from the trace into buffer. Note that this function will
    688794 * block until a packet is read (or EOF is reached).
     
    707813        }
    708814        assert(packet);
    709      
    710         /* Store the trace we are reading from into the packet opaque
    711          * structure */
    712         packet->trace = libtrace;
    713 
    714         /* Finalise the packet, freeing any resources the format module
    715          * may have allocated it
    716          */
    717         if (libtrace->format->fin_packet) {
    718                 libtrace->format->fin_packet(packet);
    719         }
    720 
    721815
    722816        if (libtrace->format->read_packet) {
    723817                do {
    724818                        size_t ret;
    725                         /* Clear the packet cache */
    726                         trace_clear_cache(packet);
     819                        /* Finalise the packet, freeing any resources the format module
     820                         * may have allocated it and zeroing all data associated with it.
     821                         */
     822                        trace_fin_packet(packet);
     823                        /* Store the trace we are reading from into the packet opaque
     824                         * structure */
     825                        packet->trace = libtrace;
    727826                        ret=libtrace->format->read_packet(libtrace,packet);
    728827                        if (ret==(size_t)-1 || ret==0) {
     
    743842                                                libtrace->snaplen);
    744843                        }
     844                        trace_packet_set_order(packet, libtrace->accepted_packets);
    745845                        ++libtrace->accepted_packets;
    746846                        return ret;
     
    9461046        }
    9471047
    948         return tv;
     1048    return tv;
    9491049}
    9501050
     
    11991299                libtrace_linktype_t linktype    ) {
    12001300#ifdef HAVE_BPF_FILTER
     1301        /* It just so happens that the underlying libs used by pthread arn't
     1302         * thread safe, namely lex/flex thingys, so single threaded compile
     1303         * multi threaded running should be safe.
     1304         */
     1305        static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    12011306        assert(filter);
    12021307
     
    12201325                                        "Unknown pcap equivalent linktype");
    12211326                        return -1;
     1327                }
     1328                assert (pthread_mutex_lock(&mutex) == 0);
     1329                /* Make sure not one bet us to this */
     1330                if (filter->flag) {
     1331                        printf("Someone bet us to compile the filter\n");
     1332                        assert (pthread_mutex_unlock(&mutex) == 0);
     1333                        return 1;
    12221334                }
    12231335                pcap=(pcap_t *)pcap_open_dead(
     
    12331345                                        pcap_geterr(pcap));
    12341346                        pcap_close(pcap);
     1347                        assert (pthread_mutex_unlock(&mutex) == 0);
    12351348                        return -1;
    12361349                }
    12371350                pcap_close(pcap);
    12381351                filter->flag=1;
     1352                assert (pthread_mutex_unlock(&mutex) == 0);
    12391353        }
    12401354        return 0;
     
    12561370        libtrace_linktype_t linktype;
    12571371        libtrace_packet_t *packet_copy = (libtrace_packet_t*)packet;
     1372#ifdef HAVE_LLVM
     1373        static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
     1374#endif
    12581375
    12591376        assert(filter);
     
    13061423         * what the link type was
    13071424         */
     1425        // Note internal mutex locking used here
    13081426        if (trace_bpf_compile(filter,packet_copy,linkptr,linktype)==-1) {
    13091427                if (free_packet_needed) {
     
    13161434#if HAVE_LLVM
    13171435        if (!filter->jitfilter) {
    1318                 filter->jitfilter = compile_program(filter->filter.bf_insns, filter->filter.bf_len);
     1436                assert(pthread_mutex_lock(&mutex) == 0);
     1437                /* Again double check here like the bpf filter */
     1438                if(filter->jitfilter)
     1439                        printf("Someone bet us to compile the JIT thingy\n");
     1440                else
     1441                /* Looking at compile_program source this appears to be thread safe
     1442                 * however if this gets called twice we will leak this memory :(
     1443                 * as such lock here anyways */
     1444                        filter->jitfilter = compile_program(filter->filter.bf_insns, filter->filter.bf_len);
     1445                assert(pthread_mutex_unlock(&mutex) == 0);
    13191446        }
    13201447#endif
Note: See TracChangeset for help on using the changeset viewer.