Changeset 0ec8a7c


Ignore:
Timestamp:
02/27/15 17:31:03 (6 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
98dc1ba
Parents:
7718e54
Message:

Change the user interface from a message and a packet to only messages.

Now a packet is a type of message (MESSAGE_PACKET).
I expanded the message structure into seperate arguments such that these
will be passed in registers on x64 systems. As such performance has
remained identical if not better.

I renamed libtrace_generic_types_t to libtrace_generic_t so it was shorter.

Files:
11 edited

Legend:

Unmodified
Added
Removed
  • lib/libtrace.h.in

    r2adc1d0 r0ec8a7c  
    35073507        float rfloat;
    35083508        double rdouble;
    3509 } libtrace_generic_types_t;
     3509} libtrace_generic_t;
     3510ct_assert(sizeof(libtrace_generic_t) == 8);
    35103511
    35113512typedef struct libtrace_message_t {
    35123513        int code;
    3513         libtrace_generic_types_t additional;
     3514        libtrace_generic_t additional;
    35143515        libtrace_thread_t *sender;
    35153516} libtrace_message_t;
     
    35183519typedef struct libtrace_result_t {
    35193520        uint64_t key;
    3520         libtrace_generic_types_t value;
     3521        libtrace_generic_t value;
    35213522        int type;
    35223523} libtrace_result_t;
     
    35253526#define RESULT_TICK   2
    35263527
    3527 
    3528 typedef void* (*fn_per_pkt)(libtrace_t* trace, libtrace_packet_t *p, libtrace_message_t *m, libtrace_thread_t *thread);
     3528/**
     3529 * The definition for the main function that the user supplies to process
     3530 * packets.
     3531 *
     3532 * @param trace The trace the packet is related to.
     3533 * @param thread The thread the trace is related to.
     3534 * @param mesg_code The type of data ready, the most important being MESSAGE_PACKET.
     3535 * In this case data.pkt contains the packet.
     3536 * @param data A generic union of types that fit into 8 bytes, containing
     3537 * information dependent upon the mesg_code.
     3538 * @param sender The thread that the message originated from.
     3539 *
     3540 * The values of data and sender depend upon the mesg_code. Please see the
     3541 * documentation for the message as to what value these will contain.
     3542 */
     3543typedef void* (*fn_per_pkt)(libtrace_t* trace,
     3544                            libtrace_thread_t *thread,
     3545                            int mesg_code,
     3546                            libtrace_generic_t data,
     3547                            libtrace_thread_t *sender);
    35293548typedef void (*fn_reporter)(libtrace_t* trace, libtrace_result_t *r, libtrace_message_t *m);
    35303549typedef uint64_t (*fn_hasher)(const libtrace_packet_t* packet, void *data);
     
    35373556DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key);
    35383557DLLEXPORT uint64_t libtrace_result_get_key(libtrace_result_t * result);
    3539 DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, libtrace_generic_types_t value);
    3540 DLLEXPORT libtrace_generic_types_t libtrace_result_get_value(libtrace_result_t * result);
    3541 DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, libtrace_generic_types_t value);
     3558DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, libtrace_generic_t value);
     3559DLLEXPORT libtrace_generic_t libtrace_result_get_value(libtrace_result_t * result);
     3560DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, libtrace_generic_t value);
    35423561DLLEXPORT void trace_destroy_result(libtrace_result_t ** result);
    35433562
     
    35493568
    35503569
    3551 DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_types_t value, int type);
     3570DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_t value, int type);
    35523571typedef struct libtrace_vector libtrace_vector_t;
    35533572
     
    35993618
    36003619enum libtrace_messages {
     3620        MESSAGE_PACKET,
    36013621        MESSAGE_STARTING,
    36023622        MESSAGE_RESUMING,
     
    36133633        MESSAGE_POST_RANGE,
    36143634        MESSAGE_TICK,
    3615         MESSAGE_USER
     3635        MESSAGE_USER = 1000
    36163636};
    36173637
     
    38543874         * chosen.
    38553875         */
    3856         libtrace_generic_types_t configuration;
     3876        libtrace_generic_t configuration;
    38573877};
    38583878
    3859 DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_types_t config);
     3879DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_t config);
    38603880
    38613881#define READ_EOF 0
  • lib/trace_parallel.c

    r7718e54 r0ec8a7c  
    385385                                  libtrace_packet_t **packet,
    386386                                  bool tracetime) {
     387
    387388        if ((*packet)->error > 0) {
    388389                if (tracetime) {
     
    391392                }
    392393                t->accepted_packets++;
    393                 *packet = (*trace->per_pkt)(trace, *packet, NULL, t);
     394                libtrace_generic_t data = {.pkt = *packet};
     395                *packet = (*trace->per_pkt)(trace, t, MESSAGE_PACKET, data, t);
    394396                trace_fin_packet(*packet);
    395397        } else {
    396                 libtrace_message_t message;
    397398                assert((*packet)->error == READ_TICK);
    398                 message.code = MESSAGE_TICK;
    399                 message.additional.uint64 = trace_packet_get_order(*packet);
    400                 message.sender = t;
    401                 (*trace->per_pkt)(trace, NULL, &message, t);
     399                libtrace_generic_t data = {.uint64 = trace_packet_get_order(*packet)};
     400                (*trace->per_pkt)(trace, t, MESSAGE_TICK, data, t);
    402401        }
    403402        return 0;
     
    462461                                     libtrace_packet_t *packets[],
    463462                                     int nb_packets, int *empty, int *offset) {
    464         libtrace_message_t message = {0};
    465463        libtrace_packet_t * packet = NULL;
    466464
    467465        /* Let the user thread know we are going to pause */
    468         message.code = MESSAGE_PAUSING;
    469         message.sender = t;
    470         (*trace->per_pkt)(trace, NULL, &message, t);
     466        (*trace->per_pkt)(trace, t, MESSAGE_PAUSING, (libtrace_generic_t){0}, t);
    471467
    472468        /* Send through any remaining packets (or messages) without delay */
     
    511507        /* Now we do the actual pause, this returns when we resumed */
    512508        trace_thread_pause(trace, t);
    513         message.code = MESSAGE_RESUMING;
    514         (*trace->per_pkt)(trace, NULL, &message, t);
     509        (*trace->per_pkt)(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);
    515510        return 1;
    516511}
     
    558553
    559554        /* Let the per_packet function know we have started */
    560         message.code = MESSAGE_STARTING;
    561         message.sender = t;
    562         (*trace->per_pkt)(trace, NULL, &message, t);
    563         message.code = MESSAGE_RESUMING;
    564         (*trace->per_pkt)(trace, NULL, &message, t);
    565 
     555        (*trace->per_pkt)(trace, t, MESSAGE_STARTING, (libtrace_generic_t){0}, t);
     556        (*trace->per_pkt)(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);
    566557
    567558        for (;;) {
     
    586577                                        goto eof;
    587578                        }
    588                         (*trace->per_pkt)(trace, NULL, &message, t);
     579                        (*trace->per_pkt)(trace, t, message.code, message.additional, message.sender);
    589580                        /* Continue and the empty messages out before packets */
    590581                        continue;
     
    653644
    654645        // Let the per_packet function know we have stopped
    655         message.code = MESSAGE_PAUSING;
    656         message.sender = t;
    657         (*trace->per_pkt)(trace, NULL, &message, t);
    658         message.code = MESSAGE_STOPPING;
    659         message.additional.uint64 = 0;
    660         (*trace->per_pkt)(trace, NULL, &message, t);
     646        (*trace->per_pkt)(trace, t, MESSAGE_PAUSING, (libtrace_generic_t){0}, t);
     647        (*trace->per_pkt)(trace, t, MESSAGE_STOPPING, (libtrace_generic_t){0}, t);
    661648
    662649        // Free any remaining packets
     
    21012088        return result->key;
    21022089}
    2103 DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, libtrace_generic_types_t value) {
     2090DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, libtrace_generic_t value) {
    21042091        result->value = value;
    21052092}
    2106 DLLEXPORT libtrace_generic_types_t libtrace_result_get_value(libtrace_result_t * result) {
     2093DLLEXPORT libtrace_generic_t libtrace_result_get_value(libtrace_result_t * result) {
    21072094        return result->value;
    21082095}
    2109 DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, libtrace_generic_types_t value) {
     2096DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, libtrace_generic_t value) {
    21102097        result->key = key;
    21112098        result->value = value;
     
    21552142 * Should only be called by a perpkt thread, i.e. from a perpkt handler
    21562143 */
    2157 DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_types_t value, int type) {
     2144DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_t value, int type) {
    21582145        libtrace_result_t res;
    21592146        res.type = type;
     
    21682155 * Sets a combiner function against the trace.
    21692156 */
    2170 DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_types_t config){
     2157DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_t config){
    21712158        if (combiner) {
    21722159                trace->combiner = *combiner;
  • test/test-format-parallel-hasher.c

    r43d3e73 r0ec8a7c  
    165165                        // All threads publish to verify the thread count
    166166                        assert(tls->count == 25 || tls->count == 75);
    167                         trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_types_t){.sint=tls->count}, RESULT_NORMAL);
     167                        trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint=tls->count}, RESULT_NORMAL);
    168168                        trace_post_reporter(trace);
    169169                        free(tls);
  • test/test-format-parallel-reporter.c

    rd994324 r0ec8a7c  
    131131                }
    132132                x = c;
    133                 trace_publish_result(trace, t, trace_packet_get_order(pkt), (libtrace_generic_types_t){.pkt=pkt}, RESULT_PACKET);
     133                trace_publish_result(trace, t, trace_packet_get_order(pkt), (libtrace_generic_t){.pkt=pkt}, RESULT_PACKET);
    134134                return NULL;
    135135        }
     
    160160        if (strcmp(argv[1],"rtclient")==0) expected=101;
    161161
    162         trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_types_t){0});
     162        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
    163163
    164164        trace_pstart(trace, NULL, per_packet, reporter);
  • test/test-format-parallel-singlethreaded-hasher.c

    rd994324 r0ec8a7c  
    163163
    164164                        // All threads publish to verify the thread count
    165                         trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_types_t){.sint=tls->count}, RESULT_NORMAL);
     165                        trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint=tls->count}, RESULT_NORMAL);
    166166                        trace_post_reporter(trace);
    167167                        free(tls);
  • test/test-format-parallel-singlethreaded.c

    rd994324 r0ec8a7c  
    162162
    163163                        // All threads publish to verify the thread count
    164                         trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_types_t){.sint=tls->count}, RESULT_NORMAL);
     164                        trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint=tls->count}, RESULT_NORMAL);
    165165                        trace_post_reporter(trace);
    166166                        free(tls);
  • test/test-format-parallel-stressthreads.c

    rd994324 r0ec8a7c  
    162162
    163163                        // All threads publish to verify the thread count
    164                         trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_types_t){.sint=tls->count}, RESULT_NORMAL);
     164                        trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint=tls->count}, RESULT_NORMAL);
    165165                        trace_post_reporter(trace);
    166166                        free(tls);
  • test/test-format-parallel.c

    rd994324 r0ec8a7c  
    172172
    173173                        // All threads publish to verify the thread count
    174                         trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_types_t){.sint = count}, RESULT_NORMAL);
     174                        trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint = count}, RESULT_NORMAL);
    175175                        trace_post_reporter(trace);
    176176                        break;
  • tools/traceanon/traceanon_parallel.c

    rd994324 r0ec8a7c  
    154154
    155155
    156 static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, libtrace_message_t *mesg, UNUSED libtrace_thread_t *t)
    157 {
    158         if (pkt) {
    159                 struct libtrace_ip *ipptr;
    160                 libtrace_udp_t *udp = NULL;
    161                 libtrace_tcp_t *tcp = NULL;
    162 
    163                 ipptr = trace_get_ip(pkt);
     156static void* per_packet(libtrace_t *trace, libtrace_thread_t *t,
     157                        int mesg, libtrace_generic_t data,
     158                        libtrace_thread_t *sender UNUSED)
     159{
     160        struct libtrace_ip *ipptr;
     161        libtrace_udp_t *udp = NULL;
     162        libtrace_tcp_t *tcp = NULL;
     163
     164        switch (mesg) {
     165        case MESSAGE_PACKET:
     166                ipptr = trace_get_ip(data.pkt);
    164167
    165168                if (ipptr && (enc_source || enc_dest)) {
     
    173176                /* XXX replace with nice use of trace_get_transport() */
    174177
    175                 udp = trace_get_udp(pkt);
     178                udp = trace_get_udp(data.pkt);
    176179                if (udp && (enc_source || enc_dest)) {
    177180                        udp->check = 0;
    178181                }
    179182
    180                 tcp = trace_get_tcp(pkt);
     183                tcp = trace_get_tcp(data.pkt);
    181184                if (tcp && (enc_source || enc_dest)) {
    182185                        tcp->check = 0;
     
    191194                //trace_publish_result(trace, trace_packet_get_order(pkt), pkt);
    192195
    193                 trace_publish_result(trace, t, trace_packet_get_order(pkt), (libtrace_generic_types_t){.pkt=pkt}, RESULT_PACKET);
    194                 //return ;
    195         }
    196         if (mesg) {
    197                 // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
    198                 switch (mesg->code) {
    199                         case MESSAGE_STARTING:
    200                                 enc_init(enc_type,key);
    201                         break;
    202                         case MESSAGE_TICK:
    203                                 trace_publish_result(trace, t, mesg->additional.uint64, (libtrace_generic_types_t){.pkt=NULL}, RESULT_TICK);
    204                 }
     196                trace_publish_result(trace, t, trace_packet_get_order(data.pkt), data, RESULT_PACKET);
     197                break;
     198        case MESSAGE_STARTING:
     199                enc_init(enc_type,key);
     200                break;
     201        case MESSAGE_TICK:
     202                trace_publish_result(trace, t, data.uint64, (libtrace_generic_t){0}, RESULT_TICK);
     203                break;
    205204        }
    206205        return NULL;
     
    423422         
    424423        int i = 1;
    425         trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_types_t){0});
     424        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
    426425        trace_parallel_config(trace, TRACE_OPTION_SET_CONFIG, &uc);
    427426
  • tools/tracertstats/tracertstats_parallel.c

    r2e22196d r0ec8a7c  
    171171} timestamp_sync_t;
    172172
    173 static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt,
    174                                                 libtrace_message_t *mesg,
    175                                                 libtrace_thread_t *t)
     173static void* per_packet(libtrace_t *trace, libtrace_thread_t *t,
     174                        int mesg, libtrace_generic_t data,
     175                        libtrace_thread_t *sender UNUSED)
    176176{
    177177        int i;
    178178        static __thread uint64_t last_ts = 0, ts = 0;
    179179        static __thread result_t * results = NULL;
    180         // Unsure when we would hit this case but the old code had it, I
    181         // guess we should keep it
    182         if (pkt && trace_get_packet_buffer(pkt,NULL,NULL) != NULL) {
    183                 //fprintf(stderr, "Got packet t=%x\n", t);
    184                 ts = trace_get_seconds(pkt) / packet_interval;
     180
     181        switch(mesg) {
     182        case MESSAGE_PACKET:
     183                // Unsure when we would hit this case but the old code had it, I
     184                // guess we should keep it
     185                assert(trace_get_packet_buffer(pkt,NULL,NULL) != NULL);
     186                ts = trace_get_seconds(data.pkt) / packet_interval;
    185187                if (last_ts == 0)
    186188                        last_ts = ts;
    187189
    188190                while (packet_interval != UINT64_MAX && last_ts<ts) {
     191                        libtrace_generic_t tmp = {.ptr = results};
    189192                        // Publish and make a new one new
    190193                        //fprintf(stderr, "Publishing result %"PRIu64"\n", last_ts);
    191                         trace_publish_result(trace, t, (uint64_t) last_ts, (libtrace_generic_types_t){.ptr = results}, RESULT_NORMAL);
     194                        trace_publish_result(trace, t, (uint64_t) last_ts, tmp, RESULT_NORMAL);
    192195                        trace_post_reporter(trace);
    193196                        results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
    194197                        last_ts++;
    195198                }
    196                
     199
    197200                for(i=0;i<filter_count;++i) {
    198                         if(trace_apply_filter(filters[i].filter, pkt)) {
     201                        if(trace_apply_filter(filters[i].filter, data.pkt)) {
    199202                                results->filters[i].count++;
    200                                 results->filters[i].bytes+=trace_get_wire_length(pkt);
     203                                results->filters[i].bytes+=trace_get_wire_length(data.pkt);
    201204                        }
    202205                }
    203                
     206
    204207                results->total.count++;
    205                 results->total.bytes +=trace_get_wire_length(pkt);
     208                results->total.bytes +=trace_get_wire_length(data.pkt);
    206209                /*if (count >= packet_count) {
    207210                        report_results(ts,count,bytes);
     
    209212                        bytes=0;
    210213                }*/ // TODO what was happening here doesn't match up with any of the documentations!!!
    211         }
    212        
    213         if (mesg) {
    214                 // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
    215                 switch (mesg->code) {
    216                         case MESSAGE_STARTING:
    217                                 results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
    218                                 break;
    219                         case MESSAGE_STOPPING:
    220                                 // Should we always post this?
    221                                 if (results->total.count) {
    222                                         trace_publish_result(trace, t, (uint64_t) last_ts, (libtrace_generic_types_t){.ptr = results}, RESULT_NORMAL);
     214                return data.pkt;
     215
     216        case MESSAGE_STARTING:
     217                results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
     218                break;
     219
     220        case MESSAGE_STOPPING:
     221                // Should we always post this?
     222                if (results->total.count) {
     223                        libtrace_generic_t tmp = {.ptr = results};
     224                        trace_publish_result(trace, t, (uint64_t) last_ts, tmp, RESULT_NORMAL);
     225                        trace_post_reporter(trace);
     226                        results = NULL;
     227                }
     228                break;
     229
     230                case MESSAGE_TICK:
     231                {
     232                        int64_t offset;
     233                        struct timeval *tv, tv_real;
     234                        libtrace_packet_t *first_packet = NULL;
     235                        retrive_first_packet(trace, &first_packet, &tv);
     236                        if (first_packet != NULL) {
     237                                // So figure out our running offset
     238                                tv_real = trace_get_timeval(first_packet);
     239                                offset = tv_to_usec(tv) - tv_to_usec(&tv_real);
     240                                // Get time of day and do this stuff
     241                                uint64_t next_update_time;
     242                                next_update_time = (last_ts*packet_interval + packet_interval) * 1000000 + offset;
     243                                if (next_update_time <= data.uint64) {
     244                                        libtrace_generic_t tmp = {.ptr = results};
     245                                        //fprintf(stderr, "Got a tick and publishing early!!\n");
     246                                        trace_publish_result(trace, t, (uint64_t) last_ts, tmp, RESULT_NORMAL);
    223247                                        trace_post_reporter(trace);
    224                                         results = NULL;
     248                                        results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
     249                                        last_ts++;
     250                                } else {
     251                                        //fprintf(stderr, "Got a tick but no publish ...\n");
    225252                                }
    226                                 break;
    227                         case MESSAGE_TICK:
    228                         {
    229                                 int64_t offset;
    230                                 struct timeval *tv, tv_real;
    231                                 libtrace_packet_t *first_packet = NULL;
    232                                 retrive_first_packet(trace, &first_packet, &tv);
    233                                 if (first_packet != NULL) {
    234                                         // So figure out our running offset
    235                                         tv_real = trace_get_timeval(first_packet);
    236                                         offset = tv_to_usec(tv) - tv_to_usec(&tv_real);
    237                                         // Get time of day and do this stuff
    238                                         uint64_t next_update_time;
    239                                         next_update_time = (last_ts*packet_interval + packet_interval) * 1000000 + offset;
    240                                         if (next_update_time <= mesg->additional.uint64) {
    241                                                 //fprintf(stderr, "Got a tick and publishing early!!\n");
    242                                                 trace_publish_result(trace, t, (uint64_t) last_ts, (libtrace_generic_types_t){.ptr = results}, RESULT_NORMAL);
    243                                                 trace_post_reporter(trace);
    244                                                 results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
    245                                                 last_ts++;
    246                                         } else {
    247                                                 //fprintf(stderr, "Got a tick but no publish ...\n");
    248                                         }
    249                                 } else {
    250                                         //fprintf(stderr, "Got a tick but no packets seen yet!!!\n");
    251                                 }
     253                        } else {
     254                                //fprintf(stderr, "Got a tick but no packets seen yet!!!\n");
    252255                        }
    253256                }
    254257        }
    255         return pkt;
     258        return NULL;
    256259}
    257260
  • tools/tracestats/tracestats_parallel.c

    r2adc1d0 r0ec8a7c  
    102102
    103103
    104 static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, libtrace_message_t *mesg, libtrace_thread_t *t)
     104//libtrace_message_t mesg
     105static void* per_packet(libtrace_t *trace, libtrace_thread_t *t,
     106                        int mesg, libtrace_generic_t data,
     107                        libtrace_thread_t *sender)
    105108{
    106109        // Using first entry as total and those after for filter counts
    107110        static __thread statistics_t * results = NULL;
    108         int i;
    109        
    110         if (pkt) {
    111                 int wlen = trace_get_wire_length(pkt);
     111        int i, wlen;
     112        libtrace_stat_t *stats;
     113
     114
     115        // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
     116        switch (mesg) {
     117        case MESSAGE_PACKET:
     118                wlen = trace_get_wire_length(data.pkt);
    112119                for(i=0;i<filter_count;++i) {
    113120                        if (filters[i].filter == NULL)
    114121                                continue;
    115                         if(trace_apply_filter(filters[i].filter,pkt) > 0) {
     122                        if(trace_apply_filter(filters[i].filter,data.pkt) > 0) {
    116123                                results[i+1].count++;
    117124                                results[i+1].bytes+=wlen;
     
    126133                results[0].count++;
    127134                results[0].bytes +=wlen;
    128         }
    129         if (mesg) {
    130                 // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
    131                 switch (mesg->code) {
    132                         case MESSAGE_STOPPING:
    133                                 trace_publish_result(trace, t, 0, (libtrace_generic_types_t){.ptr = results}, RESULT_NORMAL); // Only ever using a single key 0
    134                                 //fprintf(stderr, "tracestats_parallel:\t Stopping thread - publishing results\n");
    135                                 break;
    136                         case MESSAGE_STARTING:
    137                                 results = calloc(1, sizeof(statistics_t) * (filter_count + 1));
    138                                 break;
    139                         case MESSAGE_DO_PAUSE:
    140                                 assert(!"GOT Asked to pause!!!\n");
    141                                 break;
    142                         case MESSAGE_PAUSING:
    143                                 //fprintf(stderr, "tracestats_parallel:\t pausing thread\n");
    144                                 break;
    145                         case MESSAGE_RESUMING:
    146                                 //fprintf(stderr, "tracestats_parallel:\t resuming thread\n");
    147                                 break;
    148                 }
    149         }
    150         return pkt;
     135                return data.pkt;
     136        case MESSAGE_STOPPING:
     137                stats = trace_create_statistics();
     138                trace_get_thread_statistics(trace, t, stats);
     139                trace_print_statistics(stats, stderr, NULL);
     140                free(stats);
     141                trace_publish_result(trace, t, 0, (libtrace_generic_t){.ptr = results}, RESULT_NORMAL); // Only ever using a single key 0
     142                //fprintf(stderr, "tracestats_parallel:\t Stopping thread - publishing results\n");
     143                break;
     144        case MESSAGE_STARTING:
     145                results = calloc(1, sizeof(statistics_t) * (filter_count + 1));
     146                break;
     147        case MESSAGE_DO_PAUSE:
     148                assert(!"GOT Asked to pause!!!\n");
     149                break;
     150        case MESSAGE_PAUSING:
     151                //fprintf(stderr, "tracestats_parallel:\t pausing thread\n");
     152                break;
     153        case MESSAGE_RESUMING:
     154                //fprintf(stderr, "tracestats_parallel:\t resuming thread\n");
     155                break;
     156        }
     157        return NULL;
    151158}
    152159
Note: See TracChangeset for help on using the changeset viewer.