Changeset 0ec8a7c for tools


Ignore:
Timestamp:
02/27/15 17:31:03 (6 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
98dc1ba
Parents:
7718e54
Message:

Change the user interface from a message and a packet to only messages.

Now a packet is a type of message (MESSAGE_PACKET).
I expanded the message structure into seperate arguments such that these
will be passed in registers on x64 systems. As such performance has
remained identical if not better.

I renamed libtrace_generic_types_t to libtrace_generic_t so it was shorter.

Location:
tools
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • tools/traceanon/traceanon_parallel.c

    rd994324 r0ec8a7c  
    154154
    155155
    156 static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, libtrace_message_t *mesg, UNUSED libtrace_thread_t *t)
    157 {
    158         if (pkt) {
    159                 struct libtrace_ip *ipptr;
    160                 libtrace_udp_t *udp = NULL;
    161                 libtrace_tcp_t *tcp = NULL;
    162 
    163                 ipptr = trace_get_ip(pkt);
     156static void* per_packet(libtrace_t *trace, libtrace_thread_t *t,
     157                        int mesg, libtrace_generic_t data,
     158                        libtrace_thread_t *sender UNUSED)
     159{
     160        struct libtrace_ip *ipptr;
     161        libtrace_udp_t *udp = NULL;
     162        libtrace_tcp_t *tcp = NULL;
     163
     164        switch (mesg) {
     165        case MESSAGE_PACKET:
     166                ipptr = trace_get_ip(data.pkt);
    164167
    165168                if (ipptr && (enc_source || enc_dest)) {
     
    173176                /* XXX replace with nice use of trace_get_transport() */
    174177
    175                 udp = trace_get_udp(pkt);
     178                udp = trace_get_udp(data.pkt);
    176179                if (udp && (enc_source || enc_dest)) {
    177180                        udp->check = 0;
    178181                }
    179182
    180                 tcp = trace_get_tcp(pkt);
     183                tcp = trace_get_tcp(data.pkt);
    181184                if (tcp && (enc_source || enc_dest)) {
    182185                        tcp->check = 0;
     
    191194                //trace_publish_result(trace, trace_packet_get_order(pkt), pkt);
    192195
    193                 trace_publish_result(trace, t, trace_packet_get_order(pkt), (libtrace_generic_types_t){.pkt=pkt}, RESULT_PACKET);
    194                 //return ;
    195         }
    196         if (mesg) {
    197                 // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
    198                 switch (mesg->code) {
    199                         case MESSAGE_STARTING:
    200                                 enc_init(enc_type,key);
    201                         break;
    202                         case MESSAGE_TICK:
    203                                 trace_publish_result(trace, t, mesg->additional.uint64, (libtrace_generic_types_t){.pkt=NULL}, RESULT_TICK);
    204                 }
     196                trace_publish_result(trace, t, trace_packet_get_order(data.pkt), data, RESULT_PACKET);
     197                break;
     198        case MESSAGE_STARTING:
     199                enc_init(enc_type,key);
     200                break;
     201        case MESSAGE_TICK:
     202                trace_publish_result(trace, t, data.uint64, (libtrace_generic_t){0}, RESULT_TICK);
     203                break;
    205204        }
    206205        return NULL;
     
    423422         
    424423        int i = 1;
    425         trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_types_t){0});
     424        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
    426425        trace_parallel_config(trace, TRACE_OPTION_SET_CONFIG, &uc);
    427426
  • tools/tracertstats/tracertstats_parallel.c

    r2e22196d r0ec8a7c  
    171171} timestamp_sync_t;
    172172
    173 static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt,
    174                                                 libtrace_message_t *mesg,
    175                                                 libtrace_thread_t *t)
     173static void* per_packet(libtrace_t *trace, libtrace_thread_t *t,
     174                        int mesg, libtrace_generic_t data,
     175                        libtrace_thread_t *sender UNUSED)
    176176{
    177177        int i;
    178178        static __thread uint64_t last_ts = 0, ts = 0;
    179179        static __thread result_t * results = NULL;
    180         // Unsure when we would hit this case but the old code had it, I
    181         // guess we should keep it
    182         if (pkt && trace_get_packet_buffer(pkt,NULL,NULL) != NULL) {
    183                 //fprintf(stderr, "Got packet t=%x\n", t);
    184                 ts = trace_get_seconds(pkt) / packet_interval;
     180
     181        switch(mesg) {
     182        case MESSAGE_PACKET:
     183                // Unsure when we would hit this case but the old code had it, I
     184                // guess we should keep it
     185                assert(trace_get_packet_buffer(pkt,NULL,NULL) != NULL);
     186                ts = trace_get_seconds(data.pkt) / packet_interval;
    185187                if (last_ts == 0)
    186188                        last_ts = ts;
    187189
    188190                while (packet_interval != UINT64_MAX && last_ts<ts) {
     191                        libtrace_generic_t tmp = {.ptr = results};
    189192                        // Publish and make a new one new
    190193                        //fprintf(stderr, "Publishing result %"PRIu64"\n", last_ts);
    191                         trace_publish_result(trace, t, (uint64_t) last_ts, (libtrace_generic_types_t){.ptr = results}, RESULT_NORMAL);
     194                        trace_publish_result(trace, t, (uint64_t) last_ts, tmp, RESULT_NORMAL);
    192195                        trace_post_reporter(trace);
    193196                        results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
    194197                        last_ts++;
    195198                }
    196                
     199
    197200                for(i=0;i<filter_count;++i) {
    198                         if(trace_apply_filter(filters[i].filter, pkt)) {
     201                        if(trace_apply_filter(filters[i].filter, data.pkt)) {
    199202                                results->filters[i].count++;
    200                                 results->filters[i].bytes+=trace_get_wire_length(pkt);
     203                                results->filters[i].bytes+=trace_get_wire_length(data.pkt);
    201204                        }
    202205                }
    203                
     206
    204207                results->total.count++;
    205                 results->total.bytes +=trace_get_wire_length(pkt);
     208                results->total.bytes +=trace_get_wire_length(data.pkt);
    206209                /*if (count >= packet_count) {
    207210                        report_results(ts,count,bytes);
     
    209212                        bytes=0;
    210213                }*/ // TODO what was happening here doesn't match up with any of the documentations!!!
    211         }
    212        
    213         if (mesg) {
    214                 // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
    215                 switch (mesg->code) {
    216                         case MESSAGE_STARTING:
    217                                 results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
    218                                 break;
    219                         case MESSAGE_STOPPING:
    220                                 // Should we always post this?
    221                                 if (results->total.count) {
    222                                         trace_publish_result(trace, t, (uint64_t) last_ts, (libtrace_generic_types_t){.ptr = results}, RESULT_NORMAL);
     214                return data.pkt;
     215
     216        case MESSAGE_STARTING:
     217                results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
     218                break;
     219
     220        case MESSAGE_STOPPING:
     221                // Should we always post this?
     222                if (results->total.count) {
     223                        libtrace_generic_t tmp = {.ptr = results};
     224                        trace_publish_result(trace, t, (uint64_t) last_ts, tmp, RESULT_NORMAL);
     225                        trace_post_reporter(trace);
     226                        results = NULL;
     227                }
     228                break;
     229
     230                case MESSAGE_TICK:
     231                {
     232                        int64_t offset;
     233                        struct timeval *tv, tv_real;
     234                        libtrace_packet_t *first_packet = NULL;
     235                        retrive_first_packet(trace, &first_packet, &tv);
     236                        if (first_packet != NULL) {
     237                                // So figure out our running offset
     238                                tv_real = trace_get_timeval(first_packet);
     239                                offset = tv_to_usec(tv) - tv_to_usec(&tv_real);
     240                                // Get time of day and do this stuff
     241                                uint64_t next_update_time;
     242                                next_update_time = (last_ts*packet_interval + packet_interval) * 1000000 + offset;
     243                                if (next_update_time <= data.uint64) {
     244                                        libtrace_generic_t tmp = {.ptr = results};
     245                                        //fprintf(stderr, "Got a tick and publishing early!!\n");
     246                                        trace_publish_result(trace, t, (uint64_t) last_ts, tmp, RESULT_NORMAL);
    223247                                        trace_post_reporter(trace);
    224                                         results = NULL;
     248                                        results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
     249                                        last_ts++;
     250                                } else {
     251                                        //fprintf(stderr, "Got a tick but no publish ...\n");
    225252                                }
    226                                 break;
    227                         case MESSAGE_TICK:
    228                         {
    229                                 int64_t offset;
    230                                 struct timeval *tv, tv_real;
    231                                 libtrace_packet_t *first_packet = NULL;
    232                                 retrive_first_packet(trace, &first_packet, &tv);
    233                                 if (first_packet != NULL) {
    234                                         // So figure out our running offset
    235                                         tv_real = trace_get_timeval(first_packet);
    236                                         offset = tv_to_usec(tv) - tv_to_usec(&tv_real);
    237                                         // Get time of day and do this stuff
    238                                         uint64_t next_update_time;
    239                                         next_update_time = (last_ts*packet_interval + packet_interval) * 1000000 + offset;
    240                                         if (next_update_time <= mesg->additional.uint64) {
    241                                                 //fprintf(stderr, "Got a tick and publishing early!!\n");
    242                                                 trace_publish_result(trace, t, (uint64_t) last_ts, (libtrace_generic_types_t){.ptr = results}, RESULT_NORMAL);
    243                                                 trace_post_reporter(trace);
    244                                                 results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
    245                                                 last_ts++;
    246                                         } else {
    247                                                 //fprintf(stderr, "Got a tick but no publish ...\n");
    248                                         }
    249                                 } else {
    250                                         //fprintf(stderr, "Got a tick but no packets seen yet!!!\n");
    251                                 }
     253                        } else {
     254                                //fprintf(stderr, "Got a tick but no packets seen yet!!!\n");
    252255                        }
    253256                }
    254257        }
    255         return pkt;
     258        return NULL;
    256259}
    257260
  • tools/tracestats/tracestats_parallel.c

    r2adc1d0 r0ec8a7c  
    102102
    103103
    104 static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, libtrace_message_t *mesg, libtrace_thread_t *t)
     104//libtrace_message_t mesg
     105static void* per_packet(libtrace_t *trace, libtrace_thread_t *t,
     106                        int mesg, libtrace_generic_t data,
     107                        libtrace_thread_t *sender)
    105108{
    106109        // Using first entry as total and those after for filter counts
    107110        static __thread statistics_t * results = NULL;
    108         int i;
    109        
    110         if (pkt) {
    111                 int wlen = trace_get_wire_length(pkt);
     111        int i, wlen;
     112        libtrace_stat_t *stats;
     113
     114
     115        // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
     116        switch (mesg) {
     117        case MESSAGE_PACKET:
     118                wlen = trace_get_wire_length(data.pkt);
    112119                for(i=0;i<filter_count;++i) {
    113120                        if (filters[i].filter == NULL)
    114121                                continue;
    115                         if(trace_apply_filter(filters[i].filter,pkt) > 0) {
     122                        if(trace_apply_filter(filters[i].filter,data.pkt) > 0) {
    116123                                results[i+1].count++;
    117124                                results[i+1].bytes+=wlen;
     
    126133                results[0].count++;
    127134                results[0].bytes +=wlen;
    128         }
    129         if (mesg) {
    130                 // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
    131                 switch (mesg->code) {
    132                         case MESSAGE_STOPPING:
    133                                 trace_publish_result(trace, t, 0, (libtrace_generic_types_t){.ptr = results}, RESULT_NORMAL); // Only ever using a single key 0
    134                                 //fprintf(stderr, "tracestats_parallel:\t Stopping thread - publishing results\n");
    135                                 break;
    136                         case MESSAGE_STARTING:
    137                                 results = calloc(1, sizeof(statistics_t) * (filter_count + 1));
    138                                 break;
    139                         case MESSAGE_DO_PAUSE:
    140                                 assert(!"GOT Asked to pause!!!\n");
    141                                 break;
    142                         case MESSAGE_PAUSING:
    143                                 //fprintf(stderr, "tracestats_parallel:\t pausing thread\n");
    144                                 break;
    145                         case MESSAGE_RESUMING:
    146                                 //fprintf(stderr, "tracestats_parallel:\t resuming thread\n");
    147                                 break;
    148                 }
    149         }
    150         return pkt;
     135                return data.pkt;
     136        case MESSAGE_STOPPING:
     137                stats = trace_create_statistics();
     138                trace_get_thread_statistics(trace, t, stats);
     139                trace_print_statistics(stats, stderr, NULL);
     140                free(stats);
     141                trace_publish_result(trace, t, 0, (libtrace_generic_t){.ptr = results}, RESULT_NORMAL); // Only ever using a single key 0
     142                //fprintf(stderr, "tracestats_parallel:\t Stopping thread - publishing results\n");
     143                break;
     144        case MESSAGE_STARTING:
     145                results = calloc(1, sizeof(statistics_t) * (filter_count + 1));
     146                break;
     147        case MESSAGE_DO_PAUSE:
     148                assert(!"GOT Asked to pause!!!\n");
     149                break;
     150        case MESSAGE_PAUSING:
     151                //fprintf(stderr, "tracestats_parallel:\t pausing thread\n");
     152                break;
     153        case MESSAGE_RESUMING:
     154                //fprintf(stderr, "tracestats_parallel:\t resuming thread\n");
     155                break;
     156        }
     157        return NULL;
    151158}
    152159
Note: See TracChangeset for help on using the changeset viewer.