Changeset ccb89e4 for examples


Ignore:
Timestamp:
09/15/15 15:48:17 (5 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
624c2da
Parents:
a31e166
Message:

Update trivial and timedemo examples to use new parallel API

Also fixed a few documentation errors (spelling, grammar etc).

Location:
examples/parallel
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • examples/parallel/timedemo.c

    r7c95027 rccb89e4  
    1 /* A parallel libtrace program that prints a count of packets observered
    2  * after a 10 seconds of the trace running.
     1/* A parallel libtrace program that prints a count of packets observed
     2 * every 10 seconds.
    33 *
    4  * Using this approach allows results to be reported quickly for trracetime
    5  * formats, even if data is not arriving on a given thread. While maintaining
    6  * a consistant output when run on a file etc.
     4 * Using this approach allows results to be reported promptly for live
     5 * formats, even if data is not arriving on a given thread. This method also
     6 * works perfectly fine when run against a trace file.
    77 *
    8  * Designed to demonstrate the correct usage of TICK_INTERVAL. Also note
    9  * TICK_COUNT is not needed for this example.
     8 * Designed to demonstrate the correct usage of TICK_INTERVAL. TICK_COUNT can
     9 * be used instead, which will trigger the result reporting based on seeing
     10 * a fixed number of packets.
    1011 *
    1112 * This example is based upon examples/tutorial/timedemo.c
     
    1617#include <assert.h>
    1718#include <getopt.h>
     19#include <stdlib.h>
    1820
    1921#define SECONDS_TO_ERF(sec) (((uint64_t)sec)<<32)
     
    2224#define TV_TO_ERF(tv) ((((uint64_t)(tv).tv_sec) << 32) + ((((uint64_t)(tv).tv_usec)<< 32)/1000000))
    2325
     26struct localdata {
     27        uint64_t nextreport;
     28        uint64_t count;
     29};
     30
    2431/* Due to the amount of error checking required in our main function, it
    2532 * is a lot simpler and tidier to place all the calls to various libtrace
    2633 * destroy functions into a separate function.
    2734 */
    28 static void libtrace_cleanup(libtrace_t *trace) {
     35static void libtrace_cleanup(libtrace_t *trace,
     36                libtrace_callback_set_t *processing,
     37                libtrace_callback_set_t *reporter) {
    2938
    3039        /* It's very important to ensure that we aren't trying to destroy
     
    3443                trace_destroy(trace);
    3544
    36 }
    37 
    38 /* Every time a packet becomes ready this function will be called. It will also
    39  * be called when messages from the library is received. This function
    40  * is run in parallel.
    41  */
    42 static void* per_packet(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED,
    43                         int mesg, libtrace_generic_t data,
    44                         libtrace_thread_t *sender UNUSED)
    45 {
    46         /* __thread, says make this unique per each thread */
    47         static __thread uint64_t count = 0; /* The number of packets in this 10sec interval */
    48         static __thread uint64_t next_report = 0; /* The start of the next interval */
    49         static __thread uint64_t offset = 0; /* Offset between trace time and system time */
    50         uint64_t ts; /* The timestamp of the current packet */
    51 
    52         switch (mesg) {
    53         case MESSAGE_PACKET:
    54                 /* Get the timestamp for the current packet */
    55                 ts = trace_get_erf_timestamp(data.pkt);
    56 
    57                 /* Check whether we need to report a packet count or not.
    58                  *
    59                  * If the timestamp for the current packet is beyond the time when the
    60                  * next report was due then we have to output our current count and
    61                  * reset it to zero.
    62                  *
    63                  * Note that I use a while loop here to ensure that we correctly deal
    64                  * with periods in which no packets are observed. This can still
    65                  * happen because TICK_INTERVAL is not used for realtime playback
    66                  * such as a file.
    67                  */
    68                 while (next_report && ts > next_report) {
    69                         libtrace_generic_t c;
    70                         c.uint64 = count;
    71                         /* Report the result for the current time interval
    72                          * Each thread will report once for each given time
    73                          * interval */
    74                         trace_publish_result(trace, t, next_report, c, RESULT_USER);
    75 
    76                         /* Reset the counter */
    77                         count = 0;
    78                         /* Determine when the next report is due */
    79                         next_report += SECONDS_TO_ERF(10);
    80                 }
    81 
    82                 /* No matter what else happens during this function call, we still
    83                  * need to increment our counter */
    84                 count += 1;
    85 
    86                 /* We have finished processing this packet return it */
    87                 return data.pkt;
    88         case MESSAGE_TICK_INTERVAL:
    89 
    90                  /* If we are a second passed when we should have reported last
    91                   * we will do it now. We would be in this situation if we
    92                   * haven't been receiving packets.
    93                   * Make sure we dont report until we have seen the first packet
    94                   */
    95                 while (next_report &&
    96                        (data.uint64 - offset - SECONDS_TO_ERF(1) > next_report)) {
    97                         libtrace_generic_t c;
    98                         c.uint64 = count;
    99                         /* Report the result for the current time interval */
    100                         trace_publish_result(trace, t, next_report, c, RESULT_USER);
    101 
    102                         /* Reset the counter */
    103                         count = 0;
    104                         /* Determine when the next report is due */
    105                         next_report += SECONDS_TO_ERF(10);
    106                 }
    107 
    108         /* !!! Fall through to check if we have the first packet yet !!! */
    109         case MESSAGE_FIRST_PACKET: /* Some thread has seen its first packet */
    110 
    111                 if (next_report == 0) {
    112                         uint64_t first_ts;
    113                         /* Try get the timestamp of the first packet across all threads*/
    114                         const libtrace_packet_t * tmp = NULL;
    115                         const struct timeval *tv;
    116 
    117                         /* Get the first packet across all threads */
    118                         if (trace_get_first_packet(trace, NULL, &tmp, &tv) == 1) {
    119                                 /* We know this is the first packet across all threads */
    120 
    121                                 first_ts = trace_get_erf_timestamp(tmp);
    122                                 /* There might be a difference between system time
    123                                  * and packet times. We need to account for this
    124                                  * when interpreting TICK_INTERVAL messages */
    125                                 offset = TV_TO_ERF(*tv) - first_ts;
    126                                 /* We know our first reporting time now */
    127                                 next_report = first_ts + SECONDS_TO_ERF(10);
    128                         }
    129                 }
    130                 return NULL;
    131         default:
    132                 return NULL;
    133         }
    134         return NULL;
    135 }
    136 
    137 /* Every time a result (published using trace_publish_result()) becomes ready
    138  * this function will be called. It will also be called when messages from the
    139  * library is received. This function is only run on a single thread
    140  */
    141 static void report_results(libtrace_t *trace UNUSED, int mesg,
    142                            libtrace_generic_t data,
    143                            libtrace_thread_t *sender UNUSED) {
    144         static uint64_t count = 0; /* The count for the current interval */
    145         static int reported = 0; /* The number of threads that have reported results for the interval */
    146         static uint64_t currentkey = 0; /* The key, which is next_report from perpkt */
    147 
    148         switch (mesg) {
    149         case MESSAGE_RESULT:
    150                 if (data.res->type == RESULT_USER) {
    151                         /* We should always get a result from each thread */
    152                         if (currentkey)
    153                                 assert(data.res->key == currentkey);
    154 
    155                         currentkey = data.res->key;
    156                         reported++;
    157                         /* Add on the packets */
    158                         count += data.res->value.uint64;
    159 
    160                         if (reported == libtrace_get_perpkt_count(trace)) {
    161                                 /* Print a timestamp for the report and the packet count */
    162                                 printf("%u \t%" PRIu64 "\n", (int) ERF_TO_SECONDS(data.res->key), count);
    163                                 /* Reset ready for the next batch of results */
    164                                 count = reported = 0;
    165                                 currentkey = data.res->key + SECONDS_TO_ERF(10);
    166                         }
    167                 }
    168                 break;
    169         case MESSAGE_STARTING:
    170                 /* Print heading when first started */
    171                 printf("Time\t\tPackets\n");
    172                 break;
    173         }
     45        if (processing)
     46                trace_destroy_callback_set(processing);
     47
     48        if (reporter)
     49                trace_destroy_callback_set(reporter);
     50
     51}
     52
     53/* Creates a localdata structure for a processing thread */
     54static void *init_local(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED,
     55                void *global UNUSED) {
     56
     57        struct localdata *local = (struct localdata *)malloc(sizeof(struct
     58                        localdata));
     59        local->nextreport = 0;
     60        local->count = 0;
     61
     62        return local;
     63
     64}
     65
     66/* Frees the localdata associated with a processing thread */
     67static void fin_local(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED,
     68                void *global UNUSED, void *tls) {
     69
     70        free(tls);
     71}
     72
     73static libtrace_packet_t *per_packet(libtrace_t *trace, libtrace_thread_t *t,
     74                void *global UNUSED, void *tls, libtrace_packet_t *packet) {
     75
     76        uint64_t ts;
     77        /* Cast our thread local storage to the right type */
     78        struct localdata *local = (struct localdata *)tls;
     79
     80        /* Get the timestamp for the current packet */
     81        ts = trace_get_erf_timestamp(packet);
     82
     83        /* Check whether we need to report a packet count or not.
     84         *
     85         * If the timestamp for the current packet is beyond the time when the
     86         * next report was due then we have to output our current count and
     87         * reset it to zero.
     88         *
     89         * Note that I use a while loop here to ensure that we correctly deal
     90         * with periods in which no packets are observed. This can still
     91         * happen because TICK_INTERVAL is not used for realtime playback
     92         * such as a file.
     93         */
     94        while (local->nextreport && ts > local->nextreport) {
     95                libtrace_generic_t c;
     96                c.uint64 = local->count;
     97                /* Report the result for the current time interval.
     98                 * Each thread will report once for each given time
     99                 * interval */
     100                trace_publish_result(trace, t, local->nextreport, c,
     101                                RESULT_USER);
     102
     103                /* Reset the counter */
     104                local->count = 0;
     105                /* Determine when the next report is due */
     106                local->nextreport += SECONDS_TO_ERF(10);
     107        }
     108
     109        /* No matter what else happens during this function call, we still
     110         * need to increment our counter */
     111        local->count += 1;
     112
     113        /* We have finished processing this packet so return it */
     114        return packet;
     115
     116}
     117
     118/* As soon as any thread has seen a packet, we need to initialise the
     119 * next reporting time for each of our processing threads */
     120static void first_packet(libtrace_t *trace, libtrace_thread_t *t UNUSED,
     121                void *global UNUSED, void *tls,
     122                libtrace_thread_t *sender UNUSED) {
     123
     124        /* Cast our thread local storage to the right type */
     125        struct localdata *local = (struct localdata *)tls;
     126
     127        if (local->nextreport == 0) {
     128                uint64_t first_ts;
     129                /* Get the timestamp of the first packet across all threads */
     130                const libtrace_packet_t * tmp = NULL;
     131                const struct timeval *tv;
     132
     133                /* Get the first packet across all threads */
     134                if (trace_get_first_packet(trace, NULL, &tmp, &tv) == 1) {
     135                        first_ts = trace_get_erf_timestamp(tmp);
     136                        /* We know our first reporting time now */
     137                        local->nextreport = first_ts + SECONDS_TO_ERF(10);
     138                }
     139        }
     140}
     141
     142static void process_tick(libtrace_t *trace, libtrace_thread_t *t,
     143                void *global UNUSED, void *tls, uint64_t tick) {
     144
     145        struct localdata *local = (struct localdata *)tls;
     146
     147        while (local->nextreport && tick > local->nextreport) {
     148                libtrace_generic_t c;
     149                c.uint64 = local->count;
     150                /* If the tick is past the time that our next report is
     151                 * due, flush our current counter to the reporting
     152                 * thread. This ensures that we keep sending results even
     153                 * if this thread receives no new packets
     154                 */
     155                trace_publish_result(trace, t, local->nextreport, c,
     156                        RESULT_USER);
     157
     158                /* Reset the counter */
     159                local->count = 0;
     160                /* Determine when the next report is due */
     161                local->nextreport += SECONDS_TO_ERF(10);
     162        }
     163
     164}
     165
     166static inline void dump_results(struct localdata *local, uint64_t key) {
     167
     168        /* Using a while loop here, so that we can correctly handle any
     169         * 10 second intervals where no packets were counted.
     170         */
     171        while (key >= local->nextreport) {
     172                printf("%u \t%" PRIu64 "\n",
     173                                (int) ERF_TO_SECONDS(local->nextreport),
     174                                local->count);
     175                local->count = 0;
     176                local->nextreport += SECONDS_TO_ERF(10);
     177        }
     178}
     179
     180/* Process results sent to the reporter thread */
     181static void report_results(libtrace_t *trace,
     182                libtrace_thread_t *sender UNUSED,
     183                void *global UNUSED, void *tls, libtrace_result_t *result) {
     184
     185        static __thread int reported = 0;
     186        struct localdata *local = (struct localdata *)tls;
     187
     188
     189        /* Set the initial reporting time and print the heading
     190         * Note: we could do these in starting and first_packet callbacks
     191         * but there is only one reporting thread so we can get away
     192         * with this. */
     193        if (local->nextreport == 0) {
     194                printf("Time\t\tPackets\n");
     195                local->nextreport = result->key;
     196        }
     197        assert(result->key == local->nextreport);
     198
     199        reported ++;
     200        if (reported == trace_get_perpkt_threads(trace)) {
     201                dump_results(local, result->key);
     202                reported = 0;
     203        }
     204
     205        local->count += result->value.uint64;
     206
     207}
     208
     209/* Dump the final value for the counter and free up our local data struct */
     210static void end_reporter(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED,
     211                void *global UNUSED, void *tls) {
     212
     213        struct localdata *local = (struct localdata *)tls;
     214
     215        /* If we have any counted packets that haven't been reported, do
     216         * so now.
     217         */
     218        if (local->count > 0)
     219                dump_results(local, local->nextreport + 1);
     220
     221        free(local);
    174222}
    175223
     
    177225{
    178226        libtrace_t *trace = NULL;
     227        libtrace_callback_set_t *processing = NULL;
     228        libtrace_callback_set_t *reporter = NULL;
    179229
    180230        /* Ensure we have at least one argument after the program name */
     
    188238        if (trace_is_err(trace)) {
    189239                trace_perror(trace,"Opening trace file");
    190                 libtrace_cleanup(trace);
    191                 return 1;
    192         }
    193 
    194         /* We want to push through results ASAP */
     240                libtrace_cleanup(trace, processing, reporter);
     241                return 1;
     242        }
     243
     244        /* Send every result to the reporter immediately, i.e. do not buffer
     245         * them. */
    195246        trace_set_reporter_thold(trace, 1);
    196247
    197         /* If the trace is live send a tick message every second */
     248        /* Sends a tick message once per second */
    198249        trace_set_tick_interval(trace, 1000);
    199250
     
    204255         * in order so we use combiner_ordered.
    205256         *
    206          * This typically the most usefull combiner to use.
     257         * This is typically the most useful combiner to use.
    207258         */
    208259        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
    209260
    210         if (trace_pstart(trace, NULL, per_packet, report_results) == -1) {
     261        /* Limit to 4 processing threads */
     262        trace_set_perpkt_threads(trace, 4);
     263
     264        /* Set up our processing callbacks */
     265        processing = trace_create_callback_set();
     266        trace_set_starting_cb(processing, init_local);
     267        trace_set_first_packet_cb(processing, first_packet);
     268        trace_set_stopping_cb(processing, fin_local);
     269        trace_set_packet_cb(processing, per_packet);
     270        trace_set_tick_interval_cb(processing, process_tick);
     271
     272        /* Set up our reporting callbacks -- note that we re-use the init_local
     273         * callback */
     274        reporter = trace_create_callback_set();
     275        trace_set_starting_cb(reporter, init_local);
     276        trace_set_result_cb(reporter, report_results);
     277        trace_set_stopping_cb(reporter, end_reporter);
     278
     279        /* Start everything going -- no global data required so set that
     280         * to NULL */
     281        if (trace_pstart(trace, NULL, processing, reporter) == -1) {
    211282                trace_perror(trace,"Starting trace");
    212                 libtrace_cleanup(trace);
     283                libtrace_cleanup(trace, processing, reporter);
    213284                return 1;
    214285        }
     
    218289        if (trace_is_err(trace)) {
    219290                trace_perror(trace,"Reading packets");
    220                 libtrace_cleanup(trace);
    221                 return 1;
    222         }
    223 
    224         libtrace_cleanup(trace);
     291                libtrace_cleanup(trace, processing, reporter);
     292                return 1;
     293        }
     294
     295        libtrace_cleanup(trace, processing, reporter);
    225296        return 0;
    226297}
  • examples/parallel/trivial_skeleton.c

    r16cb2a2 rccb89e4  
    1111#include <assert.h>
    1212
    13 static void process_packet(libtrace_packet_t *packet)
    14 {
    15         /* You really should consider using complete_parallel.c instead */
    16         assert(packet);
    1713
    18         /* Your code goes here */
     14/* Every time a packet becomes ready this function will be called. This
     15 * function is run in parallel, so multiple packets can be processed at once.
     16 *
     17 * Parameters:
     18 *   trace -- the input source that the packet was read from
     19 *   t -- a pointer to the current processing thread
     20 *   global -- a pointer to the global variable passed in to trace_start
     21 *   tls -- a pointer to the thread local storage for this thread
     22 *   packet -- the packet itself
     23 */
     24static libtrace_packet_t *process_packet(libtrace_t *trace,
     25                libtrace_thread_t *t,
     26                void *global, void *tls, libtrace_packet_t *packet) {
    1927
     28        /* Note that in this example, global and tls will both be NULL.
     29         * global is NULL because we passed NULL as the second argument
     30         * for trace_pstart. tls is NULL because we did not set a
     31         * starting callback for our per packet threads.
     32         */
     33
     34
     35        assert(packet);
     36
     37        /* Your code goes here */
     38
     39        /* If we've finished with the packet, we should return it to
     40         * libtrace so that it can be reused. */
     41        return packet;
    2042}
    21 
    22 /* Every time a packet becomes ready this function will be called. It will also
    23  * be called when messages from the library are received. This function
    24  * is run in parallel.
    25  */
    26 static void* per_packet(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED,
    27                         int mesg, libtrace_generic_t data,
    28                         libtrace_thread_t *sender UNUSED)
    29 {
    30 
    31         switch (mesg) {
    32         case MESSAGE_PACKET:
    33                 process_packet(data.pkt);
    34                 /* If we have finished processing this packet return it */
    35                 return data.pkt;
    36         default:
    37                 return NULL;
    38         }
    39         return NULL;
    40 }
    41 
    4243
    4344int main(int argc, char *argv[])
    4445{
    4546        libtrace_t *trace;
     47        libtrace_callback_set_t *pktcbs;
    4648
    4749        if (argc<2) {
     
    5759        }
    5860
     61        /* Create a callback set for our per packet threads */
     62        pktcbs = trace_create_callback_set();
     63
     64        /* Set the packet callback to be our packet processing function */
     65        trace_set_packet_cb(pktcbs, process_packet);
     66
    5967        /* We use a new version of trace_start(), trace_pstart()
    60          * The reporter function argument is optional and can be NULL */
    61         if (trace_pstart(trace, NULL, per_packet, NULL)) {
     68         * The reporter function argument is optional and can be NULL.
     69         * We've also set the second argument to NULL because we have no
     70         * global data that we want to be available to all threads. */
     71        if (trace_pstart(trace, NULL, pktcbs, NULL)) {
    6272                trace_perror(trace,"Starting trace");
    6373                trace_destroy(trace);
     
    7585
    7686        trace_destroy(trace);
     87        trace_destroy_callback_set(pktcbs);
    7788
    7889        return 0;
Note: See TracChangeset for help on using the changeset viewer.