Changeset 624c2da


Ignore:
Timestamp:
09/15/15 16:11:31 (5 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
5478d3d
Parents:
ccb89e4
Message:

Update network_capture example to match new API

File:
1 edited

Legend:

Unmodified
Added
Removed
  • examples/parallel/network_capture.c

    rbf3c54e r624c2da  
    11/* Network Capture
    22 *
    3  * Creates a file per stream and writes the result to disk
     3 * Creates a file per stream and writes the result to disk. These files can
     4 * later be merged using tracemerge to create a single ordered trace file.
     5 *
     6 * An alternative approach if we want a single output file is to use the
     7 * ordered combiner and publish the packets through to a reporter thread that
     8 * does the writing to disk.
     9 *
     10 * Defaults to using 4 threads, but this can be changed using the -t option.
    411 */
    512/* Note we include libtrace_parallel.h rather then libtrace.h */
     
    4451}
    4552
    46 
     53/* Creates an output trace and configures it according to our preferences */
    4754static libtrace_out_t *create_output(int my_id) {
    4855        libtrace_out_t *out = NULL;
     
    8895}
    8996
    90 /* Every time a packet becomes ready this function will be called. It will also
    91  * be called when messages from the library are received. This function
    92  * is run in parallel.
    93  */
    94 static void* per_packet(libtrace_t *trace, libtrace_thread_t *t,
    95                         int mesg, libtrace_generic_t data,
    96                         libtrace_thread_t *sender UNUSED)
    97 {
    98         static __thread libtrace_out_t * out;
    99         static __thread int my_id;
    100         libtrace_stat_t *stats;
    101 
    102         switch (mesg) {
    103         case MESSAGE_PACKET:
    104                 trace_write_packet(out, data.pkt);
    105                 /* If we have finished processing this packet return it */
    106                 return data.pkt;
    107         case MESSAGE_STARTING:
    108                 pthread_mutex_lock(&lock);
    109                 my_id = ++count;
    110                 pthread_mutex_unlock(&lock);
    111                 out = create_output(my_id);
    112                 break;
    113         case MESSAGE_STOPPING:
    114                 stats = trace_create_statistics();
    115                 trace_get_thread_statistics(trace, t, stats);
    116 
    117                 pthread_mutex_lock(&lock);
    118                 fprintf(stderr, "Thread #%d statistics\n", my_id);
    119                 trace_print_statistics(stats, stderr, "\t%s: %"PRIu64"\n");
    120                 pthread_mutex_unlock(&lock);
    121 
    122                 free(stats);
    123                 trace_destroy_output(out);
    124                 break;
    125         default:
    126                 return NULL;
    127         }
    128         return NULL;
     97
     98static libtrace_packet_t *per_packet(libtrace_t *trace UNUSED,
     99                libtrace_thread_t *t UNUSED,
     100                void *global UNUSED, void *tls, libtrace_packet_t *packet) {
     101
     102        /* Retrieve our output trace from the thread local storage */
     103        libtrace_out_t *output = (libtrace_out_t *)tls;
     104
     105        /* Write the packet to disk */
     106        trace_write_packet(output, packet);
     107
     108        /* Return the packet as we are finished with it */
     109        return packet;
     110
     111}
     112
     113/* Creates an output file for this thread */
     114static void *init_process(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED,
     115                void *global UNUSED) {
     116
     117        int my_id = 0;
     118        libtrace_out_t *out;
     119
     120        pthread_mutex_lock(&lock);
     121        my_id = ++count;
     122        pthread_mutex_unlock(&lock);
     123        out = create_output(my_id);
     124
     125        return out;
     126}
     127
     128/* Closes the output file for this thread */
     129static void stop_process(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED,
     130                void *global UNUSED, void *tls) {
     131
     132        /* Retrieve our output trace from the thread local storage */
     133        libtrace_out_t *output = (libtrace_out_t *)tls;
     134
     135        trace_destroy_output(output);
    129136}
    130137
     
    134141        libtrace_stat_t *stats;
    135142        int snaplen = -1;
    136         int nb_threads = -1;
     143        int nb_threads = 4;     
    137144        char *compress_type_str=NULL;
     145        libtrace_callback_set_t *processing = NULL;
    138146
    139147        sigact.sa_handler = stop;
     
    233241        }
    234242
     243        /* Set up our callbacks */
     244        processing = trace_create_callback_set();
     245        trace_set_starting_cb(processing, init_process);
     246        trace_set_stopping_cb(processing, stop_process);
     247        trace_set_packet_cb(processing, per_packet);
     248
    235249        if (snaplen != -1)
    236250                trace_set_snaplen(trace, snaplen);
    237251        if(nb_threads != -1)
    238                 trace_set_tick_count(trace, (size_t) nb_threads);
     252                trace_set_perpkt_threads(trace, nb_threads);
    239253
    240254        /* We use a new version of trace_start(), trace_pstart()
    241255         * The reporter function argument is optional and can be NULL */
    242         if (trace_pstart(trace, NULL, per_packet, NULL)) {
     256        if (trace_pstart(trace, NULL, processing, NULL)) {
    243257                trace_perror(trace,"Starting trace");
    244258                trace_destroy(trace);
     259                trace_destroy_callback_set(processing);
    245260                return 1;
    246261        }
     
    252267                trace_perror(trace,"Reading packets");
    253268                trace_destroy(trace);
     269                trace_destroy_callback_set(processing);
    254270                return 1;
    255271        }
     
    260276        trace_print_statistics(stats, stderr, "\t%s: %"PRIu64"\n");
    261277
     278        trace_destroy_callback_set(processing);
    262279        trace_destroy(trace);
    263280        return 0;
Note: See TracChangeset for help on using the changeset viewer.