Changeset 9d5f464


Ignore:
Timestamp:
11/08/18 17:15:19 (2 years ago)
Author:
Jacob Van Walraven <jcv9@…>
Branches:
develop
Children:
6984c96
Parents:
93f4c64
Message:

Improvments to tickrate calculations

File:
1 edited

Legend:

Unmodified
Added
Removed
  • examples/tutorial/ipdist-parallel.c

    r93f4c64 r9d5f464  
    1515        uint64_t src[256];
    1616        uint64_t dst[256];
     17        uint64_t lastkey;
     18        uint64_t packets;
    1719};
    1820/* Structure to hold the result from a processing thread */
     
    2022        uint64_t src[256];
    2123        uint64_t dst[256];
     24        uint64_t packets;
    2225};
    2326/* Structure to hold counters the report has one of these, it combines
     
    2730        uint64_t dst[256];
    2831        uint64_t lastkey;
     32        uint64_t packets;
    2933};
    3034
     
    4044};
    4145
     46uint64_t tickrate;
     47
    4248/* Start callback function - This is run for each thread when it starts */
    4349static void *start_callback(libtrace_t *trace, libtrace_thread_t *thread, void *global) {
     
    5056                local->dst[i] = 0;
    5157        }
     58        local->lastkey = 0;
     59        local->packets = 0;
    5260
    5361        /* return the local storage so it is available for all other callbacks for the thread*/
     
    114122        struct addr_local *local = (struct addr_local *)tls;
    115123
     124        /* Store the timestamp of the last packet in erf format
     125         * We use the timestamp in the packet for processing non live traces */
     126        local->lastkey = trace_get_erf_timestamp(packet);
     127        /* Increment the packet count */
     128        local->packets += 1;
     129
    116130        /* Regain access to excluded networks pointer */
    117131        struct exclude_networks *exclude = (struct exclude_networks *)global;
     
    138152}
    139153
    140 /* Stopping callback function */
     154/* Stopping callback function - When a thread closes */
    141155static void stop_processing(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls) {
    142156
    143         /* create a structure to hold the result from the processing thread */
    144         struct addr_result *result = (struct addr_result *)malloc(sizeof(struct addr_result));
    145157        /* cast the local storage structure */
    146158        struct addr_local *local = (struct addr_local *)tls;
    147 
    148         /* Populate the result structure */
     159        /* Create structure to store the result */
     160        struct addr_result *result = (struct addr_result *)malloc(sizeof(struct addr_result));
     161
     162        /* Populate the result */
    149163        int i;
    150164        for(i=0;i<256;i++) {
    151165                result->src[i] = local->src[i];
    152                 result->dst[i] = local->dst[i];
    153         }
    154 
    155         /* Publish the result to the reporter thread */
     166                result->src[i] = local->src[i];
     167        }
     168        result->packets = local->packets;
     169
     170        /* This will not cause the result to be printed but will atleast end up going into our tally
     171         * The reporter thread can then deal with it when it closes */
    156172        trace_publish_result(trace, thread, 0, (libtrace_generic_t){.ptr=result}, RESULT_USER);
    157173
     
    159175        free(local);
    160176}
    161 
    162 
    163 
    164177
    165178/* Starting callback for reporter thread */
     
    175188        }
    176189        tally->lastkey = 0;
     190        tally->packets = 0;
    177191
    178192        return tally;
    179193}
    180194
    181 static void plot_results(struct addr_tally *tally) {
     195static void plot_results(struct addr_tally *tally, uint64_t tick) {
    182196
    183197        /* Get the current time */
    184198        time_t current_time = time(NULL);
    185         char* time_string = ctime(&current_time);
    186 
    187         /* Push all the data into a tmp file for gnuplot */
     199
    188200        char outputfile[255];
    189201        char outputplot[255];
    190         snprintf(outputfile, sizeof(outputfile), "ipdist-%u.data", current_time);
    191         snprintf(outputplot, sizeof(outputplot), "ipdist-%u.png", current_time);
    192         FILE *tmp = fopen(outputfile, "w");
     202        snprintf(outputfile, sizeof(outputfile), "ipdist-%u.data", tick);
     203        snprintf(outputplot, sizeof(outputplot), "ipdist-%u.png", tick);
     204
     205        /* Push all data into data file */
     206        FILE *tmp = fopen(outputfile, "w");
    193207        int i;
    194208        for(i=0;i<255;i++) {
     
    196210        }
    197211        fclose(tmp);
     212        printf("wrote out to file %s\n", outputfile);
    198213
    199214        /* Open pipe to gnuplot */
     
    204219        fprintf(gnuplot, "set xrange[0:255]\n");
    205220        fprintf(gnuplot, "set xlabel 'Prefix'\n");
    206         fprintf(gnuplot, "set xlabel 'Hits'\n");
     221        fprintf(gnuplot, "set ylabel 'Hits'\n");
    207222        fprintf(gnuplot, "set xtics 0,10,255\n");
    208223        fprintf(gnuplot, "set output '%s'\n", outputplot);
    209224        fprintf(gnuplot, "plot '%s' using 1:2 title 'Source address' with boxes, '%s' using 1:3 title 'Destination address' with boxes\n", outputfile, outputfile);
     225        fprintf(gnuplot, "replot");
    210226        pclose(gnuplot);
    211227}
     
    217233
    218234        struct addr_result *results;
    219         uint64_t key;
    220235        struct addr_tally *tally;
     236        uint64_t key;
    221237
    222238        /* We only want to handle results containing our user-defined structure  */
     
    233249        /* Grab our tally out of thread local storage */
    234250        tally = (struct addr_tally *)tls;
    235         /* increment tally with the new results */
    236         int i;
    237         for(i=0;i<256;i++) {
    238                 tally->src[i] += results->src[i];
    239                 tally->dst[i] += results->dst[i];
     251
     252        /* Add all the results to the tally */
     253        int i;
     254        for(i=0;i<256;i++) {
     255                tally->src[i] += results->src[i];
     256                tally->dst[i] += results->dst[i];
     257        }
     258        tally->packets += results->packets;
     259
     260        /* If the current timestamp is greater than the last printed plus the interval, output a result */
     261        if((key >> 32) >= (tally->lastkey >> 32) + (tickrate/1000)) {
     262
     263                /* update last key */
    240264                tally->lastkey = key;
    241         }
    242 
    243         /* Plot the result */
    244         plot_results(tally);
     265
     266                /* Plot the result with the key in epoch seconds*/
     267                plot_results(tally, key >> 32);
     268
     269                /* clear the tally */
     270                for(i=0;i<256;i++) {
     271                        tally->src[i] = 0;
     272                        tally->dst[i] = 0;
     273                }
     274                tally->packets = 0;
     275        }
    245276
    246277        /* Cleanup the thread results */
     
    254285        struct addr_tally *tally = (struct addr_tally *)tls;
    255286
    256         //OUTPUT DATA
    257         //plot_results(tally);
    258 
     287        /* If there is any remaining data in the tally plot it */
     288        if(tally->packets > 0) {
     289                plot_results(tally, (tally->lastkey >> 32) + tickrate);
     290        }
    259291        /* Cleanup tally results*/
    260292        free(tally);
     
    264296
    265297        struct addr_result *result = (struct addr_result *)malloc(sizeof(struct addr_result));
     298        /* Proccessing thread local storage */
    266299        struct addr_local *local = (struct addr_local *)tls;
    267300
     
    276309                local->dst[i] = 0;
    277310        }
    278 
    279         printf("tick: %u\n", tick);
    280 
    281         /* Push the result to the combiner */
    282         trace_publish_result(trace, thread, tick, (libtrace_generic_t){.ptr=result}, RESULT_USER);
     311        result->packets = local->packets;
     312
     313        /* only use the tick timestamp if running against a live capture */
     314        uint64_t key;
     315        if(trace_get_information(trace)->live) {
     316                key = tick;
     317        } else {
     318                key = local->lastkey;
     319        }
     320
     321        /* Push result to the combiner */
     322        trace_publish_result(trace, thread, key, (libtrace_generic_t){.ptr=result}, RESULT_USER);
    283323}
    284324
     
    296336        }
    297337        /* Convert tick into correct format */
    298         uint64_t tickrate = atoi(argv[2]);
     338        tickrate = atoi(argv[2]);
    299339
    300340        /* Create the trace */
     
    324364        /* Try to balance the load across all processing threads */
    325365        trace_set_hasher(trace, HASHER_BALANCE, NULL, NULL);
     366
    326367        /* Set the tick interval */
    327368        trace_set_tick_interval(trace, tickrate);
    328 
     369        /* Do not buffer the reports */
     370        trace_set_reporter_thold(trace, 1);
    329371
    330372
Note: See TracChangeset for help on using the changeset viewer.